You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2022/01/07 06:05:40 UTC
[flink] branch release-1.14 updated: [FLINK-24880][python] Fix PeriodicThread to handle properly for negative wait timeout value
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new c50151b [FLINK-24880][python] Fix PeriodicThread to handle properly for negative wait timeout value
c50151b is described below
commit c50151b5523705a58e4c38ee71f351b6f6f06699
Author: Dian Fu <di...@apache.org>
AuthorDate: Fri Jan 7 10:56:26 2022 +0800
[FLINK-24880][python] Fix PeriodicThread to handle properly for negative wait timeout value
This closes #18292.
---
.../pyflink/fn_execution/beam/beam_stream_fast.pyx | 2 +-
.../pyflink/fn_execution/beam/beam_stream_slow.py | 2 +-
.../pyflink/fn_execution/utils/operation_utils.py | 35 ++++++++++++++++++++++
3 files changed, 37 insertions(+), 2 deletions(-)
diff --git a/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx b/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx
index 583a0c2..d7ed666 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx
+++ b/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx
@@ -23,7 +23,7 @@
from libc.stdlib cimport realloc
from libc.string cimport memcpy
-from apache_beam.runners.worker.data_plane import PeriodicThread
+from pyflink.fn_execution.utils.operation_utils import PeriodicThread
cdef class BeamInputStream(LengthPrefixInputStream):
def __cinit__(self, input_stream, size):
diff --git a/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py b/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py
index 376a861..2210865 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py
@@ -16,9 +16,9 @@
# limitations under the License.
################################################################################
from apache_beam.coders.coder_impl import create_InputStream, create_OutputStream
-from apache_beam.runners.worker.data_plane import PeriodicThread
from pyflink.fn_execution.stream_slow import InputStream
+from pyflink.fn_execution.utils.operation_utils import PeriodicThread
class BeamInputStream(InputStream):
diff --git a/flink-python/pyflink/fn_execution/utils/operation_utils.py b/flink-python/pyflink/fn_execution/utils/operation_utils.py
index f779cfd..9ed177d 100644
--- a/flink-python/pyflink/fn_execution/utils/operation_utils.py
+++ b/flink-python/pyflink/fn_execution/utils/operation_utils.py
@@ -16,6 +16,8 @@
# limitations under the License.
################################################################################
import datetime
+import threading
+import time
from collections.abc import Generator
from functools import partial
@@ -271,3 +273,36 @@ def load_aggregate_function(payload):
return cls()
else:
return pickle.loads(payload)
+
+
+class PeriodicThread(threading.Thread):
+ """Call a function periodically with the specified number of seconds"""
+
+ def __init__(self,
+ interval,
+ function,
+ args=None,
+ kwargs=None
+ ) -> None:
+ threading.Thread.__init__(self)
+ self._interval = interval
+ self._function = function
+ self._args = args if args is not None else []
+ self._kwargs = kwargs if kwargs is not None else {}
+ self._finished = threading.Event()
+
+ def run(self) -> None:
+ now = time.time()
+ next_call = now + self._interval
+ while (next_call <= now and not self._finished.is_set()) or \
+ (not self._finished.wait(next_call - now)):
+ if next_call <= now:
+ next_call = now + self._interval
+ else:
+ next_call = next_call + self._interval
+ self._function(*self._args, **self._kwargs)
+ now = time.time()
+
+ def cancel(self) -> None:
+ """Stop the thread if it hasn't finished yet."""
+ self._finished.set()