You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/11/13 19:36:49 UTC
[1/2] beam git commit: Integrate bundle retry code for the DirectRunner
Repository: beam
Updated Branches:
refs/heads/master 2b4a6b5d5 -> 2df25db34
Integrate bundle retry code for the DirectRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9a183f95
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9a183f95
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9a183f95
Branch: refs/heads/master
Commit: 9a183f95d2a9f1d1dd4124da7d609b81a3b69d8e
Parents: 2b4a6b5
Author: Maria Garcia Herrero <ma...@google.com>
Authored: Fri Nov 10 14:50:16 2017 -0800
Committer: chamikara@google.com <ch...@google.com>
Committed: Mon Nov 13 11:36:27 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/options/pipeline_options.py | 7 -------
sdks/python/apache_beam/pipeline_test.py | 3 +--
sdks/python/apache_beam/runners/direct/executor.py | 14 +-------------
3 files changed, 2 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9a183f95/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 5278b8a..aaac9a4 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -313,13 +313,6 @@ class DirectOptions(PipelineOptions):
help='DirectRunner uses stacked WindowedValues within a Bundle for '
'memory optimization. Set --no_direct_runner_use_stacked_bundle to '
'avoid it.')
- parser.add_argument(
- '--direct_runner_bundle_retry',
- action='store_true',
- default=False,
- help=
- ('Whether to allow bundle retries. If True the maximum'
- 'number of attempts to process a bundle is 4. '))
class GoogleCloudOptions(PipelineOptions):
http://git-wip-us.apache.org/repos/asf/beam/blob/9a183f95/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 9bbb0d7..567ab92 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -506,8 +506,7 @@ class RunnerApiTest(unittest.TestCase):
class DirectRunnerRetryTests(unittest.TestCase):
def test_retry_fork_graph(self):
- pipeline_options = PipelineOptions(['--direct_runner_bundle_retry'])
- p = beam.Pipeline(options=pipeline_options)
+ p = beam.Pipeline()
# TODO(mariagh): Remove the use of globals from the test.
global count_b, count_c # pylint: disable=global-variable-undefined
http://git-wip-us.apache.org/repos/asf/beam/blob/9a183f95/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 51fe908..853f19f 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -30,7 +30,6 @@ from weakref import WeakValueDictionary
from apache_beam.metrics.execution import MetricsContainer
from apache_beam.metrics.execution import ScopedMetricsContainer
-from apache_beam.options.pipeline_options import DirectOptions
class _ExecutorService(object):
@@ -278,13 +277,7 @@ class TransformExecutor(_ExecutorService.CallableTask):
self.blocked = False
self._call_count = 0
self._retry_count = 0
- # Switch to turn on/off the retry of bundles.
- pipeline_options = self._evaluation_context.pipeline_options
- # TODO(mariagh): Remove once "bundle retry" is no longer experimental.
- if not pipeline_options.view_as(DirectOptions).direct_runner_bundle_retry:
- self._max_retries_per_bundle = 1
- else:
- self._max_retries_per_bundle = TransformExecutor._MAX_RETRY_PER_BUNDLE
+ self._max_retries_per_bundle = TransformExecutor._MAX_RETRY_PER_BUNDLE
def call(self):
self._call_count += 1
@@ -319,11 +312,6 @@ class TransformExecutor(_ExecutorService.CallableTask):
if self._retry_count == self._max_retries_per_bundle:
logging.error('Giving up after %s attempts.',
self._max_retries_per_bundle)
- if self._retry_count == 1:
- logging.info(
- 'Use the experimental flag --direct_runner_bundle_retry'
- ' to retry failed bundles (up to %d times).',
- TransformExecutor._MAX_RETRY_PER_BUNDLE)
self._completion_callback.handle_exception(self, e)
self._evaluation_context.metrics().commit_physical(
[2/2] beam git commit: This closes #4113
Posted by ch...@apache.org.
This closes #4113
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2df25db3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2df25db3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2df25db3
Branch: refs/heads/master
Commit: 2df25db3408bd6f4b869b12808b59847f381c116
Parents: 2b4a6b5 9a183f9
Author: chamikara@google.com <ch...@google.com>
Authored: Mon Nov 13 11:36:35 2017 -0800
Committer: chamikara@google.com <ch...@google.com>
Committed: Mon Nov 13 11:36:35 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/options/pipeline_options.py | 7 -------
sdks/python/apache_beam/pipeline_test.py | 3 +--
sdks/python/apache_beam/runners/direct/executor.py | 14 +-------------
3 files changed, 2 insertions(+), 22 deletions(-)
----------------------------------------------------------------------