You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/05/06 06:04:39 UTC
[flink] branch release-1.13 updated: [FLINK-22573][datastream] Fix
AsyncIO calls timeout on completed element.
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new b146cb8 [FLINK-22573][datastream] Fix AsyncIO calls timeout on completed element.
b146cb8 is described below
commit b146cb8928026b2a4c7aae9fce492951fd6d18a7
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Wed May 5 13:29:28 2021 +0200
[FLINK-22573][datastream] Fix AsyncIO calls timeout on completed element.
As long as the mailbox is blocked, timers are not cancelled, such that a completed element might still get a timeout.
The fix is to check the completed flag when the timer triggers.
---
.../api/operators/async/AsyncWaitOperator.java | 32 +++++-----
.../api/operators/async/AsyncWaitOperatorTest.java | 69 ++++++++++++++++++++++
.../tasks/StreamTaskMailboxTestHarness.java | 4 ++
3 files changed, 89 insertions(+), 16 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index e18a225..a65b71b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -200,18 +200,7 @@ public class AsyncWaitOperator<IN, OUT>
// register a timeout for the entry if timeout is configured
if (timeout > 0L) {
- final long timeoutTimestamp =
- timeout + getProcessingTimeService().getCurrentProcessingTime();
-
- final ScheduledFuture<?> timeoutTimer =
- getProcessingTimeService()
- .registerTimer(
- timeoutTimestamp,
- timestamp ->
- userFunction.timeout(
- element.getValue(), resultHandler));
-
- resultHandler.setTimeoutTimer(timeoutTimer);
+ resultHandler.registerTimeout(getProcessingTimeService(), timeout);
}
userFunction.asyncInvoke(element.getValue(), resultHandler);
@@ -341,10 +330,6 @@ public class AsyncWaitOperator<IN, OUT>
this.resultFuture = resultFuture;
}
- void setTimeoutTimer(ScheduledFuture<?> timeoutTimer) {
- this.timeoutTimer = timeoutTimer;
- }
-
@Override
public void complete(Collection<OUT> results) {
Preconditions.checkNotNull(
@@ -405,5 +390,20 @@ public class AsyncWaitOperator<IN, OUT>
// blocking section in #addToWorkQueue or #waitInFlightInputsFinished)
processInMailbox(Collections.emptyList());
}
+
+ public void registerTimeout(ProcessingTimeService processingTimeService, long timeout) {
+ final long timeoutTimestamp =
+ timeout + processingTimeService.getCurrentProcessingTime();
+
+ timeoutTimer =
+ processingTimeService.registerTimer(
+ timeoutTimestamp, timestamp -> timerTriggered());
+ }
+
+ private void timerTriggered() throws Exception {
+ if (!completed.get()) {
+ userFunction.timeout(inputRecord.getValue(), this);
+ }
+ }
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 986989f..ee04bc01 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -56,6 +56,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.ExceptionUtils;
@@ -73,6 +75,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
@@ -82,11 +85,15 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -241,6 +248,29 @@ public class AsyncWaitOperatorTest extends TestLogger {
}
}
+ /** Completes input at half the TIMEOUT and registers timeouts. */
+ private static class TimeoutAfterCompletionTestFunction
+ implements AsyncFunction<Integer, Integer> {
+ static final AtomicBoolean TIMED_OUT = new AtomicBoolean(false);
+ static final CountDownLatch COMPLETION_TRIGGER = new CountDownLatch(1);
+
+ @Override
+ public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) {
+ ForkJoinPool.commonPool()
+ .submit(
+ () -> {
+ COMPLETION_TRIGGER.await();
+ resultFuture.complete(Collections.singletonList(input));
+ return null;
+ });
+ }
+
+ @Override
+ public void timeout(Integer input, ResultFuture<Integer> resultFuture) {
+ TIMED_OUT.set(true);
+ }
+ }
+
/** A {@link Comparator} to compare {@link StreamRecord} while sorting them. */
private class StreamRecordComparator implements Comparator<Object> {
@Override
@@ -784,6 +814,45 @@ public class AsyncWaitOperatorTest extends TestLogger {
}
/**
+ * Checks if timeout has been called after the element has been completed within the timeout.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/FLINK-22573">FLINK-22573</a>
+ */
+ @Test
+ public void testTimeoutAfterComplete() throws Exception {
+ StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+ new StreamTaskMailboxTestHarnessBuilder<>(
+ OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+ .addInput(BasicTypeInfo.INT_TYPE_INFO);
+ try (StreamTaskMailboxTestHarness<Integer> harness =
+ builder.setupOutputForSingletonOperatorChain(
+ new AsyncWaitOperatorFactory<>(
+ new TimeoutAfterCompletionTestFunction(),
+ TIMEOUT,
+ 1,
+ AsyncDataStream.OutputMode.UNORDERED))
+ .build()) {
+ harness.processElement(new StreamRecord<>(1));
+ // add a timer after AsyncIO added its timer to verify that AsyncIO timer is processed
+ ScheduledFuture<?> testTimer =
+ harness.getTimerService()
+ .registerTimer(
+ harness.getTimerService().getCurrentProcessingTime() + TIMEOUT,
+ ts -> {});
+ // trigger the regular completion in AsyncIO
+ TimeoutAfterCompletionTestFunction.COMPLETION_TRIGGER.countDown();
+ // wait until all timers have been processed
+ testTimer.get();
+ // handle normal completion call outputting the element in mailbox thread
+ harness.processAll();
+ assertEquals(
+ Collections.singleton(new StreamRecord<>(1)),
+ new HashSet<>(harness.getOutput()));
+ assertFalse("no timeout expected", TimeoutAfterCompletionTestFunction.TIMED_OUT.get());
+ }
+ }
+
+ /**
* FLINK-6435
*
* <p>Tests that a user exception triggers the completion of a StreamElementQueueEntry and does
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
index dcdc2bb..1f9632d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
@@ -68,6 +68,10 @@ public class StreamTaskMailboxTestHarness<OUT> implements AutoCloseable {
return streamTask;
}
+ public TimerService getTimerService() {
+ return streamTask.getTimerService();
+ }
+
/**
* Get all the output from the task. This contains StreamRecords and Events interleaved. Use
* {@link