Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/04 22:40:50 UTC

[GitHub] [beam] rezarokni commented on a change in pull request #15585: Add dead letter pattern to Beam Python DoFns.

rezarokni commented on a change in pull request #15585:
URL: https://github.com/apache/beam/pull/15585#discussion_r721758809



##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1222,6 +1225,67 @@ def __init__(self, fn, *args, **kwargs):
     from apache_beam.runners.common import DoFnSignature
     self._signature = DoFnSignature(self.fn)
 
+  def with_dead_letters(

Review comment:
      suggest: `with_exception_handling`. "Dead letter" is a common term, but it may not translate well for non-English speakers.
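
      For illustration, a minimal sketch of what call sites could look like under the suggested name (purely hypothetical; the method in this PR is still named `with_dead_letters`, and the tags shown are its current defaults):

          good, bad = (
              pcoll  # an existing PCollection, assumed for illustration
              | beam.Map(maybe_error_raising_function).with_exception_handling(
                  main_tag='good', dead_letter_tag='bad'))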

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1222,6 +1225,67 @@ def __init__(self, fn, *args, **kwargs):
     from apache_beam.runners.common import DoFnSignature
     self._signature = DoFnSignature(self.fn)
 
+  def with_dead_letters(
+      self,
+      main_tag='good',
+      dead_letter_tag='bad',
+      *,
+      exc_class=Exception,
+      partial=False,
+      use_subprocess=False,
+      threshold=.999,
+      threshold_windowing=None):
+    """Automatically provides a dead letter output for skipping bad records.
+
+    This returns a tagged output with two PCollections, the first being the
+    results of successfully processing the input PCollection, and the second
+    being the set of bad records (those which threw exceptions during
+    processing) along with information about the errors raised.
+
+    For example, one would write::
+
+        good, bad = Map(maybe_error_raising_function).with_dead_letters()
+
+    and `good` will be a PCollection of mapped records and `bad` will contain
+    those that raised exceptions.
+
+
+    Args:
+      main_tag: tag to be used for the main (good) output of the DoFn,
+          useful to avoid possible conflicts if this DoFn already produces
+          multiple outputs.  Optional, defaults to 'good'.
+      dead_letter_tag: tag to be used for the bad records, useful to avoid
+          possible conflicts if this DoFn already produces multiple outputs.
+          Optional, defaults to 'bad'.
+      exc_class: An exception class, or tuple of exception classes, to catch.
+          Optional, defaults to 'Exception'.
+      partial: Whether to emit outputs as they're produced (which could result
+          in partial outputs for a ParDo or FlatMap that throws an error part
+          way through execution) or buffer all outputs until successful
+          processing of the entire element. Optional, defaults to False.
+      use_subprocess: Whether to execute the DoFn logic in a subprocess. This

Review comment:
      '...that can crash the calling process', which would either make the pipeline fail or cause it to continuously throw errors, depending on the runner and execution mode.

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1222,6 +1225,67 @@ def __init__(self, fn, *args, **kwargs):
     from apache_beam.runners.common import DoFnSignature
     self._signature = DoFnSignature(self.fn)
 
+  def with_dead_letters(
+      self,
+      main_tag='good',
+      dead_letter_tag='bad',
+      *,
+      exc_class=Exception,
+      partial=False,
+      use_subprocess=False,
+      threshold=.999,
+      threshold_windowing=None):
+    """Automatically provides a dead letter output for skipping bad records.
+
+    This returns a tagged output with two PCollections, the first being the
+    results of successfully processing the input PCollection, and the second
+    being the set of bad records (those which threw exceptions during
+    processing) along with information about the errors raised.
+
+    For example, one would write::
+
+        good, bad = Map(maybe_error_raising_function).with_dead_letters()
+
+    and `good` will be a PCollection of mapped records and `bad` will contain
+    those that raised exceptions.
+
+
+    Args:
+      main_tag: tag to be used for the main (good) output of the DoFn,
+          useful to avoid possible conflicts if this DoFn already produces
+          multiple outputs.  Optional, defaults to 'good'.
+      dead_letter_tag: tag to be used for the bad records, useful to avoid
+          possible conflicts if this DoFn already produces multiple outputs.
+          Optional, defaults to 'bad'.
+      exc_class: An exception class, or tuple of exception classes, to catch.
+          Optional, defaults to 'Exception'.
+      partial: Whether to emit outputs as they're produced (which could result

Review comment:
      Partial within the window? Might want to mention the behaviour in batch vs. streaming.
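
      For reference, a small sketch (assuming the API added in this PR; `parse_all` and the input values are made up) of how partial behaves for a generator FlatMap: with partial=True the elements yielded before the exception are still emitted, while partial=False holds back all output for the failing element:

          import apache_beam as beam

          def parse_all(elements):
            # A generator that can fail part way through an element.
            for e in elements:
              if e < 0:
                raise ValueError(e)
              yield e

          with beam.Pipeline() as p:
            good, bad = (
                p
                | beam.Create([(1, -2, 3)])
                | beam.FlatMap(parse_all).with_dead_letters(partial=True))
            # partial=True emits 1 before the failure on -2 sends the whole
            # tuple to `bad`; partial=False would emit nothing for this tuple.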

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1222,6 +1225,67 @@ def __init__(self, fn, *args, **kwargs):
     from apache_beam.runners.common import DoFnSignature
     self._signature = DoFnSignature(self.fn)
 
+  def with_dead_letters(
+      self,
+      main_tag='good',
+      dead_letter_tag='bad',
+      *,
+      exc_class=Exception,
+      partial=False,
+      use_subprocess=False,
+      threshold=.999,
+      threshold_windowing=None):
+    """Automatically provides a dead letter output for skipping bad records.
+
+    This returns a tagged output with two PCollections, the first being the
+    results of successfully processing the input PCollection, and the second
+    being the set of bad records (those which threw exceptions during
+    processing) along with information about the errors raised.
+
+    For example, one would write::
+
+        good, bad = Map(maybe_error_raising_function).with_dead_letters()
+
+    and `good` will be a PCollection of mapped records and `bad` will contain
+    those that raised exceptions.
+
+
+    Args:
+      main_tag: tag to be used for the main (good) output of the DoFn,
+          useful to avoid possible conflicts if this DoFn already produces
+          multiple outputs.  Optional, defaults to 'good'.
+      dead_letter_tag: tag to be used for the bad records, useful to avoid
+          possible conflicts if this DoFn already produces multiple outputs.
+          Optional, defaults to 'bad'.
+      exc_class: An exception class, or tuple of exception classes, to catch.
+          Optional, defaults to 'Exception'.
+      partial: Whether to emit outputs as they're produced (which could result
+          in partial outputs for a ParDo or FlatMap that throws an error part
+          way through execution) or buffer all outputs until successful
+          processing of the entire element. Optional, defaults to False.
+      use_subprocess: Whether to execute the DoFn logic in a subprocess. This
+          allows one to recover from errors that crash the process (e.g. from
+          an underlying C/C++ library), but is slower as elements and results

Review comment:
      Suggest wording it as '...an underlying C/C++ library throwing a segfault'.

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1733,6 +1800,187 @@ def FlatMapTuple(fn, *args, **kwargs):  # pylint: disable=invalid-name
   return pardo
 
 
+class _DeadLetterPattern(ptransform.PTransform):
+  def __init__(
+      self,
+      fn,
+      args,
+      kwargs,
+      main_tag,
+      dead_letter_tag,
+      exc_class,
+      partial,
+      use_subprocess,
+      threshold,
+      threshold_windowing):
+    if partial and use_subprocess:
+      raise ValueError('partial and use_subprocess are mutually incompatible.')
+    self._fn = fn
+    self._args = args
+    self._kwargs = kwargs
+    self._main_tag = main_tag
+    self._dead_letter_tag = dead_letter_tag
+    self._exc_class = exc_class
+    self._partial = partial
+    self._use_subprocess = use_subprocess
+    self._threshold = threshold
+    self._threshold_windowing = threshold_windowing
+
+  def expand(self, pcoll):
+    result = pcoll | ParDo(
+        _DeadLetterDoFn(
+            _SubprocessDoFn(self._fn) if self._use_subprocess else self._fn,
+            self._dead_letter_tag,
+            self._exc_class,
+            self._partial),
+        *self._args,
+        **self._kwargs).with_outputs(
+            self._dead_letter_tag, main=self._main_tag, allow_unknown_tags=True)
+
+    if self._threshold < 1.0:
+
+      class MaybeWindow(ptransform.PTransform):
+        @staticmethod
+        def expand(pcoll):
+          if self._threshold_windowing:
+            return pcoll | WindowInto(self._threshold_windowing)
+          else:
+            return pcoll
+
+      input_count_view = pcoll | 'CountTotal' >> (
+          MaybeWindow() | Map(lambda _: 1)
+          | CombineGlobally(sum).as_singleton_view())
+      bad_count_pcoll = result[self._dead_letter_tag] | 'CountBad' >> (
+          MaybeWindow() | Map(lambda _: 1)
+          | CombineGlobally(sum).without_defaults())
+
+      def check_threshold(bad, total, threshold):
+        if bad > total * threshold:
+          raise ValueError(
+              "Too many bad elements: %s / %s = %s > %s" %

Review comment:
      The error message could be clearer, e.g. 'The number of elements that raised exceptions exceeded the threshold value within the window ...'.
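
      For reference, a small sketch (assuming the API added in this PR; `events` and `parse_event` are made up) where the pipeline fails if more than 5% of the elements in any one-minute window raise exceptions:

          import apache_beam as beam
          from apache_beam.transforms import window

          good, bad = (
              events  # an existing PCollection, assumed for illustration
              | beam.Map(parse_event).with_dead_letters(
                  threshold=0.05,
                  threshold_windowing=window.FixedWindows(60)))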

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1222,6 +1225,67 @@ def __init__(self, fn, *args, **kwargs):
     from apache_beam.runners.common import DoFnSignature
     self._signature = DoFnSignature(self.fn)
 
+  def with_dead_letters(
+      self,
+      main_tag='good',
+      dead_letter_tag='bad',
+      *,
+      exc_class=Exception,
+      partial=False,
+      use_subprocess=False,
+      threshold=.999,
+      threshold_windowing=None):
+    """Automatically provides a dead letter output for skipping bad records.
+
+    This returns a tagged output with two PCollections, the first being the
+    results of successfully processing the input PCollection, and the second
+    being the set of bad records (those which threw exceptions during
+    processing) along with information about the errors raised.
+
+    For example, one would write::
+
+        good, bad = Map(maybe_error_raising_function).with_dead_letters()
+
+    and `good` will be a PCollection of mapped records and `bad` will contain
+    those that raised exceptions.
+
+
+    Args:
+      main_tag: tag to be used for the main (good) output of the DoFn,
+          useful to avoid possible conflicts if this DoFn already produces
+          multiple outputs.  Optional, defaults to 'good'.
+      dead_letter_tag: tag to be used for the bad records, useful to avoid
+          possible conflicts if this DoFn already produces multiple outputs.
+          Optional, defaults to 'bad'.
+      exc_class: An exception class, or tuple of exception classes, to catch.
+          Optional, defaults to 'Exception'.
+      partial: Whether to emit outputs as they're produced (which could result
+          in partial outputs for a ParDo or FlatMap that throws an error part
+          way through execution) or buffer all outputs until successful
+          processing of the entire element. Optional, defaults to False.
+      use_subprocess: Whether to execute the DoFn logic in a subprocess. This

Review comment:
      It would be good to have a few lines on what this actually looks like to implement, e.g. 'wrap the C++ call ...', etc.
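
      A minimal, hedged sketch of what that could show (the library name and entry point are made up): calling into a native library with use_subprocess=True so that, per the docstring above, a hard crash does not take down the main worker process:

          import ctypes
          import apache_beam as beam

          # Hypothetical native library, purely for illustration.
          _native = ctypes.CDLL('libmyparser.so')

          def parse_with_native_lib(record):
            # A segfault here would normally kill the worker process;
            # use_subprocess=True runs this in a child process instead, so the
            # offending record can be routed to the dead-letter output.
            return _native.parse_record(record.encode('utf-8'))

          good, bad = (
              records  # an existing PCollection of strings, assumed
              | beam.Map(parse_with_native_lib).with_dead_letters(
                  use_subprocess=True))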

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1222,6 +1225,67 @@ def __init__(self, fn, *args, **kwargs):
     from apache_beam.runners.common import DoFnSignature
     self._signature = DoFnSignature(self.fn)
 
+  def with_dead_letters(
+      self,
+      main_tag='good',
+      dead_letter_tag='bad',
+      *,
+      exc_class=Exception,
+      partial=False,
+      use_subprocess=False,
+      threshold=.999,
+      threshold_windowing=None):
+    """Automatically provides a dead letter output for skipping bad records.
+
+    This returns a tagged output with two PCollections, the first being the
+    results of successfully processing the input PCollection, and the second
+    being the set of bad records (those which threw exceptions during
+    processing) along with information about the errors raised.
+
+    For example, one would write::
+
+        good, bad = Map(maybe_error_raising_function).with_dead_letters()
+
+    and `good` will be a PCollection of mapped records and `bad` will contain
+    those that raised exceptions.
+
+
+    Args:
+      main_tag: tag to be used for the main (good) output of the DoFn,
+          useful to avoid possible conflicts if this DoFn already produces
+          multiple outputs.  Optional, defaults to 'good'.
+      dead_letter_tag: tag to be used for the bad records, useful to avoid
+          possible conflicts if this DoFn already produces multiple outputs.
+          Optional, defaults to 'bad'.
+      exc_class: An exception class, or tuple of exception classes, to catch.
+          Optional, defaults to 'Exception'.
+      partial: Whether to emit outputs as they're produced (which could result
+          in partial outputs for a ParDo or FlatMap that throws an error part
+          way through execution) or buffer all outputs until successful
+          processing of the entire element. Optional, defaults to False.
+      use_subprocess: Whether to execute the DoFn logic in a subprocess. This
+          allows one to recover from errors that crash the process (e.g. from
+          an underlying C/C++ library), but is slower as elements and results

Review comment:
      Maybe mention that when using use_subprocess, users should consider applying GroupIntoBatches as a transform before this one.
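
      To make the suggestion concrete, a rough sketch (the keying, batch size and `expensive_native_call` are all made up) of batching elements ahead of the subprocess-wrapped step to amortise the per-element overhead:

          import random
          import apache_beam as beam

          def process_batch(keyed_batch):
            _, batch = keyed_batch
            # One subprocess round trip covers the whole batch. Note that if any
            # element in the batch fails, the whole batch goes to the bad output.
            return [expensive_native_call(e) for e in batch]

          good, bad = (
              records  # an existing PCollection, assumed
              | beam.Map(lambda e: (random.randrange(10), e))  # arbitrary keys
              | beam.GroupIntoBatches(100)
              | beam.Map(process_batch).with_dead_letters(use_subprocess=True))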

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1733,6 +1800,187 @@ def FlatMapTuple(fn, *args, **kwargs):  # pylint: disable=invalid-name
   return pardo
 
 
+class _DeadLetterPattern(ptransform.PTransform):
+  def __init__(
+      self,
+      fn,
+      args,
+      kwargs,
+      main_tag,
+      dead_letter_tag,
+      exc_class,
+      partial,
+      use_subprocess,
+      threshold,
+      threshold_windowing):
+    if partial and use_subprocess:
+      raise ValueError('partial and use_subprocess are mutually incompatible.')
+    self._fn = fn
+    self._args = args
+    self._kwargs = kwargs
+    self._main_tag = main_tag
+    self._dead_letter_tag = dead_letter_tag
+    self._exc_class = exc_class
+    self._partial = partial
+    self._use_subprocess = use_subprocess
+    self._threshold = threshold
+    self._threshold_windowing = threshold_windowing
+
+  def expand(self, pcoll):
+    result = pcoll | ParDo(
+        _DeadLetterDoFn(
+            _SubprocessDoFn(self._fn) if self._use_subprocess else self._fn,
+            self._dead_letter_tag,
+            self._exc_class,
+            self._partial),
+        *self._args,
+        **self._kwargs).with_outputs(
+            self._dead_letter_tag, main=self._main_tag, allow_unknown_tags=True)
+
+    if self._threshold < 1.0:
+
+      class MaybeWindow(ptransform.PTransform):
+        @staticmethod
+        def expand(pcoll):
+          if self._threshold_windowing:
+            return pcoll | WindowInto(self._threshold_windowing)
+          else:
+            return pcoll
+
+      input_count_view = pcoll | 'CountTotal' >> (
+          MaybeWindow() | Map(lambda _: 1)
+          | CombineGlobally(sum).as_singleton_view())
+      bad_count_pcoll = result[self._dead_letter_tag] | 'CountBad' >> (
+          MaybeWindow() | Map(lambda _: 1)
+          | CombineGlobally(sum).without_defaults())
+
+      def check_threshold(bad, total, threshold):
+        if bad > total * threshold:
+          raise ValueError(
+              "Too many bad elements: %s / %s = %s > %s" %
+              (bad, total, bad / total, threshold))
+
+      _ = bad_count_pcoll | Map(
+          check_threshold, input_count_view, self._threshold)
+
+    return result
+
+
+class _DeadLetterDoFn(DoFn):
+  """Implementation of ParDo.with_dead_letters."""
+  def __init__(self, fn, dead_letter_tag, exc_class, partial):
+    self._fn = fn
+    self._dead_letter_tag = dead_letter_tag
+    self._exc_class = exc_class
+    self._partial = partial
+
+  def __getattribute__(self, name):
+    if (name.startswith('__') or name in self.__dict__ or
+        name in _DeadLetterDoFn.__dict__):
+      return object.__getattribute__(self, name)
+    else:
+      return getattr(self._fn, name)
+
+  def process(self, *args, **kwargs):
+    try:
+      result = self._fn.process(*args, **kwargs)
+      if not self._partial:
+        # Don't emit any results until we know there will be no errors.
+        result = list(result)
+      yield from result
+    except self._exc_class as exn:
+      yield pvalue.TaggedOutput(
+          self._dead_letter_tag,
+          (
+              args[0], (
+                  type(exn),
+                  repr(exn),
+                  traceback.format_exception(*sys.exc_info()))))
+
+
+class _SubprocessDoFn(DoFn):

Review comment:
      What would happen to the logs generated by the C++ process? In the Java sample these are collected via handles.
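
      One possible approach (my assumption, not necessarily what this PR does) would be for the child process to redirect the C-level stdout/stderr file descriptors into a pipe and pump the lines into Python logging, roughly:

          import logging
          import os
          import threading

          def forward_native_output(fd=1):
            # Replace the C-level stdout (fd 1 by default) with a pipe and forward
            # whatever the native library prints to the Python logger.
            read_fd, write_fd = os.pipe()
            os.dup2(write_fd, fd)
            os.close(write_fd)

            def pump():
              with os.fdopen(read_fd) as reader:
                for line in reader:
                  logging.info('native: %s', line.rstrip())

            threading.Thread(target=pump, daemon=True).start()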

##########
File path: sdks/python/apache_beam/transforms/ptransform_test.py
##########
@@ -2524,6 +2526,231 @@ def test_eager_execution_tagged_outputs(self):
       result.foo
 
 
+@parameterized_class([{'use_subprocess': False}, {'use_subprocess': True}])
+class DeadLettersTest(unittest.TestCase):
+  @classmethod
+  def die(cls, x):
+    if cls.use_subprocess and False:
+      os._exit(x)
+    else:
+      raise ValueError(x)
+
+  @classmethod
+  def die_if_negative(cls, x):
+    if x < 0:
+      cls.die(x)
+    else:
+      return x
+
+  @classmethod
+  def exception_if_negative(cls, x):
+    if x < 0:
+      raise ValueError(x)
+    else:
+      return x
+
+  @classmethod
+  def die_if_less(cls, x, bound=0):
+    if x < bound:
+      raise cls.die(x)
+    else:
+      return x, bound
+
+  def test_error_messages(self):
+    with TestPipeline() as p:
+      good, bad = (
+          p
+          | beam.Create([-1, 10, -100, 2, 0])
+          | beam.Map(self.exception_if_negative).with_dead_letters())
+      assert_that(good, equal_to([0, 2, 10]), label='CheckGood')
+      assert_that(
+          bad |
+          beam.MapTuple(lambda e, exc_info: (e, exc_info[1].replace(',', ''))),
+          equal_to([(-1, 'ValueError(-1)'), (-100, 'ValueError(-100)')]),
+          label='CheckBad')
+
+  def test_filters_exceptions(self):
+    with TestPipeline() as p:
+      good, _ = (
+          p
+          | beam.Create([-1, 10, -100, 2, 0])
+          | beam.Map(self.exception_if_negative).with_dead_letters(
+              use_subprocess=self.use_subprocess,
+              exc_class=(ValueError, TypeError)))
+      assert_that(good, equal_to([0, 2, 10]), label='CheckGood')
+
+    with self.assertRaises(Exception):
+      with TestPipeline() as p:
+        good, _ = (
+            p
+            | beam.Create([-1, 10, -100, 2, 0])
+            | beam.Map(self.die_if_negative).with_dead_letters(
+                use_subprocess=self.use_subprocess,
+                exc_class=TypeError))
+
+  def test_tuples(self):
+
+    with TestPipeline() as p:
+      good, _ = (
+          p
+          | beam.Create([(1, 2), (3, 2), (1, -10)])
+          | beam.MapTuple(self.die_if_less).with_dead_letters(
+              use_subprocess=self.use_subprocess))
+      assert_that(good, equal_to([(3, 2), (1, -10)]), label='CheckGood')
+
+  def test_side_inputs(self):
+
+    with TestPipeline() as p:
+      input = p | beam.Create([-1, 10, 100])
+
+      assert_that((
+          input
+          | 'Default' >> beam.Map(self.die_if_less).with_dead_letters(
+              use_subprocess=self.use_subprocess)).good,
+                  equal_to([(10, 0), (100, 0)]),
+                  label='CheckDefault')
+      assert_that((
+          input
+          | 'Pos' >> beam.Map(self.die_if_less, 20).with_dead_letters(
+              use_subprocess=self.use_subprocess)).good,
+                  equal_to([(100, 20)]),
+                  label='PosSideInput')
+      assert_that((
+          input
+          | 'Key' >> beam.Map(self.die_if_less, bound=30).with_dead_letters(
+              use_subprocess=self.use_subprocess)).good,
+                  equal_to([(100, 30)]),
+                  label='KeySideInput')
+
+  def test_multiple_outputs(self):
+    die = type(self).die
+
+    def die_on_negative_even_odd(x):
+      if x < 0:
+        die(x)
+      elif x % 2 == 0:
+        return pvalue.TaggedOutput('even', x)
+      elif x % 2 == 1:
+        return pvalue.TaggedOutput('odd', x)
+
+    with TestPipeline() as p:
+      results = (
+          p
+          | beam.Create([1, -1, 2, -2, 3])
+          | beam.Map(die_on_negative_even_odd).with_dead_letters())
+      assert_that(results.even, equal_to([2]), label='CheckEven')
+      assert_that(results.odd, equal_to([1, 3]), label='CheckOdd')
+
+  def test_params(self):
+    die = type(self).die
+
+    def die_if_negative_with_timestamp(x, ts=beam.DoFn.TimestampParam):
+      if x < 0:
+        raise die(x)
+      else:
+        return x, ts
+
+    with TestPipeline() as p:
+      good, _ = (
+          p
+          | beam.Create([-1, 0, 1])
+          | beam.Map(lambda x: TimestampedValue(x, x))
+          | beam.Map(die_if_negative_with_timestamp).with_dead_letters(
+              use_subprocess=self.use_subprocess))
+      assert_that(good, equal_to([(0, Timestamp(0)), (1, Timestamp(1))]))
+
+  def test_lifecycle(self):
+    die = type(self).die
+
+    class MyDoFn(beam.DoFn):
+      state = None
+
+      def setup(self):
+        assert self.state is None
+        self.state = 'setup'
+
+      def start_bundle(self):
+        assert self.state in ('setup', 'finish_bundle'), self.state
+        self.state = 'start_bundle'
+
+      def finish_bundle(self):
+        assert self.state in ('start_bundle', ), self.state
+        self.state = 'finish_bundle'
+
+      def teardown(self):
+        assert self.state in ('setup', 'finish_bundle'), self.state
+        self.state = 'teardown'
+
+      def process(self, x):
+        if x < 0:
+          raise die(x)
+        else:
+          yield self.state
+
+    with TestPipeline() as p:
+      good, _ = (
+          p
+          | beam.Create([-1, 0, 1, -10, 10])
+          | beam.ParDo(MyDoFn()).with_dead_letters(
+              use_subprocess=self.use_subprocess))
+      assert_that(good, equal_to(['start_bundle'] * 3))
+
+  def test_partial(self):
+    if self.use_subprocess:
+      self.skipTest('Subprocess and partial mutally exclusive.')
+
+    def die_if_negative_iter(elements):
+      for element in elements:
+        if element < 0:
+          raise ValueError(element)
+        yield element
+
+    with TestPipeline() as p:
+      input = p | beam.Create([(-1, 1, 11), (2, -2, 22), (3, 33, -3), (4, 44)])
+
+      assert_that((
+          input
+          | 'Partial' >> beam.FlatMap(die_if_negative_iter).with_dead_letters(
+              partial=True)).good,
+                  equal_to([2, 3, 33, 4, 44]),
+                  'CheckPartial')
+
+      assert_that((
+          input
+          | 'Complete' >> beam.FlatMap(die_if_negative_iter).with_dead_letters(
+              partial=False)).good,
+                  equal_to([4, 44]),
+                  'CheckComplete')
+
+  def test_threshold(self):
+    # The threshold is high enough.
+    with TestPipeline() as p:
+      _ = (
+          p
+          | beam.Create([-1, -2, 0, 1, 2, 3, 4, 5])
+          | beam.Map(self.die_if_negative).with_dead_letters(threshold=0.5))

Review comment:
       terminate_if_negative



