You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/05 00:43:32 UTC

[GitHub] [beam] robertwb commented on a change in pull request #15585: Add dead letter pattern to Beam Python DoFns.

robertwb commented on a change in pull request #15585:
URL: https://github.com/apache/beam/pull/15585#discussion_r721807825



##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1733,6 +1800,187 @@ def FlatMapTuple(fn, *args, **kwargs):  # pylint: disable=invalid-name
   return pardo
 
 
+class _DeadLetterPattern(ptransform.PTransform):
+  def __init__(
+      self,
+      fn,
+      args,
+      kwargs,
+      main_tag,
+      dead_letter_tag,
+      exc_class,
+      partial,
+      use_subprocess,
+      threshold,
+      threshold_windowing):
+    if partial and use_subprocess:
+      raise ValueError('partial and use_subprocess are mutually incompatible.')
+    self._fn = fn
+    self._args = args
+    self._kwargs = kwargs
+    self._main_tag = main_tag
+    self._dead_letter_tag = dead_letter_tag
+    self._exc_class = exc_class
+    self._partial = partial
+    self._use_subprocess = use_subprocess
+    self._threshold = threshold
+    self._threshold_windowing = threshold_windowing
+
+  def expand(self, pcoll):
+    result = pcoll | ParDo(
+        _DeadLetterDoFn(
+            _SubprocessDoFn(self._fn) if self._use_subprocess else self._fn,
+            self._dead_letter_tag,
+            self._exc_class,
+            self._partial),
+        *self._args,
+        **self._kwargs).with_outputs(
+            self._dead_letter_tag, main=self._main_tag, allow_unknown_tags=True)
+
+    if self._threshold < 1.0:
+
+      class MaybeWindow(ptransform.PTransform):
+        @staticmethod
+        def expand(pcoll):
+          if self._threshold_windowing:
+            return pcoll | WindowInto(self._threshold_windowing)
+          else:
+            return pcoll
+
+      input_count_view = pcoll | 'CountTotal' >> (
+          MaybeWindow() | Map(lambda _: 1)
+          | CombineGlobally(sum).as_singleton_view())
+      bad_count_pcoll = result[self._dead_letter_tag] | 'CountBad' >> (
+          MaybeWindow() | Map(lambda _: 1)
+          | CombineGlobally(sum).without_defaults())
+
+      def check_threshold(bad, total, threshold):
+        if bad > total * threshold:
+          raise ValueError(
+              "Too many bad elements: %s / %s = %s > %s" %

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1222,6 +1225,67 @@ def __init__(self, fn, *args, **kwargs):
     from apache_beam.runners.common import DoFnSignature
     self._signature = DoFnSignature(self.fn)
 
+  def with_dead_letters(
+      self,
+      main_tag='good',
+      dead_letter_tag='bad',
+      *,
+      exc_class=Exception,
+      partial=False,
+      use_subprocess=False,
+      threshold=.999,
+      threshold_windowing=None):
+    """Automatically provides a dead letter output for skipping bad records.
+
+    This returns a tagged output with two PCollections, the first being the
+    results of successfully processing the input PCollection, and the second
+    being the set of bad records (those which threw exceptions during
+    processing) along with information about the errors raised.
+
+    For example, one would write::
+
+        good, bad = Map(maybe_error_raising_function).with_dead_letters()
+
+    and `good` will be a PCollection of mapped records and `bad` will contain
+    those that raised exceptions.
+
+
+    Args:
+      main_tag: tag to be used for the main (good) output of the DoFn,
+          useful to avoid possible conflicts if this DoFn already produces
+          multiple outputs.  Optional, defaults to 'good'.
+      dead_letter_tag: tag to be used for the bad records, useful to avoid
+          possible conflicts if this DoFn already produces multiple outputs.
+          Optional, defaults to 'bad'.
+      exc_class: An exception class, or tuple of exception classes, to catch.
+          Optional, defaults to 'Exception'.
+      partial: Whether to emit outputs as they're produced (which could result
+          in partial outputs for a ParDo or FlatMap that throws an error part
+          way through execution) or buffer all outputs until successful
+          processing of the entire element. Optional, defaults to False.
+      use_subprocess: Whether to execute the DoFn logic in a subprocess. This

Review comment:
       I think that's out of the scope of this docstring. 

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1222,6 +1225,67 @@ def __init__(self, fn, *args, **kwargs):
     from apache_beam.runners.common import DoFnSignature
     self._signature = DoFnSignature(self.fn)
 
+  def with_dead_letters(
+      self,
+      main_tag='good',
+      dead_letter_tag='bad',
+      *,
+      exc_class=Exception,
+      partial=False,
+      use_subprocess=False,
+      threshold=.999,
+      threshold_windowing=None):
+    """Automatically provides a dead letter output for skipping bad records.
+
+    This returns a tagged output with two PCollections, the first being the
+    results of successfully processing the input PCollection, and the second
+    being the set of bad records (those which threw exceptions during
+    processing) along with information about the errors raised.
+
+    For example, one would write::
+
+        good, bad = Map(maybe_error_raising_function).with_dead_letters()
+
+    and `good` will be a PCollection of mapped records and `bad` will contain
+    those that raised exceptions.
+
+
+    Args:
+      main_tag: tag to be used for the main (good) output of the DoFn,
+          useful to avoid possible conflicts if this DoFn already produces
+          multiple outputs.  Optional, defaults to 'good'.
+      dead_letter_tag: tag to be used for the bad records, useful to avoid
+          possible conflicts if this DoFn already produces multiple outputs.
+          Optional, defaults to 'bad'.
+      exc_class: An exception class, or tuple of exception classes, to catch.
+          Optional, defaults to 'Exception'.
+      partial: Whether to emit outputs as they're produced (which could result
+          in partial outputs for a ParDo or FlatMap that throws an error part
+          way through execution) or buffer all outputs until successful
+          processing of the entire element. Optional, defaults to False.
+      use_subprocess: Whether to execute the DoFn logic in a subprocess. This

Review comment:
       This isn't really about use_subprocess, as a normal exception will do that as well, but commented on this in the main description. 

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1222,6 +1225,67 @@ def __init__(self, fn, *args, **kwargs):
     from apache_beam.runners.common import DoFnSignature
     self._signature = DoFnSignature(self.fn)
 
+  def with_dead_letters(
+      self,
+      main_tag='good',
+      dead_letter_tag='bad',
+      *,
+      exc_class=Exception,
+      partial=False,
+      use_subprocess=False,
+      threshold=.999,
+      threshold_windowing=None):
+    """Automatically provides a dead letter output for skipping bad records.
+
+    This returns a tagged output with two PCollections, the first being the
+    results of successfully processing the input PCollection, and the second
+    being the set of bad records (those which threw exceptions during
+    processing) along with information about the errors raised.
+
+    For example, one would write::
+
+        good, bad = Map(maybe_error_raising_function).with_dead_letters()
+
+    and `good` will be a PCollection of mapped records and `bad` will contain
+    those that raised exceptions.
+
+
+    Args:
+      main_tag: tag to be used for the main (good) output of the DoFn,
+          useful to avoid possible conflicts if this DoFn already produces
+          multiple outputs.  Optional, defaults to 'good'.
+      dead_letter_tag: tag to be used for the bad records, useful to avoid
+          possible conflicts if this DoFn already produces multiple outputs.
+          Optional, defaults to 'bad'.
+      exc_class: An exception class, or tuple of exception classes, to catch.
+          Optional, defaults to 'Exception'.
+      partial: Whether to emit outputs as they're produced (which could result
+          in partial outputs for a ParDo or FlatMap that throws an error part
+          way through execution) or buffer all outputs until successful
+          processing of the entire element. Optional, defaults to False.
+      use_subprocess: Whether to execute the DoFn logic in a subprocess. This
+          allows one to recover from errors that crash the process (e.g. from
+          an underlying C/C++ library), but is slower as elements and results

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1222,6 +1225,67 @@ def __init__(self, fn, *args, **kwargs):
     from apache_beam.runners.common import DoFnSignature
     self._signature = DoFnSignature(self.fn)
 
+  def with_dead_letters(
+      self,
+      main_tag='good',
+      dead_letter_tag='bad',
+      *,
+      exc_class=Exception,
+      partial=False,
+      use_subprocess=False,
+      threshold=.999,
+      threshold_windowing=None):
+    """Automatically provides a dead letter output for skipping bad records.
+
+    This returns a tagged output with two PCollections, the first being the
+    results of successfully processing the input PCollection, and the second
+    being the set of bad records (those which threw exceptions during
+    processing) along with information about the errors raised.
+
+    For example, one would write::
+
+        good, bad = Map(maybe_error_raising_function).with_dead_letters()
+
+    and `good` will be a PCollection of mapped records and `bad` will contain
+    those that raised exceptions.
+
+
+    Args:
+      main_tag: tag to be used for the main (good) output of the DoFn,
+          useful to avoid possible conflicts if this DoFn already produces
+          multiple outputs.  Optional, defaults to 'good'.
+      dead_letter_tag: tag to be used for the bad records, useful to avoid
+          possible conflicts if this DoFn already produces multiple outputs.
+          Optional, defaults to 'bad'.
+      exc_class: An exception class, or tuple of exception classes, to catch.
+          Optional, defaults to 'Exception'.
+      partial: Whether to emit outputs as they're produced (which could result

Review comment:
       This is not a batch/streaming thing, but for each element. Clarified. 

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1222,6 +1225,67 @@ def __init__(self, fn, *args, **kwargs):
     from apache_beam.runners.common import DoFnSignature
     self._signature = DoFnSignature(self.fn)
 
+  def with_dead_letters(

Review comment:
       Good point. Done.

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1222,6 +1225,67 @@ def __init__(self, fn, *args, **kwargs):
     from apache_beam.runners.common import DoFnSignature
     self._signature = DoFnSignature(self.fn)
 
+  def with_dead_letters(
+      self,
+      main_tag='good',
+      dead_letter_tag='bad',
+      *,
+      exc_class=Exception,
+      partial=False,
+      use_subprocess=False,
+      threshold=.999,
+      threshold_windowing=None):
+    """Automatically provides a dead letter output for skipping bad records.
+
+    This returns a tagged output with two PCollections, the first being the
+    results of successfully processing the input PCollection, and the second
+    being the set of bad records (those which threw exceptions during
+    processing) along with information about the errors raised.
+
+    For example, one would write::
+
+        good, bad = Map(maybe_error_raising_function).with_dead_letters()
+
+    and `good` will be a PCollection of mapped records and `bad` will contain
+    those that raised exceptions.
+
+
+    Args:
+      main_tag: tag to be used for the main (good) output of the DoFn,
+          useful to avoid possible conflicts if this DoFn already produces
+          multiple outputs.  Optional, defaults to 'good'.
+      dead_letter_tag: tag to be used for the bad records, useful to avoid
+          possible conflicts if this DoFn already produces multiple outputs.
+          Optional, defaults to 'bad'.
+      exc_class: An exception class, or tuple of exception classes, to catch.
+          Optional, defaults to 'Exception'.
+      partial: Whether to emit outputs as they're produced (which could result
+          in partial outputs for a ParDo or FlatMap that throws an error part
+          way through execution) or buffer all outputs until successful
+          processing of the entire element. Optional, defaults to False.
+      use_subprocess: Whether to execute the DoFn logic in a subprocess. This
+          allows one to recover from errors that crash the process (e.g. from
+          an underlying C/C++ library), but is slower as elements and results

Review comment:
       The issue with batching is that it increases the size of the failure domain. Future work could be to providing batching that automatically isolates the actual bad records (e.g. via binary search), likely in conjunction with the vectorized APIs that are being investigated. 
   
   Note that this starts up a long-running process that is used to handle all the elements (until failure, which should be rare) rather than a new process per element, so the overhead should be minimal (and can be amortized if there's any per-process or per-bundle initialization that needs to be done). The primary overhead is in serializing and sending the elements, which batching would not eliminate. 

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1733,6 +1800,187 @@ def FlatMapTuple(fn, *args, **kwargs):  # pylint: disable=invalid-name
   return pardo
 
 
+class _DeadLetterPattern(ptransform.PTransform):
+  def __init__(
+      self,
+      fn,
+      args,
+      kwargs,
+      main_tag,
+      dead_letter_tag,
+      exc_class,
+      partial,
+      use_subprocess,
+      threshold,
+      threshold_windowing):
+    if partial and use_subprocess:
+      raise ValueError('partial and use_subprocess are mutually incompatible.')
+    self._fn = fn
+    self._args = args
+    self._kwargs = kwargs
+    self._main_tag = main_tag
+    self._dead_letter_tag = dead_letter_tag
+    self._exc_class = exc_class
+    self._partial = partial
+    self._use_subprocess = use_subprocess
+    self._threshold = threshold
+    self._threshold_windowing = threshold_windowing
+
+  def expand(self, pcoll):
+    result = pcoll | ParDo(
+        _DeadLetterDoFn(
+            _SubprocessDoFn(self._fn) if self._use_subprocess else self._fn,
+            self._dead_letter_tag,
+            self._exc_class,
+            self._partial),
+        *self._args,
+        **self._kwargs).with_outputs(
+            self._dead_letter_tag, main=self._main_tag, allow_unknown_tags=True)
+
+    if self._threshold < 1.0:
+
+      class MaybeWindow(ptransform.PTransform):
+        @staticmethod
+        def expand(pcoll):
+          if self._threshold_windowing:
+            return pcoll | WindowInto(self._threshold_windowing)
+          else:
+            return pcoll
+
+      input_count_view = pcoll | 'CountTotal' >> (
+          MaybeWindow() | Map(lambda _: 1)
+          | CombineGlobally(sum).as_singleton_view())
+      bad_count_pcoll = result[self._dead_letter_tag] | 'CountBad' >> (
+          MaybeWindow() | Map(lambda _: 1)
+          | CombineGlobally(sum).without_defaults())
+
+      def check_threshold(bad, total, threshold):
+        if bad > total * threshold:
+          raise ValueError(
+              "Too many bad elements: %s / %s = %s > %s" %
+              (bad, total, bad / total, threshold))
+
+      _ = bad_count_pcoll | Map(
+          check_threshold, input_count_view, self._threshold)
+
+    return result
+
+
+class _DeadLetterDoFn(DoFn):
+  """Implementation of ParDo.with_dead_letters."""
+  def __init__(self, fn, dead_letter_tag, exc_class, partial):
+    self._fn = fn
+    self._dead_letter_tag = dead_letter_tag
+    self._exc_class = exc_class
+    self._partial = partial
+
+  def __getattribute__(self, name):
+    if (name.startswith('__') or name in self.__dict__ or
+        name in _DeadLetterDoFn.__dict__):
+      return object.__getattribute__(self, name)
+    else:
+      return getattr(self._fn, name)
+
+  def process(self, *args, **kwargs):
+    try:
+      result = self._fn.process(*args, **kwargs)
+      if not self._partial:
+        # Don't emit any results until we know there will be no errors.
+        result = list(result)
+      yield from result
+    except self._exc_class as exn:
+      yield pvalue.TaggedOutput(
+          self._dead_letter_tag,
+          (
+              args[0], (
+                  type(exn),
+                  repr(exn),
+                  traceback.format_exception(*sys.exc_info()))))
+
+
+class _SubprocessDoFn(DoFn):

Review comment:
       I have verified that the logs successfully make it into the cloud logging worker logs. 

##########
File path: sdks/python/apache_beam/transforms/ptransform_test.py
##########
@@ -2524,6 +2526,231 @@ def test_eager_execution_tagged_outputs(self):
       result.foo
 
 
+@parameterized_class([{'use_subprocess': False}, {'use_subprocess': True}])
+class DeadLettersTest(unittest.TestCase):
+  @classmethod
+  def die(cls, x):
+    if cls.use_subprocess and False:
+      os._exit(x)
+    else:
+      raise ValueError(x)
+
+  @classmethod
+  def die_if_negative(cls, x):
+    if x < 0:
+      cls.die(x)
+    else:
+      return x
+
+  @classmethod
+  def exception_if_negative(cls, x):
+    if x < 0:
+      raise ValueError(x)
+    else:
+      return x
+
+  @classmethod
+  def die_if_less(cls, x, bound=0):
+    if x < bound:
+      raise cls.die(x)
+    else:
+      return x, bound
+
+  def test_error_messages(self):
+    with TestPipeline() as p:
+      good, bad = (
+          p
+          | beam.Create([-1, 10, -100, 2, 0])
+          | beam.Map(self.exception_if_negative).with_dead_letters())
+      assert_that(good, equal_to([0, 2, 10]), label='CheckGood')
+      assert_that(
+          bad |
+          beam.MapTuple(lambda e, exc_info: (e, exc_info[1].replace(',', ''))),
+          equal_to([(-1, 'ValueError(-1)'), (-100, 'ValueError(-100)')]),
+          label='CheckBad')
+
+  def test_filters_exceptions(self):
+    with TestPipeline() as p:
+      good, _ = (
+          p
+          | beam.Create([-1, 10, -100, 2, 0])
+          | beam.Map(self.exception_if_negative).with_dead_letters(
+              use_subprocess=self.use_subprocess,
+              exc_class=(ValueError, TypeError)))
+      assert_that(good, equal_to([0, 2, 10]), label='CheckGood')
+
+    with self.assertRaises(Exception):
+      with TestPipeline() as p:
+        good, _ = (
+            p
+            | beam.Create([-1, 10, -100, 2, 0])
+            | beam.Map(self.die_if_negative).with_dead_letters(
+                use_subprocess=self.use_subprocess,
+                exc_class=TypeError))
+
+  def test_tuples(self):
+
+    with TestPipeline() as p:
+      good, _ = (
+          p
+          | beam.Create([(1, 2), (3, 2), (1, -10)])
+          | beam.MapTuple(self.die_if_less).with_dead_letters(
+              use_subprocess=self.use_subprocess))
+      assert_that(good, equal_to([(3, 2), (1, -10)]), label='CheckGood')
+
+  def test_side_inputs(self):
+
+    with TestPipeline() as p:
+      input = p | beam.Create([-1, 10, 100])
+
+      assert_that((
+          input
+          | 'Default' >> beam.Map(self.die_if_less).with_dead_letters(
+              use_subprocess=self.use_subprocess)).good,
+                  equal_to([(10, 0), (100, 0)]),
+                  label='CheckDefault')
+      assert_that((
+          input
+          | 'Pos' >> beam.Map(self.die_if_less, 20).with_dead_letters(
+              use_subprocess=self.use_subprocess)).good,
+                  equal_to([(100, 20)]),
+                  label='PosSideInput')
+      assert_that((
+          input
+          | 'Key' >> beam.Map(self.die_if_less, bound=30).with_dead_letters(
+              use_subprocess=self.use_subprocess)).good,
+                  equal_to([(100, 30)]),
+                  label='KeySideInput')
+
+  def test_multiple_outputs(self):
+    die = type(self).die
+
+    def die_on_negative_even_odd(x):
+      if x < 0:
+        die(x)
+      elif x % 2 == 0:
+        return pvalue.TaggedOutput('even', x)
+      elif x % 2 == 1:
+        return pvalue.TaggedOutput('odd', x)
+
+    with TestPipeline() as p:
+      results = (
+          p
+          | beam.Create([1, -1, 2, -2, 3])
+          | beam.Map(die_on_negative_even_odd).with_dead_letters())
+      assert_that(results.even, equal_to([2]), label='CheckEven')
+      assert_that(results.odd, equal_to([1, 3]), label='CheckOdd')
+
+  def test_params(self):
+    die = type(self).die
+
+    def die_if_negative_with_timestamp(x, ts=beam.DoFn.TimestampParam):
+      if x < 0:
+        raise die(x)
+      else:
+        return x, ts
+
+    with TestPipeline() as p:
+      good, _ = (
+          p
+          | beam.Create([-1, 0, 1])
+          | beam.Map(lambda x: TimestampedValue(x, x))
+          | beam.Map(die_if_negative_with_timestamp).with_dead_letters(
+              use_subprocess=self.use_subprocess))
+      assert_that(good, equal_to([(0, Timestamp(0)), (1, Timestamp(1))]))
+
+  def test_lifecycle(self):
+    die = type(self).die
+
+    class MyDoFn(beam.DoFn):
+      state = None
+
+      def setup(self):
+        assert self.state is None
+        self.state = 'setup'
+
+      def start_bundle(self):
+        assert self.state in ('setup', 'finish_bundle'), self.state
+        self.state = 'start_bundle'
+
+      def finish_bundle(self):
+        assert self.state in ('start_bundle', ), self.state
+        self.state = 'finish_bundle'
+
+      def teardown(self):
+        assert self.state in ('setup', 'finish_bundle'), self.state
+        self.state = 'teardown'
+
+      def process(self, x):
+        if x < 0:
+          raise die(x)
+        else:
+          yield self.state
+
+    with TestPipeline() as p:
+      good, _ = (
+          p
+          | beam.Create([-1, 0, 1, -10, 10])
+          | beam.ParDo(MyDoFn()).with_dead_letters(
+              use_subprocess=self.use_subprocess))
+      assert_that(good, equal_to(['start_bundle'] * 3))
+
+  def test_partial(self):
+    if self.use_subprocess:
+      self.skipTest('Subprocess and partial mutally exclusive.')
+
+    def die_if_negative_iter(elements):
+      for element in elements:
+        if element < 0:
+          raise ValueError(element)
+        yield element
+
+    with TestPipeline() as p:
+      input = p | beam.Create([(-1, 1, 11), (2, -2, 22), (3, 33, -3), (4, 44)])
+
+      assert_that((
+          input
+          | 'Partial' >> beam.FlatMap(die_if_negative_iter).with_dead_letters(
+              partial=True)).good,
+                  equal_to([2, 3, 33, 4, 44]),
+                  'CheckPartial')
+
+      assert_that((
+          input
+          | 'Complete' >> beam.FlatMap(die_if_negative_iter).with_dead_letters(
+              partial=False)).good,
+                  equal_to([4, 44]),
+                  'CheckComplete')
+
+  def test_threshold(self):
+    # The threshold is high enough.
+    with TestPipeline() as p:
+      _ = (
+          p
+          | beam.Create([-1, -2, 0, 1, 2, 3, 4, 5])
+          | beam.Map(self.die_if_negative).with_dead_letters(threshold=0.5))

Review comment:
       I think `die` is clear enough, and prefer it as it's shorter (less wrapping). 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org