You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/11 08:24:22 UTC
[2/2] incubator-beam git commit: DirectPipelineRunner bug fixes.
DirectPipelineRunner bug fixes.
- Run pipelines applied to an empty input ([] | ...) all the way to completion.
- Use pickler to serialize/deserialize DoFns instead of deepcopy,
similar to the other execution environments.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6f93cd58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6f93cd58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6f93cd58
Branch: refs/heads/python-sdk
Commit: 6f93cd5884797c0880766c7737e106765becf96d
Parents: 778194f
Author: Ahmet Altay <al...@google.com>
Authored: Thu Nov 10 17:37:45 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Nov 11 00:23:45 2016 -0800
----------------------------------------------------------------------
.../examples/snippets/snippets_test.py | 14 ++++++---
sdks/python/apache_beam/pipeline_test.py | 5 +++
.../apache_beam/runners/direct/direct_runner.py | 2 +-
.../apache_beam/runners/direct/executor.py | 33 ++++++++++++--------
.../runners/direct/transform_evaluator.py | 8 +++--
5 files changed, 41 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index edc0a17..72fccb2 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -29,6 +29,8 @@ from apache_beam import io
from apache_beam import pvalue
from apache_beam import typehints
from apache_beam.io import fileio
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import equal_to
from apache_beam.utils.options import TypeOptions
from apache_beam.examples.snippets import snippets
@@ -307,7 +309,9 @@ class TypeHintsTest(unittest.TestCase):
# [END type_hints_runtime_on]
def test_deterministic_key(self):
- lines = ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']
+ p = beam.Pipeline('DirectPipelineRunner')
+ lines = (p | beam.Create(
+ ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']))
# [START type_hints_deterministic_key]
class Player(object):
@@ -338,9 +342,11 @@ class TypeHintsTest(unittest.TestCase):
beam.typehints.Tuple[Player, int]))
# [END type_hints_deterministic_key]
- self.assertEquals(
- {('banana', 3), ('kiwi', 4), ('zucchini', 3)},
- set(totals | beam.Map(lambda (k, v): (k.name, v))))
+ assert_that(
+ totals | beam.Map(lambda (k, v): (k.name, v)),
+ equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)]))
+
+ p.run()
class SnippetsTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index a4c983f..013796c 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -24,6 +24,7 @@ from apache_beam.pipeline import Pipeline
from apache_beam.pipeline import PipelineOptions
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.dataflow.native_io.iobase import NativeSource
+from apache_beam.transforms import CombineGlobally
from apache_beam.transforms import Create
from apache_beam.transforms import FlatMap
from apache_beam.transforms import Map
@@ -217,6 +218,10 @@ class PipelineTest(unittest.TestCase):
pipeline.run()
+ def test_aggregator_empty_input(self):
+ actual = [] | CombineGlobally(max).without_defaults()
+ self.assertEqual(actual, [])
+
def test_pipeline_as_context(self):
def raise_exception(exn):
raise exn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 2e5fe74..1afd486 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -125,7 +125,7 @@ class BufferingInMemoryCache(object):
for key, value in self._cache.iteritems():
applied_ptransform, tag = key
self._pvalue_cache.cache_output(applied_ptransform, tag, value)
- self._cache = None
+ self._cache = None
class DirectPipelineResult(PipelineResult):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 0f1c53b..378aecf 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -202,7 +202,7 @@ class TransformExecutorServices(object):
if not cached:
cached = SerialEvaluationState(self._executor_service, self._scheduled)
self._serial_cache[step] = cached
- return cached
+ return cached
@property
def executors(self):
@@ -480,18 +480,20 @@ class _ExecutorServiceParallelExecutor(object):
Otherwise monitor task should schedule itself again for future
execution.
"""
- if self._executor.evaluation_context.is_done():
- self._executor.visible_updates.offer(
- _ExecutorServiceParallelExecutor.VisibleExecutorUpdate())
- self._executor.executor_service.shutdown()
- return True
- elif not self._is_executing:
- self._executor.visible_updates.offer(
- _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
- Exception('Monitor task detected a pipeline stall.')))
+ if self._is_executing():
+ # There are some bundles still in progress.
+ return False
+ else:
+ if self._executor.evaluation_context.is_done():
+ self._executor.visible_updates.offer(
+ _ExecutorServiceParallelExecutor.VisibleExecutorUpdate())
+ else:
+ # Nothing is scheduled for execution, but watermarks incomplete.
+ self._executor.visible_updates.offer(
+ _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
+ Exception('Monitor task detected a pipeline stall.')))
self._executor.executor_service.shutdown()
return True
- return False
def _fire_timers(self):
"""Schedules triggered consumers if any timers fired.
@@ -515,8 +517,13 @@ class _ExecutorServiceParallelExecutor(object):
def _is_executing(self):
"""Returns True if there is at least one non-blocked TransformExecutor."""
- for transform_executor in (
- self._executor.transform_executor_services.executors):
+ executors = self._executor.transform_executor_services.executors
+ if not executors:
+ # Nothing is executing.
+ return False
+
+ # Ensure that at least one of those executors is not blocked.
+ for transform_executor in executors:
if not transform_executor.blocked:
return True
return False
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index c732d7f..093f183 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -20,10 +20,10 @@
from __future__ import absolute_import
import collections
-import copy
from apache_beam import coders
from apache_beam import pvalue
+from apache_beam.internal import pickler
import apache_beam.io as io
from apache_beam.runners.common import DoFnRunner
from apache_beam.runners.common import DoFnState
@@ -338,7 +338,8 @@ class _ParDoEvaluator(_TransformEvaluator):
self._counter_factory = counters.CounterFactory()
- dofn = copy.deepcopy(transform.dofn)
+ # TODO(aaltay): Consider storing the serialized form as an optimization.
+ dofn = pickler.loads(pickler.dumps(transform.dofn))
pipeline_options = self._evaluation_context.pipeline_options
if (pipeline_options is not None
@@ -504,7 +505,8 @@ class _NativeWriteEvaluator(_TransformEvaluator):
side_inputs)
assert applied_ptransform.transform.sink
- self._sink = copy.deepcopy(applied_ptransform.transform.sink)
+ # TODO(aaltay): Consider storing the serialized form as an optimization.
+ self._sink = pickler.loads(pickler.dumps(applied_ptransform.transform.sink))
@property
def _is_final_bundle(self):