You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/27 20:01:04 UTC

[flink] 08/16: [FLINK-17904][runtime] Add scheduleWithFixedDelay to ProcessingTimeService

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 20713de1d06f2e03e8f15a4c209e7e8ffb9fbaa3
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 24 16:25:47 2020 +0200

    [FLINK-17904][runtime] Add scheduleWithFixedDelay to ProcessingTimeService
---
 .../api/runtime/NeverFireProcessingTimeService.java  |  6 ++++++
 .../runtime/tasks/ProcessingTimeService.java         | 17 +++++++++++++++++
 .../runtime/tasks/ProcessingTimeServiceImpl.java     | 10 ++++++++++
 .../runtime/tasks/SystemProcessingTimeService.java   | 20 ++++++++++++++------
 .../runtime/tasks/TestProcessingTimeService.java     |  6 ++++++
 5 files changed, 53 insertions(+), 6 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java
index 4e16fcc..5b32bd0 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java
@@ -52,6 +52,12 @@ public final class NeverFireProcessingTimeService implements TimerService {
 	}
 
 	@Override
+	public ScheduledFuture<?> scheduleWithFixedDelay(
+		ProcessingTimeCallback callback, long initialDelay, long period) {
+		return FUTURE;
+	}
+
+	@Override
 	public boolean isTerminated() {
 		return shutdown.get();
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index 3f37ff5..9ce97f6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Defines the current processing time and handles all related actions,
@@ -48,6 +49,9 @@ public interface ProcessingTimeService {
 	/**
 	 * Registers a task to be executed repeatedly at a fixed rate.
 	 *
+	 * <p>This call behaves similar to
+	 * {@link org.apache.flink.runtime.concurrent.ScheduledExecutor#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
+	 *
 	 * @param callback to be executed after the initial delay and then after each period
 	 * @param initialDelay initial delay to start executing callback
 	 * @param period after the initial delay after which the callback is executed
@@ -56,6 +60,19 @@ public interface ProcessingTimeService {
 	ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period);
 
 	/**
+	 * Registers a task to be executed repeatedly with a fixed delay.
+	 *
+	 * <p>This call behaves similar to
+	 * {@link org.apache.flink.runtime.concurrent.ScheduledExecutor#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+	 *
+	 * @param callback to be executed after the initial delay and then after each period
+	 * @param initialDelay initial delay to start executing callback
+	 * @param period after the initial delay after which the callback is executed
+	 * @return Scheduled future representing the task to be executed repeatedly
+	 */
+	ScheduledFuture<?> scheduleWithFixedDelay(ProcessingTimeCallback callback, long initialDelay, long period);
+
+	/**
 	 * This method puts the service into a state where it does not register new timers, but
 	 * returns for each call to {@link #registerTimer} or {@link #scheduleAtFixedRate} a "mock"
 	 * future and the "mock" future will be never completed. Furthermore, the timers registered
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java
index 61ae9f2..456d0a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java
@@ -77,6 +77,16 @@ class ProcessingTimeServiceImpl implements ProcessingTimeService {
 	}
 
 	@Override
+	public ScheduledFuture<?> scheduleWithFixedDelay(ProcessingTimeCallback callback, long initialDelay, long period) {
+		if (isQuiesced()) {
+			return new NeverCompleteFuture(initialDelay);
+		}
+
+		return timerService.scheduleWithFixedDelay(
+			addQuiesceProcessingToCallback(processingTimeCallbackWrapper.apply(callback)), initialDelay, period);
+	}
+
+	@Override
 	public CompletableFuture<Void> quiesce() {
 		if (!quiesced) {
 			quiesced = true;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index c0ecd85..fc53870 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -126,16 +126,24 @@ public class SystemProcessingTimeService implements TimerService {
 
 	@Override
 	public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
-		long nextTimestamp = getCurrentProcessingTime() + initialDelay;
+		return scheduleRepeatedly(callback, initialDelay, period, false);
+	}
+
+	@Override
+	public ScheduledFuture<?> scheduleWithFixedDelay(ProcessingTimeCallback callback, long initialDelay, long period) {
+		return scheduleRepeatedly(callback, initialDelay, period, true);
+	}
+
+	private ScheduledFuture<?> scheduleRepeatedly(ProcessingTimeCallback callback, long initialDelay, long period, boolean fixedDelay) {
+		final long nextTimestamp = getCurrentProcessingTime() + initialDelay;
+		final Runnable task = wrapOnTimerCallback(callback, nextTimestamp, period);
 
 		// we directly try to register the timer and only react to the status on exception
 		// that way we save unnecessary volatile accesses for each timer
 		try {
-			return timerService.scheduleAtFixedRate(
-				wrapOnTimerCallback(callback, nextTimestamp, period),
-				initialDelay,
-				period,
-				TimeUnit.MILLISECONDS);
+			return fixedDelay
+					? timerService.scheduleWithFixedDelay(task, initialDelay, period, TimeUnit.MILLISECONDS)
+					: timerService.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS);
 		} catch (RejectedExecutionException e) {
 			final int status = this.status.get();
 			if (status == STATUS_QUIESCED) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 67a0ef7..7c5742c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -113,6 +113,12 @@ public class TestProcessingTimeService implements TimerService {
 	}
 
 	@Override
+	public ScheduledFuture<?> scheduleWithFixedDelay(ProcessingTimeCallback callback, long initialDelay, long period) {
+		// for all testing purposed, there is no difference between the fixed rate and fixed delay
+		return scheduleAtFixedRate(callback, initialDelay, period);
+	}
+
+	@Override
 	public boolean isTerminated() {
 		return isTerminated;
 	}