You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/06/19 21:59:03 UTC
[beam] branch master updated: Enable test_pardo_timers_clear for fn_runner
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a8ec8aa Enable test_pardo_timers_clear for fn_runner
new 36183a8 Merge pull request #11894 from boyuanzz/BEAM-7074
a8ec8aa is described below
commit a8ec8aac7e67ba0bc75baa40e7aa4520808d2f9d
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Tue Jun 2 10:32:57 2020 -0700
Enable test_pardo_timers_clear for fn_runner
---
.../apache_beam/runners/portability/fn_api_runner/fn_runner.py | 4 +++-
.../apache_beam/runners/portability/fn_api_runner/fn_runner_test.py | 5 -----
2 files changed, 3 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index c2147ce..d3c3143 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -383,7 +383,9 @@ class FnApiRunner(runner.PipelineRunner):
decoded_timer.windows[0]] = decoded_timer
out = create_OutputStream()
for decoded_timer in timers_by_key_and_window.values():
- timer_coder_impl.encode_to_stream(decoded_timer, out, True)
+ # Only add not cleared timer to fired timers.
+ if not decoded_timer.clear_bit:
+ timer_coder_impl.encode_to_stream(decoded_timer, out, True)
fired_timers[(transform_id, timer_family_id)] = ListBuffer(
coder_impl=timer_coder_impl)
fired_timers[(transform_id, timer_family_id)].append(out.get())
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index a49ce03..8548f90 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -377,11 +377,6 @@ class FnApiRunnerTest(unittest.TestCase):
assert_that(actual, equal_to(expected))
def test_pardo_timers_clear(self):
- if type(self).__name__ != 'FlinkRunnerTest':
- # FnApiRunner fails to wire multiple timer collections
- # this method can replace test_pardo_timers when the issue is fixed
- self.skipTest('BEAM-7074: Multiple timer definitions not supported.')
-
timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
clear_timer_spec = userstate.TimerSpec(
'clear_timer', userstate.TimeDomain.WATERMARK)