You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/11/23 23:03:38 UTC

[beam] branch master updated: [BEAM-11070] Use self-checkpoint to enforce finalization happens.

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b29102e  [BEAM-11070] Use self-checkpoint to enforce finalization happens.
     new 9b51d4b  Merge pull request #13338 from [BEAM-11070] Use self-checkpoint to enforce finalization happens.
b29102e is described below

commit b29102ed4c4186964cd281edf7851898535cfe02
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Fri Nov 13 12:06:20 2020 -0800

    [BEAM-11070] Use self-checkpoint to enforce finalization happens.
---
 .../runners/portability/flink_runner_test.py       |  3 +-
 .../portability/fn_api_runner/fn_runner_test.py    | 81 ++++++++++------------
 2 files changed, 39 insertions(+), 45 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index 303fe96..12ed1b8 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -442,7 +442,8 @@ class FlinkRunnerTestStreaming(FlinkRunnerTest):
     super(FlinkRunnerTest, self).test_callbacks_with_exception()
 
   def test_register_finalizations(self):
-    raise unittest.SkipTest("BEAM-11070")
+    self.enable_commit = True
+    super(FlinkRunnerTest, self).test_register_finalizations()
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 91343b5..18a30e9 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -23,7 +23,6 @@ import collections
 import logging
 import os
 import random
-import shutil
 import sys
 import tempfile
 import threading
@@ -835,23 +834,41 @@ class FnApiRunnerTest(unittest.TestCase):
       assert_that(res, equal_to(['1', '2']))
 
   def test_register_finalizations(self):
-    event_recorder = EventRecorder(tempfile.gettempdir())
-    elements_list = ['2', '1']
+    class FinalizableSplittableDoFn(beam.DoFn):
+      was_finalized = False
+
+      def set_finalized(self):
+        self.was_finalized = True
 
-    class FinalizableDoFn(beam.DoFn):
       def process(
-          self, element, bundle_finalizer=beam.DoFn.BundleFinalizerParam):
-        bundle_finalizer.register(lambda: event_recorder.record(element))
-        yield element
+          self,
+          element,
+          bundle_finalizer=beam.DoFn.BundleFinalizerParam,
+          restriction_tracker=beam.DoFn.RestrictionParam(
+              OffsetRangeProvider(use_bounded_offset_range=True))):
+        # We use SDF to enforce finalization call happens by using
+        # self-initiated checkpoint.
+        if self.was_finalized:
+          restriction_tracker.try_claim(
+              restriction_tracker.current_restriction().start)
+          yield element
+          restriction_tracker.try_claim(element)
+          return
+        if restriction_tracker.try_claim(
+            restriction_tracker.current_restriction().start):
+          bundle_finalizer.register(lambda: self.set_finalized())
+          # We sleep here instead of setting a resume time since the resume time
+          # doesn't need to be honored.
+          time.sleep(1)
+          restriction_tracker.defer_remainder()
 
     with self.create_pipeline() as p:
-      res = (p | beam.Create(elements_list) | beam.ParDo(FinalizableDoFn()))
-
-      assert_that(res, equal_to(elements_list))
-
-    results = event_recorder.events()
-    event_recorder.cleanup()
-    self.assertEqual(results, sorted(elements_list))
+      max_retries = 100
+      res = (
+          p
+          | beam.Create([max_retries])
+          | beam.ParDo(FinalizableSplittableDoFn()))
+      assert_that(res, equal_to([max_retries]))
 
   def test_sdf_synthetic_source(self):
     common_attrs = {
@@ -1331,6 +1348,9 @@ class FnApiRunnerTestWithMultiWorkers(FnApiRunnerTest):
   def test_sdf_with_watermark_tracking(self):
     raise unittest.SkipTest("This test is for a single worker only.")
 
+  def test_register_finalizations(self):
+    raise unittest.SkipTest("This test is for a single worker only.")
+
 
 class FnApiRunnerTestWithGrpcAndMultiWorkers(FnApiRunnerTest):
   def create_pipeline(self, is_drain=False):
@@ -1355,6 +1375,9 @@ class FnApiRunnerTestWithGrpcAndMultiWorkers(FnApiRunnerTest):
   def test_sdf_with_watermark_tracking(self):
     raise unittest.SkipTest("This test is for a single worker only.")
 
+  def test_register_finalizations(self):
+    raise unittest.SkipTest("This test is for a single worker only.")
+
 
 class FnApiRunnerTestWithBundleRepeat(FnApiRunnerTest):
   def create_pipeline(self, is_drain=False):
@@ -1684,36 +1707,6 @@ def _unpickle_element_counter(name):
   return _pickled_element_counters[name]
 
 
-class EventRecorder(object):
-  """Used to be registered as a callback in bundle finalization.
-
-  The reason why records are written into a tmp file is, the in-memory dataset
-  cannot keep callback records when passing into one DoFn.
-  """
-  def __init__(self, tmp_dir):
-    self.tmp_dir = os.path.join(tmp_dir, uuid.uuid4().hex)
-    os.mkdir(self.tmp_dir)
-
-  def record(self, content):
-    file_path = os.path.join(self.tmp_dir, uuid.uuid4().hex + '.txt')
-    with open(file_path, 'w') as f:
-      f.write(content)
-
-  def events(self):
-    content = []
-    record_files = [
-        f for f in os.listdir(self.tmp_dir)
-        if os.path.isfile(os.path.join(self.tmp_dir, f))
-    ]
-    for file in record_files:
-      with open(os.path.join(self.tmp_dir, file), 'r') as f:
-        content.append(f.read())
-    return sorted(content)
-
-  def cleanup(self):
-    shutil.rmtree(self.tmp_dir)
-
-
 class ExpandStringsProvider(beam.transforms.core.RestrictionProvider):
   """A RestrictionProvider that used for sdf related tests."""
   def initial_restriction(self, element):