You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/11/23 23:03:38 UTC
[beam] branch master updated: [BEAM-11070] Use self-checkpoint to
enforce finalization happens.
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b29102e [BEAM-11070] Use self-checkpoint to enforce finalization happens.
new 9b51d4b Merge pull request #13338 from [BEAM-11070] Use self-checkpoint to enforce finalization happens.
b29102e is described below
commit b29102ed4c4186964cd281edf7851898535cfe02
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Fri Nov 13 12:06:20 2020 -0800
[BEAM-11070] Use self-checkpoint to enforce finalization happens.
---
.../runners/portability/flink_runner_test.py | 3 +-
.../portability/fn_api_runner/fn_runner_test.py | 81 ++++++++++------------
2 files changed, 39 insertions(+), 45 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index 303fe96..12ed1b8 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -442,7 +442,8 @@ class FlinkRunnerTestStreaming(FlinkRunnerTest):
super(FlinkRunnerTest, self).test_callbacks_with_exception()
def test_register_finalizations(self):
- raise unittest.SkipTest("BEAM-11070")
+ self.enable_commit = True
+ super(FlinkRunnerTest, self).test_register_finalizations()
if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 91343b5..18a30e9 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -23,7 +23,6 @@ import collections
import logging
import os
import random
-import shutil
import sys
import tempfile
import threading
@@ -835,23 +834,41 @@ class FnApiRunnerTest(unittest.TestCase):
assert_that(res, equal_to(['1', '2']))
def test_register_finalizations(self):
- event_recorder = EventRecorder(tempfile.gettempdir())
- elements_list = ['2', '1']
+ class FinalizableSplittableDoFn(beam.DoFn):
+ was_finalized = False
+
+ def set_finalized(self):
+ self.was_finalized = True
- class FinalizableDoFn(beam.DoFn):
def process(
- self, element, bundle_finalizer=beam.DoFn.BundleFinalizerParam):
- bundle_finalizer.register(lambda: event_recorder.record(element))
- yield element
+ self,
+ element,
+ bundle_finalizer=beam.DoFn.BundleFinalizerParam,
+ restriction_tracker=beam.DoFn.RestrictionParam(
+ OffsetRangeProvider(use_bounded_offset_range=True))):
+ # We use an SDF with a self-initiated checkpoint to ensure that the
+ # bundle finalization callback actually gets invoked.
+ if self.was_finalized:
+ restriction_tracker.try_claim(
+ restriction_tracker.current_restriction().start)
+ yield element
+ restriction_tracker.try_claim(element)
+ return
+ if restriction_tracker.try_claim(
+ restriction_tracker.current_restriction().start):
+ bundle_finalizer.register(lambda: self.set_finalized())
+ # We sleep here instead of setting a resume time since the resume time
+ # doesn't need to be honored.
+ time.sleep(1)
+ restriction_tracker.defer_remainder()
with self.create_pipeline() as p:
- res = (p | beam.Create(elements_list) | beam.ParDo(FinalizableDoFn()))
-
- assert_that(res, equal_to(elements_list))
-
- results = event_recorder.events()
- event_recorder.cleanup()
- self.assertEqual(results, sorted(elements_list))
+ max_retries = 100
+ res = (
+ p
+ | beam.Create([max_retries])
+ | beam.ParDo(FinalizableSplittableDoFn()))
+ assert_that(res, equal_to([max_retries]))
def test_sdf_synthetic_source(self):
common_attrs = {
@@ -1331,6 +1348,9 @@ class FnApiRunnerTestWithMultiWorkers(FnApiRunnerTest):
def test_sdf_with_watermark_tracking(self):
raise unittest.SkipTest("This test is for a single worker only.")
+ def test_register_finalizations(self):
+ raise unittest.SkipTest("This test is for a single worker only.")
+
class FnApiRunnerTestWithGrpcAndMultiWorkers(FnApiRunnerTest):
def create_pipeline(self, is_drain=False):
@@ -1355,6 +1375,9 @@ class FnApiRunnerTestWithGrpcAndMultiWorkers(FnApiRunnerTest):
def test_sdf_with_watermark_tracking(self):
raise unittest.SkipTest("This test is for a single worker only.")
+ def test_register_finalizations(self):
+ raise unittest.SkipTest("This test is for a single worker only.")
+
class FnApiRunnerTestWithBundleRepeat(FnApiRunnerTest):
def create_pipeline(self, is_drain=False):
@@ -1684,36 +1707,6 @@ def _unpickle_element_counter(name):
return _pickled_element_counters[name]
-class EventRecorder(object):
- """Used to be registered as a callback in bundle finalization.
-
- The reason why records are written into a tmp file is, the in-memory dataset
- cannot keep callback records when passing into one DoFn.
- """
- def __init__(self, tmp_dir):
- self.tmp_dir = os.path.join(tmp_dir, uuid.uuid4().hex)
- os.mkdir(self.tmp_dir)
-
- def record(self, content):
- file_path = os.path.join(self.tmp_dir, uuid.uuid4().hex + '.txt')
- with open(file_path, 'w') as f:
- f.write(content)
-
- def events(self):
- content = []
- record_files = [
- f for f in os.listdir(self.tmp_dir)
- if os.path.isfile(os.path.join(self.tmp_dir, f))
- ]
- for file in record_files:
- with open(os.path.join(self.tmp_dir, file), 'r') as f:
- content.append(f.read())
- return sorted(content)
-
- def cleanup(self):
- shutil.rmtree(self.tmp_dir)
-
-
class ExpandStringsProvider(beam.transforms.core.RestrictionProvider):
"""A RestrictionProvider that used for sdf related tests."""
def initial_restriction(self, element):