You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/11 08:24:22 UTC

[2/2] incubator-beam git commit: DirectPipelineRunner bug fixes.

DirectPipelineRunner bug fixes.

- Run pipelines applied to an empty input (e.g. `[] | ...`) to completion.
- Use the pickler to serialize/deserialize DoFns (and native-write sinks)
  instead of deepcopy, consistent with the other execution environments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6f93cd58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6f93cd58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6f93cd58

Branch: refs/heads/python-sdk
Commit: 6f93cd5884797c0880766c7737e106765becf96d
Parents: 778194f
Author: Ahmet Altay <al...@google.com>
Authored: Thu Nov 10 17:37:45 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Nov 11 00:23:45 2016 -0800

----------------------------------------------------------------------
 .../examples/snippets/snippets_test.py          | 14 ++++++---
 sdks/python/apache_beam/pipeline_test.py        |  5 +++
 .../apache_beam/runners/direct/direct_runner.py |  2 +-
 .../apache_beam/runners/direct/executor.py      | 33 ++++++++++++--------
 .../runners/direct/transform_evaluator.py       |  8 +++--
 5 files changed, 41 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index edc0a17..72fccb2 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -29,6 +29,8 @@ from apache_beam import io
 from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.io import fileio
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import equal_to
 from apache_beam.utils.options import TypeOptions
 from apache_beam.examples.snippets import snippets
 
@@ -307,7 +309,9 @@ class TypeHintsTest(unittest.TestCase):
       # [END type_hints_runtime_on]
 
   def test_deterministic_key(self):
-    lines = ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']
+    p = beam.Pipeline('DirectPipelineRunner')
+    lines = (p | beam.Create(
+        ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']))
 
     # [START type_hints_deterministic_key]
     class Player(object):
@@ -338,9 +342,11 @@ class TypeHintsTest(unittest.TestCase):
             beam.typehints.Tuple[Player, int]))
     # [END type_hints_deterministic_key]
 
-    self.assertEquals(
-        {('banana', 3), ('kiwi', 4), ('zucchini', 3)},
-        set(totals | beam.Map(lambda (k, v): (k.name, v))))
+    assert_that(
+        totals | beam.Map(lambda (k, v): (k.name, v)),
+        equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)]))
+
+    p.run()
 
 
 class SnippetsTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index a4c983f..013796c 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -24,6 +24,7 @@ from apache_beam.pipeline import Pipeline
 from apache_beam.pipeline import PipelineOptions
 from apache_beam.pipeline import PipelineVisitor
 from apache_beam.runners.dataflow.native_io.iobase import NativeSource
+from apache_beam.transforms import CombineGlobally
 from apache_beam.transforms import Create
 from apache_beam.transforms import FlatMap
 from apache_beam.transforms import Map
@@ -217,6 +218,10 @@ class PipelineTest(unittest.TestCase):
 
     pipeline.run()
 
+  def test_aggregator_empty_input(self):
+    actual = [] | CombineGlobally(max).without_defaults()
+    self.assertEqual(actual, [])
+
   def test_pipeline_as_context(self):
     def raise_exception(exn):
       raise exn

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 2e5fe74..1afd486 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -125,7 +125,7 @@ class BufferingInMemoryCache(object):
     for key, value in self._cache.iteritems():
       applied_ptransform, tag = key
       self._pvalue_cache.cache_output(applied_ptransform, tag, value)
-      self._cache = None
+    self._cache = None
 
 
 class DirectPipelineResult(PipelineResult):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 0f1c53b..378aecf 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -202,7 +202,7 @@ class TransformExecutorServices(object):
     if not cached:
       cached = SerialEvaluationState(self._executor_service, self._scheduled)
       self._serial_cache[step] = cached
-    return  cached
+    return cached
 
   @property
   def executors(self):
@@ -480,18 +480,20 @@ class _ExecutorServiceParallelExecutor(object):
         Otherwise monitor task should schedule itself again for future
         execution.
       """
-      if self._executor.evaluation_context.is_done():
-        self._executor.visible_updates.offer(
-            _ExecutorServiceParallelExecutor.VisibleExecutorUpdate())
-        self._executor.executor_service.shutdown()
-        return True
-      elif not self._is_executing:
-        self._executor.visible_updates.offer(
-            _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
-                Exception('Monitor task detected a pipeline stall.')))
+      if self._is_executing():
+        # There are some bundles still in progress.
+        return False
+      else:
+        if self._executor.evaluation_context.is_done():
+          self._executor.visible_updates.offer(
+              _ExecutorServiceParallelExecutor.VisibleExecutorUpdate())
+        else:
+          # Nothing is scheduled for execution, but watermarks incomplete.
+          self._executor.visible_updates.offer(
+              _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
+                  Exception('Monitor task detected a pipeline stall.')))
         self._executor.executor_service.shutdown()
         return True
-      return False
 
     def _fire_timers(self):
       """Schedules triggered consumers if any timers fired.
@@ -515,8 +517,13 @@ class _ExecutorServiceParallelExecutor(object):
 
     def _is_executing(self):
       """Returns True if there is at least one non-blocked TransformExecutor."""
-      for transform_executor in (
-          self._executor.transform_executor_services.executors):
+      executors = self._executor.transform_executor_services.executors
+      if not executors:
+        # Nothing is executing.
+        return False
+
+      # Ensure that at least one of those executors is not blocked.
+      for transform_executor in executors:
         if not transform_executor.blocked:
           return True
       return False

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index c732d7f..093f183 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -20,10 +20,10 @@
 from __future__ import absolute_import
 
 import collections
-import copy
 
 from apache_beam import coders
 from apache_beam import pvalue
+from apache_beam.internal import pickler
 import apache_beam.io as io
 from apache_beam.runners.common import DoFnRunner
 from apache_beam.runners.common import DoFnState
@@ -338,7 +338,8 @@ class _ParDoEvaluator(_TransformEvaluator):
 
     self._counter_factory = counters.CounterFactory()
 
-    dofn = copy.deepcopy(transform.dofn)
+    # TODO(aaltay): Consider storing the serialized form as an optimization.
+    dofn = pickler.loads(pickler.dumps(transform.dofn))
 
     pipeline_options = self._evaluation_context.pipeline_options
     if (pipeline_options is not None
@@ -504,7 +505,8 @@ class _NativeWriteEvaluator(_TransformEvaluator):
         side_inputs)
 
     assert applied_ptransform.transform.sink
-    self._sink = copy.deepcopy(applied_ptransform.transform.sink)
+    # TODO(aaltay): Consider storing the serialized form as an optimization.
+    self._sink = pickler.loads(pickler.dumps(applied_ptransform.transform.sink))
 
   @property
   def _is_final_bundle(self):