You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2022/01/07 06:05:40 UTC

[flink] branch release-1.14 updated: [FLINK-24880][python] Fix PeriodicThread to handle properly for negative wait timeout value

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new c50151b  [FLINK-24880][python] Fix PeriodicThread to handle properly for negative wait timeout value
c50151b is described below

commit c50151b5523705a58e4c38ee71f351b6f6f06699
Author: Dian Fu <di...@apache.org>
AuthorDate: Fri Jan 7 10:56:26 2022 +0800

    [FLINK-24880][python] Fix PeriodicThread to handle properly for negative wait timeout value
    
    This closes #18292.
---
 .../pyflink/fn_execution/beam/beam_stream_fast.pyx |  2 +-
 .../pyflink/fn_execution/beam/beam_stream_slow.py  |  2 +-
 .../pyflink/fn_execution/utils/operation_utils.py  | 35 ++++++++++++++++++++++
 3 files changed, 37 insertions(+), 2 deletions(-)

diff --git a/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx b/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx
index 583a0c2..d7ed666 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx
+++ b/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx
@@ -23,7 +23,7 @@
 from libc.stdlib cimport realloc
 from libc.string cimport memcpy
 
-from apache_beam.runners.worker.data_plane import PeriodicThread
+from pyflink.fn_execution.utils.operation_utils import PeriodicThread
 
 cdef class BeamInputStream(LengthPrefixInputStream):
     def __cinit__(self, input_stream, size):
diff --git a/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py b/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py
index 376a861..2210865 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py
@@ -16,9 +16,9 @@
 # limitations under the License.
 ################################################################################
 from apache_beam.coders.coder_impl import create_InputStream, create_OutputStream
-from apache_beam.runners.worker.data_plane import PeriodicThread
 
 from pyflink.fn_execution.stream_slow import InputStream
+from pyflink.fn_execution.utils.operation_utils import PeriodicThread
 
 
 class BeamInputStream(InputStream):
diff --git a/flink-python/pyflink/fn_execution/utils/operation_utils.py b/flink-python/pyflink/fn_execution/utils/operation_utils.py
index f779cfd..9ed177d 100644
--- a/flink-python/pyflink/fn_execution/utils/operation_utils.py
+++ b/flink-python/pyflink/fn_execution/utils/operation_utils.py
@@ -16,6 +16,8 @@
 # limitations under the License.
 ################################################################################
 import datetime
+import threading
+import time
 from collections.abc import Generator
 
 from functools import partial
@@ -271,3 +273,36 @@ def load_aggregate_function(payload):
         return cls()
     else:
         return pickle.loads(payload)
+
+
+class PeriodicThread(threading.Thread):
+    """Call a function periodically with the specified number of seconds"""
+
+    def __init__(self,
+                 interval,
+                 function,
+                 args=None,
+                 kwargs=None
+                 ) -> None:
+        threading.Thread.__init__(self)
+        self._interval = interval
+        self._function = function
+        self._args = args if args is not None else []
+        self._kwargs = kwargs if kwargs is not None else {}
+        self._finished = threading.Event()
+
+    def run(self) -> None:
+        now = time.time()
+        next_call = now + self._interval
+        while (next_call <= now and not self._finished.is_set()) or \
+                (not self._finished.wait(next_call - now)):
+            if next_call <= now:
+                next_call = now + self._interval
+            else:
+                next_call = next_call + self._interval
+            self._function(*self._args, **self._kwargs)
+            now = time.time()
+
+    def cancel(self) -> None:
+        """Stop the thread if it hasn't finished yet."""
+        self._finished.set()