You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by al...@apache.org on 2017/08/13 01:06:33 UTC
[1/2] beam git commit: Add initial bundle retry code
Repository: beam
Updated Branches:
refs/heads/master 84a23793c -> b0b642182
Add initial bundle retry code
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f2dddda
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f2dddda
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f2dddda
Branch: refs/heads/master
Commit: 1f2ddddabf541b88f01b17aa9a549081a8607bb9
Parents: 84a2379
Author: Maria Garcia Herrero <ma...@google.com>
Authored: Thu Aug 3 00:16:54 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Sat Aug 12 18:06:08 2017 -0700
----------------------------------------------------------------------
.../apache_beam/options/pipeline_options.py | 7 ++
sdks/python/apache_beam/pipeline_test.py | 30 ++++++
.../apache_beam/runners/direct/executor.py | 100 ++++++++++++-------
3 files changed, 100 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1f2dddda/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index ea996a3..db65b3c 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -314,6 +314,13 @@ class DirectOptions(PipelineOptions):
help='DirectRunner uses stacked WindowedValues within a Bundle for '
'memory optimization. Set --no_direct_runner_use_stacked_bundle to '
'avoid it.')
+ parser.add_argument(
+ '--direct_runner_bundle_retry',
+ action='store_true',
+ default=False,
+ help=
+ ('Whether to allow bundle retries. If True the maximum'
+ 'number of attempts to process a bundle is 4. '))
class GoogleCloudOptions(PipelineOptions):
http://git-wip-us.apache.org/repos/asf/beam/blob/1f2dddda/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index aad0143..b3ac100 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -499,6 +499,36 @@ class RunnerApiTest(unittest.TestCase):
self.assertEqual(MyPTransform.pickle_count[0], 20)
+class DirectRunnerRetryTests(unittest.TestCase):
+
+ def test_retry_fork_graph(self):
+ pipeline_options = PipelineOptions(['--direct_runner_bundle_retry'])
+ p = beam.Pipeline(options=pipeline_options)
+
+ # TODO(mariagh): Remove the use of globals from the test.
+ global count_b, count_c # pylint: disable=global-variable-undefined
+ count_b, count_c = 0, 0
+
+ def f_b(x):
+ global count_b # pylint: disable=global-variable-undefined
+ count_b += 1
+ raise Exception('exception in f_b')
+
+ def f_c(x):
+ global count_c # pylint: disable=global-variable-undefined
+ count_c += 1
+ raise Exception('exception in f_c')
+
+ names = p | 'CreateNodeA' >> beam.Create(['Ann', 'Joe'])
+
+ fork_b = names | 'SendToB' >> beam.Map(f_b) # pylint: disable=unused-variable
+ fork_c = names | 'SendToC' >> beam.Map(f_c) # pylint: disable=unused-variable
+
+ with self.assertRaises(Exception):
+ p.run().wait_until_finish()
+ assert count_b == count_c == 4
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/1f2dddda/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index e70e326..2e46978 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -25,10 +25,12 @@ import logging
import Queue
import sys
import threading
+import traceback
from weakref import WeakValueDictionary
from apache_beam.metrics.execution import MetricsContainer
from apache_beam.metrics.execution import ScopedMetricsContainer
+from apache_beam.options.pipeline_options import DirectOptions
class _ExecutorService(object):
@@ -271,6 +273,15 @@ class TransformExecutor(_ExecutorService.CallableTask):
self._side_input_values = {}
self.blocked = False
self._call_count = 0
+ self._retry_count = 0
+ # Switch to turn on/off the retry of bundles.
+ pipeline_options = self._evaluation_context.pipeline_options
+ if not pipeline_options.view_as(DirectOptions).direct_runner_bundle_retry:
+ self._max_retries_per_bundle = 1
+ else:
+ self._max_retries_per_bundle = 4
+ # TODO(mariagh): make _max_retries_per_bundle a constant
+ # once "bundle retry" is no longer experimental.
def call(self):
self._call_count += 1
@@ -288,47 +299,62 @@ class TransformExecutor(_ExecutorService.CallableTask):
# available.
return
self._side_input_values[side_input] = value
-
side_input_values = [self._side_input_values[side_input]
for side_input in self._applied_ptransform.side_inputs]
- try:
- evaluator = self._transform_evaluator_registry.get_evaluator(
- self._applied_ptransform, self._input_bundle,
- side_input_values, scoped_metrics_container)
-
- if self._fired_timers:
- for timer_firing in self._fired_timers:
- evaluator.process_timer_wrapper(timer_firing)
-
- if self._input_bundle:
- for value in self._input_bundle.get_elements_iterable():
- evaluator.process_element(value)
-
- with scoped_metrics_container:
- result = evaluator.finish_bundle()
- result.logical_metric_updates = metrics_container.get_cumulative()
-
- if self._evaluation_context.has_cache:
- for uncommitted_bundle in result.uncommitted_output_bundles:
+ while self._retry_count < self._max_retries_per_bundle:
+ try:
+ self.attempt_call(metrics_container,
+ scoped_metrics_container,
+ side_input_values)
+ break
+ except Exception as e:
+ self._retry_count += 1
+ logging.info(
+ 'Exception at bundle %r, due to an exception: %s',
+ self._input_bundle, traceback.format_exc())
+ if self._retry_count == self._max_retries_per_bundle:
+ logging.error('Giving up after %s attempts.',
+ self._max_retries_per_bundle)
+ self._completion_callback.handle_exception(self, e)
+
+ self._evaluation_context.metrics().commit_physical(
+ self._input_bundle,
+ metrics_container.get_cumulative())
+ self._transform_evaluation_state.complete(self)
+
+ def attempt_call(self, metrics_container,
+ scoped_metrics_container,
+ side_input_values):
+ evaluator = self._transform_evaluator_registry.get_evaluator(
+ self._applied_ptransform, self._input_bundle,
+ side_input_values, scoped_metrics_container)
+
+ if self._fired_timers:
+ for timer_firing in self._fired_timers:
+ evaluator.process_timer_wrapper(timer_firing)
+
+ if self._input_bundle:
+ for value in self._input_bundle.get_elements_iterable():
+ evaluator.process_element(value)
+
+ with scoped_metrics_container:
+ result = evaluator.finish_bundle()
+ result.logical_metric_updates = metrics_container.get_cumulative()
+
+ if self._evaluation_context.has_cache:
+ for uncommitted_bundle in result.uncommitted_output_bundles:
+ self._evaluation_context.append_to_cache(
+ self._applied_ptransform, uncommitted_bundle.tag,
+ uncommitted_bundle.get_elements_iterable())
+ undeclared_tag_values = result.undeclared_tag_values
+ if undeclared_tag_values:
+ for tag, value in undeclared_tag_values.iteritems():
self._evaluation_context.append_to_cache(
- self._applied_ptransform, uncommitted_bundle.tag,
- uncommitted_bundle.get_elements_iterable())
- undeclared_tag_values = result.undeclared_tag_values
- if undeclared_tag_values:
- for tag, value in undeclared_tag_values.iteritems():
- self._evaluation_context.append_to_cache(
- self._applied_ptransform, tag, value)
-
- self._completion_callback.handle_result(self, self._input_bundle, result)
- return result
- except Exception as e: # pylint: disable=broad-except
- self._completion_callback.handle_exception(self, e)
- finally:
- self._evaluation_context.metrics().commit_physical(
- self._input_bundle,
- metrics_container.get_cumulative())
- self._transform_evaluation_state.complete(self)
+ self._applied_ptransform, tag, value)
+
+ self._completion_callback.handle_result(self, self._input_bundle, result)
+ return result
class Executor(object):
[2/2] beam git commit: This closes #3679
Posted by al...@apache.org.
This closes #3679
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b0b64218
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b0b64218
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b0b64218
Branch: refs/heads/master
Commit: b0b64218257576ef20601fd31e075e4a8e297a60
Parents: 84a2379 1f2dddd
Author: Ahmet Altay <al...@google.com>
Authored: Sat Aug 12 18:06:21 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Sat Aug 12 18:06:21 2017 -0700
----------------------------------------------------------------------
.../apache_beam/options/pipeline_options.py | 7 ++
sdks/python/apache_beam/pipeline_test.py | 30 ++++++
.../apache_beam/runners/direct/executor.py | 100 ++++++++++++-------
3 files changed, 100 insertions(+), 37 deletions(-)
----------------------------------------------------------------------