Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/12/10 12:08:27 UTC

[GitHub] [flink] sunhaibotb commented on a change in pull request #10151: [FLINK-14231] Handle the processing-time timers when closing operators to make endInput semantics on the operator chain strict

sunhaibotb commented on a change in pull request #10151: [FLINK-14231] Handle the processing-time timers when closing operators to make endInput semantics on the operator chain strict
URL: https://github.com/apache/flink/pull/10151#discussion_r356000446
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java
 ##########
 @@ -18,18 +18,57 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.NeverCompleteFuture;
+
+import javax.annotation.Nonnull;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@Internal
 class ProcessingTimeServiceImpl implements ProcessingTimeService {
+
+	private static final int STATUS_ALIVE = 0;
+	private static final int STATUS_QUIESCED = 1;
+
+	// ------------------------------------------------------------------------
+
 	private final TimerService timerService;
+
 	private final Function<ProcessingTimeCallback, ProcessingTimeCallback> processingTimeCallbackWrapper;
 
+	private final ConcurrentHashMap<TimerScheduledFuture<?>, Object> undoneTimers;
 
 Review comment:
   After the operator is closed, we need to actively cancel the timers that are not currently executing (see `ProcessingTimeServiceImpl#cancelTimersGracefullyAfterQuiesce`), so we have to keep track of these undone timers. Otherwise we would have to wait until each pending timer fires, and we have no upper bound on how long that takes. For example, the timer registered in `StreamingFileSink` fires every 60s.
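
   To illustrate the idea, here is a minimal sketch of tracking undone timers and cancelling them on close. It is not the code in this PR: `UndoneTimerTracker`, `registerTimer`, `cancelUndoneTimers` and `close` are hypothetical names, and it uses a plain `ScheduledExecutorService` in place of Flink's `TimerService`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch, not the Flink implementation. */
class UndoneTimerTracker {

	// Stand-in for Flink's TimerService (an assumption made for this sketch).
	private final ScheduledExecutorService executor =
		Executors.newSingleThreadScheduledExecutor();

	// Timers that were registered and are not known to be finished yet.
	private final Set<ScheduledFuture<?>> undoneTimers = ConcurrentHashMap.newKeySet();

	ScheduledFuture<?> registerTimer(long delayMillis, Runnable callback) {
		// Drop timers that already completed so the set does not grow without bound.
		undoneTimers.removeIf(Future::isDone);
		ScheduledFuture<?> timer =
			executor.schedule(callback, delayMillis, TimeUnit.MILLISECONDS);
		undoneTimers.add(timer);
		return timer;
	}

	/** Cancels the tracked timers and returns how many were marked as cancelled. */
	int cancelUndoneTimers() {
		int cancelled = 0;
		for (ScheduledFuture<?> timer : undoneTimers) {
			// cancel(false) keeps a pending timer from ever starting, but it does
			// not interrupt a callback that is already running.
			if (timer.cancel(false)) {
				cancelled++;
			}
		}
		undoneTimers.clear();
		return cancelled;
	}

	void close() {
		// Stop the worker thread so the JVM can exit.
		executor.shutdownNow();
	}
}
```

   With this in place, a timer registered with a 60s delay can be cancelled right away when the operator closes, instead of waiting for it to fire. Without the set there would be no handle left to cancel.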
