You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/06/19 21:59:03 UTC

[beam] branch master updated: Enable test_pardo_timers_clear for fn_runner

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a8ec8aa  Enable test_pardo_timers_clear for fn_runner
     new 36183a8  Merge pull request #11894 from boyuanzz/BEAM-7074
a8ec8aa is described below

commit a8ec8aac7e67ba0bc75baa40e7aa4520808d2f9d
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Tue Jun 2 10:32:57 2020 -0700

    Enable test_pardo_timers_clear for fn_runner
---
 .../apache_beam/runners/portability/fn_api_runner/fn_runner.py       | 4 +++-
 .../apache_beam/runners/portability/fn_api_runner/fn_runner_test.py  | 5 -----
 2 files changed, 3 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index c2147ce..d3c3143 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -383,7 +383,9 @@ class FnApiRunner(runner.PipelineRunner):
                                      decoded_timer.windows[0]] = decoded_timer
         out = create_OutputStream()
         for decoded_timer in timers_by_key_and_window.values():
-          timer_coder_impl.encode_to_stream(decoded_timer, out, True)
+          # Only add not cleared timer to fired timers.
+          if not decoded_timer.clear_bit:
+            timer_coder_impl.encode_to_stream(decoded_timer, out, True)
         fired_timers[(transform_id, timer_family_id)] = ListBuffer(
             coder_impl=timer_coder_impl)
         fired_timers[(transform_id, timer_family_id)].append(out.get())
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index a49ce03..8548f90 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -377,11 +377,6 @@ class FnApiRunnerTest(unittest.TestCase):
       assert_that(actual, equal_to(expected))
 
   def test_pardo_timers_clear(self):
-    if type(self).__name__ != 'FlinkRunnerTest':
-      # FnApiRunner fails to wire multiple timer collections
-      # this method can replace test_pardo_timers when the issue is fixed
-      self.skipTest('BEAM-7074: Multiple timer definitions not supported.')
-
     timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
     clear_timer_spec = userstate.TimerSpec(
         'clear_timer', userstate.TimeDomain.WATERMARK)