You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by al...@apache.org on 2017/06/16 16:57:26 UTC

[1/2] beam git commit: Introduce pending bundles and RootBundleProvider in DirectRunner

Repository: beam
Updated Branches:
  refs/heads/master f2a32b2ca -> 54f307891


Introduce pending bundles and RootBundleProvider in DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5aee624c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5aee624c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5aee624c

Branch: refs/heads/master
Commit: 5aee624cbc2815efaf04c7e4854138370a45a1f6
Parents: f2a32b2
Author: Charles Chen <cc...@google.com>
Authored: Thu Jun 15 14:27:47 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Jun 16 09:56:51 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/bundle_factory.py            |  2 +-
 .../apache_beam/runners/direct/executor.py      | 64 ++++++++++++--------
 .../runners/direct/transform_evaluator.py       | 39 +++++++++++-
 3 files changed, 77 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5aee624c/sdks/python/apache_beam/runners/direct/bundle_factory.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py
index ed00b03..0182b4c 100644
--- a/sdks/python/apache_beam/runners/direct/bundle_factory.py
+++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py
@@ -108,7 +108,7 @@ class _Bundle(object):
                             self._initial_windowed_value.windows)
 
   def __init__(self, pcollection, stacked=True):
-    assert isinstance(pcollection, pvalue.PCollection)
+    assert isinstance(pcollection, (pvalue.PBegin, pvalue.PCollection))
     self._pcollection = pcollection
     self._elements = []
     self._stacked = stacked

http://git-wip-us.apache.org/repos/asf/beam/blob/5aee624c/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 86db291..eff2d3c 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -20,6 +20,7 @@
 from __future__ import absolute_import
 
 import collections
+import itertools
 import logging
 import Queue
 import sys
@@ -250,12 +251,12 @@ class TransformExecutor(_ExecutorService.CallableTask):
   """
 
   def __init__(self, transform_evaluator_registry, evaluation_context,
-               input_bundle, applied_transform, completion_callback,
+               input_bundle, applied_ptransform, completion_callback,
                transform_evaluation_state):
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
-    self._applied_transform = applied_transform
+    self._applied_ptransform = applied_ptransform
     self._completion_callback = completion_callback
     self._transform_evaluation_state = transform_evaluation_state
     self._side_input_values = {}
@@ -264,11 +265,11 @@ class TransformExecutor(_ExecutorService.CallableTask):
 
   def call(self):
     self._call_count += 1
-    assert self._call_count <= (1 + len(self._applied_transform.side_inputs))
-    metrics_container = MetricsContainer(self._applied_transform.full_label)
+    assert self._call_count <= (1 + len(self._applied_ptransform.side_inputs))
+    metrics_container = MetricsContainer(self._applied_ptransform.full_label)
     scoped_metrics_container = ScopedMetricsContainer(metrics_container)
 
-    for side_input in self._applied_transform.side_inputs:
+    for side_input in self._applied_ptransform.side_inputs:
       if side_input not in self._side_input_values:
         has_result, value = (
             self._evaluation_context.get_value_or_schedule_after_output(
@@ -280,11 +281,11 @@ class TransformExecutor(_ExecutorService.CallableTask):
         self._side_input_values[side_input] = value
 
     side_input_values = [self._side_input_values[side_input]
-                         for side_input in self._applied_transform.side_inputs]
+                         for side_input in self._applied_ptransform.side_inputs]
 
     try:
-      evaluator = self._transform_evaluator_registry.for_application(
-          self._applied_transform, self._input_bundle,
+      evaluator = self._transform_evaluator_registry.get_evaluator(
+          self._applied_ptransform, self._input_bundle,
           side_input_values, scoped_metrics_container)
 
       if self._input_bundle:
@@ -298,13 +299,13 @@ class TransformExecutor(_ExecutorService.CallableTask):
       if self._evaluation_context.has_cache:
         for uncommitted_bundle in result.uncommitted_output_bundles:
           self._evaluation_context.append_to_cache(
-              self._applied_transform, uncommitted_bundle.tag,
+              self._applied_ptransform, uncommitted_bundle.tag,
               uncommitted_bundle.get_elements_iterable())
         undeclared_tag_values = result.undeclared_tag_values
         if undeclared_tag_values:
           for tag, value in undeclared_tag_values.iteritems():
             self._evaluation_context.append_to_cache(
-                self._applied_transform, tag, value)
+                self._applied_ptransform, tag, value)
 
       self._completion_callback.handle_result(self._input_bundle, result)
       return result
@@ -353,6 +354,15 @@ class _ExecutorServiceParallelExecutor(object):
 
   def start(self, roots):
     self.root_nodes = frozenset(roots)
+    self.all_nodes = frozenset(
+        itertools.chain(
+            roots,
+            *itertools.chain(self.value_to_consumers.values())))
+    self.node_to_pending_bundles = {}
+    for root_node in self.root_nodes:
+      provider = (self.transform_evaluator_registry
+                  .get_root_bundle_provider(root_node))
+      self.node_to_pending_bundles[root_node] = provider.get_root_bundles()
     self.executor_service.submit(
         _ExecutorServiceParallelExecutor._MonitorTask(self))
 
@@ -372,22 +382,22 @@ class _ExecutorServiceParallelExecutor(object):
         self.schedule_consumption(applied_ptransform, committed_bundle,
                                   self.default_completion_callback)
 
-  def schedule_consumption(self, consumer_applied_transform, committed_bundle,
+  def schedule_consumption(self, consumer_applied_ptransform, committed_bundle,
                            on_complete):
     """Schedules evaluation of the given bundle with the transform."""
-    assert all([consumer_applied_transform, on_complete])
-    assert committed_bundle or consumer_applied_transform in self.root_nodes
-    if (committed_bundle
-        and self.transform_evaluator_registry.should_execute_serially(
-            consumer_applied_transform)):
+    assert consumer_applied_ptransform
+    assert committed_bundle
+    assert on_complete
+    if self.transform_evaluator_registry.should_execute_serially(
+        consumer_applied_ptransform):
       transform_executor_service = self.transform_executor_services.serial(
-          consumer_applied_transform)
+          consumer_applied_ptransform)
     else:
       transform_executor_service = self.transform_executor_services.parallel()
 
     transform_executor = TransformExecutor(
         self.transform_evaluator_registry, self.evaluation_context,
-        committed_bundle, consumer_applied_transform, on_complete,
+        committed_bundle, consumer_applied_ptransform, on_complete,
         transform_executor_service)
     transform_executor_service.schedule(transform_executor)
 
@@ -564,10 +574,14 @@ class _ExecutorServiceParallelExecutor(object):
         # additional work.
         return
 
-      # All current TransformExecutors are blocked; add more work from the
-      # roots.
-      for applied_transform in self._executor.root_nodes:
-        if not self._executor.evaluation_context.is_done(applied_transform):
-          self._executor.schedule_consumption(
-              applied_transform, None,
-              self._executor.default_completion_callback)
+      # All current TransformExecutors are blocked; add more work from any
+      # pending bundles.
+      for applied_ptransform in self._executor.all_nodes:
+        if not self._executor.evaluation_context.is_done(applied_ptransform):
+          pending_bundles = self._executor.node_to_pending_bundles.get(
+              applied_ptransform, [])
+          for bundle in pending_bundles:
+            self._executor.schedule_consumption(
+                applied_ptransform, bundle,
+                self._executor.default_completion_callback)
+          self._executor.node_to_pending_bundles[applied_ptransform] = []

http://git-wip-us.apache.org/repos/asf/beam/blob/5aee624c/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index f5b5db5..6e73561 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -58,8 +58,11 @@ class TransformEvaluatorRegistry(object):
         core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
         _NativeWrite: _NativeWriteEvaluator,
     }
+    self._root_bundle_providers = {
+        core.PTransform: DefaultRootBundleProvider,
+    }
 
-  def for_application(
+  def get_evaluator(
       self, applied_ptransform, input_committed_bundle,
       side_inputs, scoped_metrics_container):
     """Returns a TransformEvaluator suitable for processing given inputs."""
@@ -81,6 +84,18 @@ class TransformEvaluatorRegistry(object):
                      input_committed_bundle, side_inputs,
                      scoped_metrics_container)
 
+  def get_root_bundle_provider(self, applied_ptransform):
+    provider_cls = None
+    for cls in applied_ptransform.transform.__class__.mro():
+      provider_cls = self._root_bundle_providers.get(cls)
+      if provider_cls:
+        break
+    if not provider_cls:
+      raise NotImplementedError(
+          'Root provider for [%s] not implemented in runner %s' % (
+              type(applied_ptransform.transform), self))
+    return provider_cls(self._evaluation_context, applied_ptransform)
+
   def should_execute_serially(self, applied_ptransform):
     """Returns True if this applied_ptransform should run one bundle at a time.
 
@@ -104,6 +119,27 @@ class TransformEvaluatorRegistry(object):
                       (core._GroupByKeyOnly, _NativeWrite))
 
 
+class RootBundleProvider(object):
+  """Provides bundles for the initial execution of a root transform."""
+
+  def __init__(self, evaluation_context, applied_ptransform):
+    self._evaluation_context = evaluation_context
+    self._applied_ptransform = applied_ptransform
+
+  def get_root_bundles(self):
+    raise NotImplementedError
+
+
+class DefaultRootBundleProvider(RootBundleProvider):
+  """Provides an empty bundle by default for root transforms."""
+
+  def get_root_bundles(self):
+    input_node = pvalue.PBegin(self._applied_ptransform.transform.pipeline)
+    empty_bundle = (
+        self._evaluation_context.create_empty_committed_bundle(input_node))
+    return [empty_bundle]
+
+
 class _TransformEvaluator(object):
   """An evaluator of a specific application of a transform."""
 
@@ -180,7 +216,6 @@ class _BoundedReadEvaluator(_TransformEvaluator):
 
   def __init__(self, evaluation_context, applied_ptransform,
                input_committed_bundle, side_inputs, scoped_metrics_container):
-    assert not input_committed_bundle
     assert not side_inputs
     self._source = applied_ptransform.transform.source
     self._source.pipeline_options = evaluation_context.pipeline_options


[2/2] beam git commit: This closes #3370

Posted by al...@apache.org.
This closes #3370


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/54f30789
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/54f30789
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/54f30789

Branch: refs/heads/master
Commit: 54f30789165a49d92479b54cf3dac9d4a063e366
Parents: f2a32b2 5aee624
Author: Ahmet Altay <al...@google.com>
Authored: Fri Jun 16 09:57:16 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Jun 16 09:57:16 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/bundle_factory.py            |  2 +-
 .../apache_beam/runners/direct/executor.py      | 64 ++++++++++++--------
 .../runners/direct/transform_evaluator.py       | 39 +++++++++++-
 3 files changed, 77 insertions(+), 28 deletions(-)
----------------------------------------------------------------------