You are viewing a plain-text version of this content. (The link to the canonical version was lost when this page was converted to plain text.)
Posted to commits@beam.apache.org by bo...@apache.org on 2020/12/03 23:13:43 UTC
[beam] branch master updated: Use EventRecorder instead of relying on class var.
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e4f9054 Use EventRecorder instead of relying on class var.
new 46ac3bc Merge pull request #13478 from [BEAM-11070]Use EventRecorder instead of relying on class var.
e4f9054 is described below
commit e4f90544a881df2116bc32f7c42987e4f3e01c9d
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Thu Dec 3 12:01:51 2020 -0800
Use EventRecorder instead of relying on class var.
---
.../portability/fn_api_runner/fn_runner_test.py | 59 ++++++++++++++++++----
1 file changed, 50 insertions(+), 9 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 4f7fc27..05bd0d2 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -23,6 +23,7 @@ import collections
import logging
import os
import random
+import shutil
import sys
import tempfile
import threading
@@ -881,21 +882,19 @@ class FnApiRunnerTest(unittest.TestCase):
assert_that(res, equal_to(['1', '2']))
def test_register_finalizations(self):
- class FinalizableSplittableDoFn(beam.DoFn):
- was_finalized = False
-
- def set_finalized(self):
- self.was_finalized = True
+ event_recorder = EventRecorder(tempfile.gettempdir())
+ class FinalizableSplittableDoFn(beam.DoFn):
def process(
self,
element,
bundle_finalizer=beam.DoFn.BundleFinalizerParam,
restriction_tracker=beam.DoFn.RestrictionParam(
- OffsetRangeProvider(use_bounded_offset_range=True))):
+ OffsetRangeProvider(
+ use_bounded_offset_range=True, checkpoint_only=True))):
# We use SDF to enforce finalization call happens by using
# self-initiated checkpoint.
- if self.was_finalized:
+ if 'finalized' in event_recorder.events():
restriction_tracker.try_claim(
restriction_tracker.current_restriction().start)
yield element
@@ -903,7 +902,7 @@ class FnApiRunnerTest(unittest.TestCase):
return
if restriction_tracker.try_claim(
restriction_tracker.current_restriction().start):
- bundle_finalizer.register(lambda: self.set_finalized())
+ bundle_finalizer.register(lambda: event_recorder.record('finalized'))
# We sleep here instead of setting a resume time since the resume time
# doesn't need to be honored.
time.sleep(1)
@@ -917,6 +916,8 @@ class FnApiRunnerTest(unittest.TestCase):
| beam.ParDo(FinalizableSplittableDoFn()))
assert_that(res, equal_to([max_retries]))
+ event_recorder.cleanup()
+
def test_sdf_synthetic_source(self):
common_attrs = {
'key_size': 1,
@@ -1763,6 +1764,36 @@ def _unpickle_element_counter(name):
return _pickled_element_counters[name]
+class EventRecorder(object):
+ """Used to be registered as a callback in bundle finalization.
+
+ The reason why records are written into a tmp file is, the in-memory dataset
+ cannot keep callback records when passing into one DoFn.
+ """
+ def __init__(self, tmp_dir):
+ self.tmp_dir = os.path.join(tmp_dir, uuid.uuid4().hex)
+ os.mkdir(self.tmp_dir)
+
+ def record(self, content):
+ file_path = os.path.join(self.tmp_dir, uuid.uuid4().hex + '.txt')
+ with open(file_path, 'w') as f:
+ f.write(content)
+
+ def events(self):
+ content = []
+ record_files = [
+ f for f in os.listdir(self.tmp_dir)
+ if os.path.isfile(os.path.join(self.tmp_dir, f))
+ ]
+ for file in record_files:
+ with open(os.path.join(self.tmp_dir, file), 'r') as f:
+ content.append(f.read())
+ return sorted(content)
+
+ def cleanup(self):
+ shutil.rmtree(self.tmp_dir)
+
+
class ExpandStringsProvider(beam.transforms.core.RestrictionProvider):
"""A RestrictionProvider that used for sdf related tests."""
def initial_restriction(self, element):
@@ -1786,13 +1817,23 @@ class UnboundedOffsetRestrictionTracker(
class OffsetRangeProvider(beam.transforms.core.RestrictionProvider):
- def __init__(self, use_bounded_offset_range):
+ def __init__(self, use_bounded_offset_range, checkpoint_only=False):
self.use_bounded_offset_range = use_bounded_offset_range
+ self.checkpoint_only = checkpoint_only
def initial_restriction(self, element):
return restriction_trackers.OffsetRange(0, element)
def create_tracker(self, restriction):
+ if self.checkpoint_only:
+
+ class CheckpointOnlyOffsetRestrictionTracker(
+ restriction_trackers.OffsetRestrictionTracker):
+ def try_split(self, unused_fraction_of_remainder):
+ return super(CheckpointOnlyOffsetRestrictionTracker,
+ self).try_split(0.0)
+
+ return CheckpointOnlyOffsetRestrictionTracker(restriction)
if self.use_bounded_offset_range:
return restriction_trackers.OffsetRestrictionTracker(restriction)
return UnboundedOffsetRestrictionTracker(restriction)