You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/05/06 06:04:39 UTC

[flink] branch release-1.13 updated: [FLINK-22573][datastream] Fix AsyncIO calls timeout on completed element.

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new b146cb8  [FLINK-22573][datastream] Fix AsyncIO calls timeout on completed element.
b146cb8 is described below

commit b146cb8928026b2a4c7aae9fce492951fd6d18a7
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Wed May 5 13:29:28 2021 +0200

    [FLINK-22573][datastream] Fix AsyncIO calls timeout on completed element.
    
    While the mailbox is blocked, timers are not cancelled, so a completed element might still get a timeout.
    The fix is to check the completed flag when the timer triggers.
---
 .../api/operators/async/AsyncWaitOperator.java     | 32 +++++-----
 .../api/operators/async/AsyncWaitOperatorTest.java | 69 ++++++++++++++++++++++
 .../tasks/StreamTaskMailboxTestHarness.java        |  4 ++
 3 files changed, 89 insertions(+), 16 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index e18a225..a65b71b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -200,18 +200,7 @@ public class AsyncWaitOperator<IN, OUT>
 
         // register a timeout for the entry if timeout is configured
         if (timeout > 0L) {
-            final long timeoutTimestamp =
-                    timeout + getProcessingTimeService().getCurrentProcessingTime();
-
-            final ScheduledFuture<?> timeoutTimer =
-                    getProcessingTimeService()
-                            .registerTimer(
-                                    timeoutTimestamp,
-                                    timestamp ->
-                                            userFunction.timeout(
-                                                    element.getValue(), resultHandler));
-
-            resultHandler.setTimeoutTimer(timeoutTimer);
+            resultHandler.registerTimeout(getProcessingTimeService(), timeout);
         }
 
         userFunction.asyncInvoke(element.getValue(), resultHandler);
@@ -341,10 +330,6 @@ public class AsyncWaitOperator<IN, OUT>
             this.resultFuture = resultFuture;
         }
 
-        void setTimeoutTimer(ScheduledFuture<?> timeoutTimer) {
-            this.timeoutTimer = timeoutTimer;
-        }
-
         @Override
         public void complete(Collection<OUT> results) {
             Preconditions.checkNotNull(
@@ -405,5 +390,20 @@ public class AsyncWaitOperator<IN, OUT>
             // blocking section in #addToWorkQueue or #waitInFlightInputsFinished)
             processInMailbox(Collections.emptyList());
         }
+
+        public void registerTimeout(ProcessingTimeService processingTimeService, long timeout) {
+            final long timeoutTimestamp =
+                    timeout + processingTimeService.getCurrentProcessingTime();
+
+            timeoutTimer =
+                    processingTimeService.registerTimer(
+                            timeoutTimestamp, timestamp -> timerTriggered());
+        }
+
+        private void timerTriggered() throws Exception {
+            if (!completed.get()) {
+                userFunction.timeout(inputRecord.getValue(), this);
+            }
+        }
     }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 986989f..ee04bc01 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -56,6 +56,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.ExceptionUtils;
@@ -73,6 +75,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
@@ -82,11 +85,15 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -241,6 +248,29 @@ public class AsyncWaitOperatorTest extends TestLogger {
         }
     }
 
+    /** Completes input once COMPLETION_TRIGGER is released and records any timeout call. */
+    private static class TimeoutAfterCompletionTestFunction
+            implements AsyncFunction<Integer, Integer> {
+        static final AtomicBoolean TIMED_OUT = new AtomicBoolean(false);
+        static final CountDownLatch COMPLETION_TRIGGER = new CountDownLatch(1);
+
+        @Override
+        public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) {
+            ForkJoinPool.commonPool()
+                    .submit(
+                            () -> {
+                                COMPLETION_TRIGGER.await();
+                                resultFuture.complete(Collections.singletonList(input));
+                                return null;
+                            });
+        }
+
+        @Override
+        public void timeout(Integer input, ResultFuture<Integer> resultFuture) {
+            TIMED_OUT.set(true);
+        }
+    }
+
     /** A {@link Comparator} to compare {@link StreamRecord} while sorting them. */
     private class StreamRecordComparator implements Comparator<Object> {
         @Override
@@ -784,6 +814,45 @@ public class AsyncWaitOperatorTest extends TestLogger {
     }
 
     /**
+     * Checks that timeout is not called after the element has been completed within the timeout.
+     *
+     * @see <a href="https://issues.apache.org/jira/browse/FLINK-22573">FLINK-22573</a>
+     */
+    @Test
+    public void testTimeoutAfterComplete() throws Exception {
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO);
+        try (StreamTaskMailboxTestHarness<Integer> harness =
+                builder.setupOutputForSingletonOperatorChain(
+                                new AsyncWaitOperatorFactory<>(
+                                        new TimeoutAfterCompletionTestFunction(),
+                                        TIMEOUT,
+                                        1,
+                                        AsyncDataStream.OutputMode.UNORDERED))
+                        .build()) {
+            harness.processElement(new StreamRecord<>(1));
+            // add a timer after AsyncIO added its timer to verify that AsyncIO timer is processed
+            ScheduledFuture<?> testTimer =
+                    harness.getTimerService()
+                            .registerTimer(
+                                    harness.getTimerService().getCurrentProcessingTime() + TIMEOUT,
+                                    ts -> {});
+            // trigger the regular completion in AsyncIO
+            TimeoutAfterCompletionTestFunction.COMPLETION_TRIGGER.countDown();
+            // wait until all timers have been processed
+            testTimer.get();
+            // handle normal completion call outputting the element in mailbox thread
+            harness.processAll();
+            assertEquals(
+                    Collections.singleton(new StreamRecord<>(1)),
+                    new HashSet<>(harness.getOutput()));
+            assertFalse("no timeout expected", TimeoutAfterCompletionTestFunction.TIMED_OUT.get());
+        }
+    }
+
+    /**
      * FLINK-6435
      *
      * <p>Tests that a user exception triggers the completion of a StreamElementQueueEntry and does
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
index dcdc2bb..1f9632d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
@@ -68,6 +68,10 @@ public class StreamTaskMailboxTestHarness<OUT> implements AutoCloseable {
         return streamTask;
     }
 
+    public TimerService getTimerService() {
+        return streamTask.getTimerService();
+    }
+
     /**
      * Get all the output from the task. This contains StreamRecords and Events interleaved. Use
      * {@link