You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/27 20:01:04 UTC
[flink] 08/16: [FLINK-17904][runtime] Add scheduleWithFixedDelay to
ProcessingTimeService
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 20713de1d06f2e03e8f15a4c209e7e8ffb9fbaa3
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 24 16:25:47 2020 +0200
[FLINK-17904][runtime] Add scheduleWithFixedDelay to ProcessingTimeService
---
.../api/runtime/NeverFireProcessingTimeService.java | 6 ++++++
.../runtime/tasks/ProcessingTimeService.java | 17 +++++++++++++++++
.../runtime/tasks/ProcessingTimeServiceImpl.java | 10 ++++++++++
.../runtime/tasks/SystemProcessingTimeService.java | 20 ++++++++++++++------
.../runtime/tasks/TestProcessingTimeService.java | 6 ++++++
5 files changed, 53 insertions(+), 6 deletions(-)
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java
index 4e16fcc..5b32bd0 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java
@@ -52,6 +52,12 @@ public final class NeverFireProcessingTimeService implements TimerService {
}
@Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(
+ ProcessingTimeCallback callback, long initialDelay, long period) {
+ return FUTURE;
+ }
+
+ @Override
public boolean isTerminated() {
return shutdown.get();
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index 3f37ff5..9ce97f6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.tasks;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
/**
* Defines the current processing time and handles all related actions,
@@ -48,6 +49,9 @@ public interface ProcessingTimeService {
/**
* Registers a task to be executed repeatedly at a fixed rate.
*
+ * <p>This call behaves similar to
+ * {@link org.apache.flink.runtime.concurrent.ScheduledExecutor#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
+ *
* @param callback to be executed after the initial delay and then after each period
* @param initialDelay initial delay to start executing callback
* @param period after the initial delay after which the callback is executed
@@ -56,6 +60,19 @@ public interface ProcessingTimeService {
ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period);
/**
+ * Registers a task to be executed repeatedly with a fixed delay.
+ *
+ * <p>This call behaves similar to
+ * {@link org.apache.flink.runtime.concurrent.ScheduledExecutor#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+ *
+ * @param callback to be executed after the initial delay and then after each period
+ * @param initialDelay initial delay to start executing callback
+ * @param period after the initial delay after which the callback is executed
+ * @return Scheduled future representing the task to be executed repeatedly
+ */
+ ScheduledFuture<?> scheduleWithFixedDelay(ProcessingTimeCallback callback, long initialDelay, long period);
+
+ /**
* This method puts the service into a state where it does not register new timers, but
* returns for each call to {@link #registerTimer} or {@link #scheduleAtFixedRate} a "mock"
* future and the "mock" future will be never completed. Furthermore, the timers registered
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java
index 61ae9f2..456d0a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java
@@ -77,6 +77,16 @@ class ProcessingTimeServiceImpl implements ProcessingTimeService {
}
@Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(ProcessingTimeCallback callback, long initialDelay, long period) {
+ if (isQuiesced()) {
+ return new NeverCompleteFuture(initialDelay);
+ }
+
+ return timerService.scheduleWithFixedDelay(
+ addQuiesceProcessingToCallback(processingTimeCallbackWrapper.apply(callback)), initialDelay, period);
+ }
+
+ @Override
public CompletableFuture<Void> quiesce() {
if (!quiesced) {
quiesced = true;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index c0ecd85..fc53870 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -126,16 +126,24 @@ public class SystemProcessingTimeService implements TimerService {
@Override
public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
- long nextTimestamp = getCurrentProcessingTime() + initialDelay;
+ return scheduleRepeatedly(callback, initialDelay, period, false);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(ProcessingTimeCallback callback, long initialDelay, long period) {
+ return scheduleRepeatedly(callback, initialDelay, period, true);
+ }
+
+ private ScheduledFuture<?> scheduleRepeatedly(ProcessingTimeCallback callback, long initialDelay, long period, boolean fixedDelay) {
+ final long nextTimestamp = getCurrentProcessingTime() + initialDelay;
+ final Runnable task = wrapOnTimerCallback(callback, nextTimestamp, period);
// we directly try to register the timer and only react to the status on exception
// that way we save unnecessary volatile accesses for each timer
try {
- return timerService.scheduleAtFixedRate(
- wrapOnTimerCallback(callback, nextTimestamp, period),
- initialDelay,
- period,
- TimeUnit.MILLISECONDS);
+ return fixedDelay
+ ? timerService.scheduleWithFixedDelay(task, initialDelay, period, TimeUnit.MILLISECONDS)
+ : timerService.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
final int status = this.status.get();
if (status == STATUS_QUIESCED) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 67a0ef7..7c5742c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -113,6 +113,12 @@ public class TestProcessingTimeService implements TimerService {
}
@Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(ProcessingTimeCallback callback, long initialDelay, long period) {
+ // for all testing purposed, there is no difference between the fixed rate and fixed delay
+ return scheduleAtFixedRate(callback, initialDelay, period);
+ }
+
+ @Override
public boolean isTerminated() {
return isTerminated;
}