You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/08/13 01:06:33 UTC

[1/2] beam git commit: Add initial bundle retry code

Repository: beam
Updated Branches:
  refs/heads/master 84a23793c -> b0b642182


Add initial bundle retry code


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f2dddda
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f2dddda
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f2dddda

Branch: refs/heads/master
Commit: 1f2ddddabf541b88f01b17aa9a549081a8607bb9
Parents: 84a2379
Author: Maria Garcia Herrero <ma...@google.com>
Authored: Thu Aug 3 00:16:54 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Sat Aug 12 18:06:08 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/options/pipeline_options.py     |   7 ++
 sdks/python/apache_beam/pipeline_test.py        |  30 ++++++
 .../apache_beam/runners/direct/executor.py      | 100 ++++++++++++-------
 3 files changed, 100 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1f2dddda/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index ea996a3..db65b3c 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -314,6 +314,13 @@ class DirectOptions(PipelineOptions):
         help='DirectRunner uses stacked WindowedValues within a Bundle for '
         'memory optimization. Set --no_direct_runner_use_stacked_bundle to '
         'avoid it.')
+    parser.add_argument(
+        '--direct_runner_bundle_retry',
+        action='store_true',
+        default=False,
+        help=
+        ('Whether to allow bundle retries. If True the maximum '
+         'number of attempts to process a bundle is 4. '))
 
 
 class GoogleCloudOptions(PipelineOptions):

http://git-wip-us.apache.org/repos/asf/beam/blob/1f2dddda/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index aad0143..b3ac100 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -499,6 +499,36 @@ class RunnerApiTest(unittest.TestCase):
     self.assertEqual(MyPTransform.pickle_count[0], 20)
 
 
+class DirectRunnerRetryTests(unittest.TestCase):
+
+  def test_retry_fork_graph(self):
+    pipeline_options = PipelineOptions(['--direct_runner_bundle_retry'])
+    p = beam.Pipeline(options=pipeline_options)
+
+    # TODO(mariagh): Remove the use of globals from the test.
+    global count_b, count_c # pylint: disable=global-variable-undefined
+    count_b, count_c = 0, 0
+
+    def f_b(x):
+      global count_b  # pylint: disable=global-variable-undefined
+      count_b += 1
+      raise Exception('exception in f_b')
+
+    def f_c(x):
+      global count_c  # pylint: disable=global-variable-undefined
+      count_c += 1
+      raise Exception('exception in f_c')
+
+    names = p | 'CreateNodeA' >> beam.Create(['Ann', 'Joe'])
+
+    fork_b = names | 'SendToB' >> beam.Map(f_b) # pylint: disable=unused-variable
+    fork_c = names | 'SendToC' >> beam.Map(f_c) # pylint: disable=unused-variable
+
+    with self.assertRaises(Exception):
+      p.run().wait_until_finish()
+    assert count_b == count_c == 4
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)
   unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/1f2dddda/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index e70e326..2e46978 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -25,10 +25,12 @@ import logging
 import Queue
 import sys
 import threading
+import traceback
 from weakref import WeakValueDictionary
 
 from apache_beam.metrics.execution import MetricsContainer
 from apache_beam.metrics.execution import ScopedMetricsContainer
+from apache_beam.options.pipeline_options import DirectOptions
 
 
 class _ExecutorService(object):
@@ -271,6 +273,15 @@ class TransformExecutor(_ExecutorService.CallableTask):
     self._side_input_values = {}
     self.blocked = False
     self._call_count = 0
+    self._retry_count = 0
+    # Switch to turn on/off the retry of bundles.
+    pipeline_options = self._evaluation_context.pipeline_options
+    if not pipeline_options.view_as(DirectOptions).direct_runner_bundle_retry:
+      self._max_retries_per_bundle = 1
+    else:
+      self._max_retries_per_bundle = 4
+    # TODO(mariagh): make _max_retries_per_bundle a constant
+    # once "bundle retry" is no longer experimental.
 
   def call(self):
     self._call_count += 1
@@ -288,47 +299,62 @@ class TransformExecutor(_ExecutorService.CallableTask):
           # available.
           return
         self._side_input_values[side_input] = value
-
     side_input_values = [self._side_input_values[side_input]
                          for side_input in self._applied_ptransform.side_inputs]
 
-    try:
-      evaluator = self._transform_evaluator_registry.get_evaluator(
-          self._applied_ptransform, self._input_bundle,
-          side_input_values, scoped_metrics_container)
-
-      if self._fired_timers:
-        for timer_firing in self._fired_timers:
-          evaluator.process_timer_wrapper(timer_firing)
-
-      if self._input_bundle:
-        for value in self._input_bundle.get_elements_iterable():
-          evaluator.process_element(value)
-
-      with scoped_metrics_container:
-        result = evaluator.finish_bundle()
-        result.logical_metric_updates = metrics_container.get_cumulative()
-
-      if self._evaluation_context.has_cache:
-        for uncommitted_bundle in result.uncommitted_output_bundles:
+    while self._retry_count < self._max_retries_per_bundle:
+      try:
+        self.attempt_call(metrics_container,
+                          scoped_metrics_container,
+                          side_input_values)
+        break
+      except Exception as e:
+        self._retry_count += 1
+        logging.info(
+            'Exception while processing bundle %r: %s',
+            self._input_bundle, traceback.format_exc())
+        if self._retry_count == self._max_retries_per_bundle:
+          logging.error('Giving up after %s attempts.',
+                        self._max_retries_per_bundle)
+          self._completion_callback.handle_exception(self, e)
+
+    self._evaluation_context.metrics().commit_physical(
+        self._input_bundle,
+        metrics_container.get_cumulative())
+    self._transform_evaluation_state.complete(self)
+
+  def attempt_call(self, metrics_container,
+                   scoped_metrics_container,
+                   side_input_values):
+    evaluator = self._transform_evaluator_registry.get_evaluator(
+        self._applied_ptransform, self._input_bundle,
+        side_input_values, scoped_metrics_container)
+
+    if self._fired_timers:
+      for timer_firing in self._fired_timers:
+        evaluator.process_timer_wrapper(timer_firing)
+
+    if self._input_bundle:
+      for value in self._input_bundle.get_elements_iterable():
+        evaluator.process_element(value)
+
+    with scoped_metrics_container:
+      result = evaluator.finish_bundle()
+      result.logical_metric_updates = metrics_container.get_cumulative()
+
+    if self._evaluation_context.has_cache:
+      for uncommitted_bundle in result.uncommitted_output_bundles:
+        self._evaluation_context.append_to_cache(
+            self._applied_ptransform, uncommitted_bundle.tag,
+            uncommitted_bundle.get_elements_iterable())
+      undeclared_tag_values = result.undeclared_tag_values
+      if undeclared_tag_values:
+        for tag, value in undeclared_tag_values.iteritems():
           self._evaluation_context.append_to_cache(
-              self._applied_ptransform, uncommitted_bundle.tag,
-              uncommitted_bundle.get_elements_iterable())
-        undeclared_tag_values = result.undeclared_tag_values
-        if undeclared_tag_values:
-          for tag, value in undeclared_tag_values.iteritems():
-            self._evaluation_context.append_to_cache(
-                self._applied_ptransform, tag, value)
-
-      self._completion_callback.handle_result(self, self._input_bundle, result)
-      return result
-    except Exception as e:  # pylint: disable=broad-except
-      self._completion_callback.handle_exception(self, e)
-    finally:
-      self._evaluation_context.metrics().commit_physical(
-          self._input_bundle,
-          metrics_container.get_cumulative())
-      self._transform_evaluation_state.complete(self)
+              self._applied_ptransform, tag, value)
+
+    self._completion_callback.handle_result(self, self._input_bundle, result)
+    return result
 
 
 class Executor(object):


[2/2] beam git commit: This closes #3679

Posted by al...@apache.org.
This closes #3679


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b0b64218
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b0b64218
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b0b64218

Branch: refs/heads/master
Commit: b0b64218257576ef20601fd31e075e4a8e297a60
Parents: 84a2379 1f2dddd
Author: Ahmet Altay <al...@google.com>
Authored: Sat Aug 12 18:06:21 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Sat Aug 12 18:06:21 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/options/pipeline_options.py     |   7 ++
 sdks/python/apache_beam/pipeline_test.py        |  30 ++++++
 .../apache_beam/runners/direct/executor.py      | 100 ++++++++++++-------
 3 files changed, 100 insertions(+), 37 deletions(-)
----------------------------------------------------------------------