Posted to issues@flink.apache.org by "fredia (via GitHub)" <gi...@apache.org> on 2024/04/15 09:35:12 UTC

[PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

fredia opened a new pull request, #24667:
URL: https://github.com/apache/flink/pull/24667

   
   ## What is the purpose of the change
   
   As part of the async execution model of disaggregated state management, this PR introduces async execution configurations.
   
   
   ## Brief change log
   
   - Add async execution configurations in `ExecutionOptions`
   - Add related getter/setter in `ExecutionConfig`
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   - StreamExecutionEnvironmentTest#testAsyncExecutionConfiguration
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs/ JavaDocs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "yunfengzhou-hub (via GitHub)" <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1569768663


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,19 +103,26 @@ public class AsyncExecutionController<K> {
     final AtomicInteger inFlightRecordNum;
 
     public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+        this(
+                mailboxExecutor,
+                stateExecutor,
+                DEFAULT_BATCH_SIZE,
+                DEFAULT_BUFFER_TIMEOUT,
+                DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);

Review Comment:
   Given that #24657 has been merged, it might be better to verify that the introduced configurations can pass the configured values into AEC through operator setups now.



##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##########
@@ -181,4 +182,73 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    /**
+     * A flag to enable or disable async mode related components when tasks initialize. As long as
+     * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+     * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+     * state APIs, the state access is always executed synchronously, enable this option would bring
+     * some overhead.
+     *
+     * <p>Note: This is an experimental feature(FLIP-425) under evaluation.
+     */
+    @Experimental
+    public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED =
+            ConfigOptions.key("execution.async-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "A flag to enable or disable async mode related components when tasks initialize."
+                                    + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
+                                    + " Otherwise, the state access of Async state APIs will be executed synchronously."
+                                    + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"
+                                    + " Note: This is an experimental feature under evaluation.");
+
+    /**
+     * The max limit of in-flight records number in async execution mode, 'in-flight' refers to the
+     * records that have entered the operator but have not yet been processed and emitted to the
+     * downstream. If the in-flight records number exceeds the limit, the newly records entering
+     * will be blocked until the in-flight records number drops below the limit.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_INFLIGHT_RECORDS_LIMIT =
+            ConfigOptions.key("execution.async-mode.in-flight-records-limit")
+                    .intType()
+                    .defaultValue(6000)
+                    .withDescription(
+                            "The max limit of in-flight records number in async execution mode, 'in-flight' refers"
+                                    + " to the records that have entered the operator but have not yet been processed and"
+                                    + " emitted to the downstream. If the in-flight records number exceeds the limit,"
+                                    + " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+    /**
+     * The size of buffer under async execution mode. Async execution mode provides a buffer
+     * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+     * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+     * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control
+     * the frequency of triggering.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_BUFFER_SIZE =
+            ConfigOptions.key("execution.async-mode.buffer-size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access."
+                                    + " When the number of state requests in the active buffer exceeds the batch size,"
+                                    + " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
+                                    + " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");
+
+    /**
+     * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link
+     * #ASYNC_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform actively.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_BUFFER_TIMEOUT =
+            ConfigOptions.key("execution.async-state.buffer-timeout")

Review Comment:
   Got it. I agree that `async-mode` is better than `async-state` here, but the single word "buffer" might still not be enough: a new developer who is not familiar with FLIP-425 might take this configuration to be a buffer for stream records and then confuse it with `in-flight-records-limit`. A possibly better name that comes to mind is `execution.async-mode.state-buffer-timeout`. WDYT?



##########
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##########
@@ -1085,6 +1085,54 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) {
         configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression);
     }
 
+    // --------------------------------------------------------------------------------------------
+    //  Asynchronous execution configurations
+    // --------------------------------------------------------------------------------------------
+
+    @Internal

Review Comment:
   Seems that this comment is not resolved yet?



##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##########
@@ -181,4 +182,52 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    @Internal
+    @Documentation.ExcludeFromDocumentation(
+            "This is an experimental option, internal use only for now.")
+    public static final ConfigOption<Integer> ASYNC_INFLIGHT_RECORDS_LIMIT =
+            ConfigOptions.key("execution.async-mode.in-flight-records-limit")
+                    .intType()
+                    .defaultValue(6000)
+                    .withDescription(
+                            "The max limit of in-flight records number in async execution mode, 'in-flight' refers"
+                                    + " to the records that have entered the operator but have not yet been processed and"
+                                    + " emitted to the downstream. If the in-flight records number exceeds the limit,"
+                                    + " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+    /**
+     * The size of buffer under async execution mode. Async execution mode provides a buffer
+     * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+     * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+     * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control
+     * the frequency of triggering.
+     */
+    @Internal
+    @Documentation.ExcludeFromDocumentation(
+            "This is an experimental option, internal use only for now.")
+    public static final ConfigOption<Integer> ASYNC_BUFFER_SIZE =
+            ConfigOptions.key("execution.async-mode.buffer-size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access."
+                                    + " When the number of state requests in the active buffer exceeds the batch size,"
+                                    + " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
+                                    + " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");

Review Comment:
   "execution.async-state.buffer-timeout" should be changed to "execution.async-mode.buffer-timeout".





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "yunfengzhou-hub (via GitHub)" <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1566742123


##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##########
@@ -181,4 +182,73 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    /**
+     * A flag to enable or disable async mode related components when tasks initialize. As long as
+     * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+     * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+     * state APIs, the state access is always executed synchronously, enable this option would bring
+     * some overhead.
+     *
+     * <p>Note: This is an experimental feature(FLIP-425) under evaluation.
+     */
+    @Experimental
+    public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED =
+            ConfigOptions.key("execution.async-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "A flag to enable or disable async mode related components when tasks initialize."
+                                    + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
+                                    + " Otherwise, the state access of Async state APIs will be executed synchronously."
+                                    + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"

Review Comment:
   It might be better to remove this configuration for now and add it in the future when such use cases are found.
   
   - Whether to enable async state access can be inferred automatically by the Flink infrastructure, depending on the location of state backends and the state API used in operators.
   - It is better not to expose implementation details, like the sync/async modes mentioned here, to end users. So long as the order of same-key records and the order of async state callbacks are guaranteed, that is enough for users.
       - In particular, the implementation of the sync mode might be altered in the future, in order to improve performance in situations where sync state API + remote state backend is used. In that case the statement "the state access is always executed synchronously" might cause deprecation issues.





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "yunfengzhou-hub (via GitHub)" <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1566744878


##########
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##########
@@ -1085,6 +1085,54 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) {
         configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression);
     }
 
+    // --------------------------------------------------------------------------------------------
+    //  Asynchronous execution configurations
+    // --------------------------------------------------------------------------------------------
+
+    @Internal

Review Comment:
   It might be better to mark them as `@Experimental` instead of `@Internal`.





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "fredia (via GitHub)" <gi...@apache.org>.
fredia commented on PR #24667:
URL: https://github.com/apache/flink/pull/24667#issuecomment-2078637067

   @Zakelly Thanks for the review, I addressed the comments and rebased to master.




Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "yunfengzhou-hub (via GitHub)" <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575597004


##########
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java:
##########
@@ -346,6 +337,63 @@ public void testSyncPoint() {
         recordContext2.release();
     }
 
+    @Test
+    void testBufferTimeout() throws InterruptedException {
+        batchSize = 5;
+        timeout = 1000;
+        setup();
+        Runnable userCode = () -> valueState.asyncValue();
+
+        // ------------ basic timeout -------------------
+        for (int i = 0; i < batchSize - 1; i++) {
+            String record = String.format("key%d-r%d", i, i);
+            String key = String.format("key%d", batchSize + i);
+            RecordContext<String> recordContext = aec.buildContext(record, key);
+            aec.setCurrentContext(recordContext);
+            userCode.run();
+        }
+        assertThat(aec.timeoutFlag.get()).isFalse();
+        assertThat(aec.currentScheduledFuture.isDone()).isFalse();
+        assertThat(aec.inFlightRecordNum.get()).isEqualTo(batchSize - 1);
+        assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(batchSize - 1);
+        assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
+
+        Thread.sleep(timeout + 100);

Review Comment:
   It might be better to avoid using `Thread.sleep` in test cases, as it increases the duration of CI and can make tests flaky. How about introducing a `TestScheduledThreadPoolExecutor` so we can control when each step is triggered? An example of this is `org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor`.
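
A minimal sketch of the idea behind this suggestion, using only the JDK. `ManualScheduler` is a hypothetical name, not a Flink class, and it is far simpler than `ManuallyTriggeredScheduledExecutor`: it only records scheduled tasks and runs them when the test asks, so the test never has to wait for wall-clock time.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical test helper: collects scheduled tasks and runs them only on demand. */
class ManualScheduler {
    private final Queue<Runnable> pending = new ArrayDeque<>();

    /** Records the task instead of starting a timer; the delay is deliberately ignored. */
    void schedule(Runnable task, long delayMillis) {
        pending.add(task);
    }

    /** Runs every task scheduled so far, as if all of their timeouts had elapsed. */
    void triggerAll() {
        Runnable task;
        while ((task = pending.poll()) != null) {
            task.run();
        }
    }

    /** Number of tasks that have been scheduled but not yet triggered. */
    int pendingCount() {
        return pending.size();
    }
}
```

With such a helper, a test could assert the "not yet timed out" state right after filling the buffer, call `triggerAll()`, and then assert the "timed out" state, without any `Thread.sleep`.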





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "fredia (via GitHub)" <gi...@apache.org>.
fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1577695901


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -220,12 +283,17 @@ <IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) {
      * @param force whether to trigger requests in force.
      */
     void triggerIfNeeded(boolean force) {
-        // TODO: introduce a timeout mechanism for triggering.
         if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+            // if the active queue size is 1, it means that the current request is the oldest one in
+            // the active queue.
+            if (stateRequestsBuffer.activeQueueSize() == 1) {
+                scheduleTimeout(currentTriggerSeq.get());
+            }

Review Comment:
   No, because the seq number will increase by 1 per trigger.
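
The reasoning can be illustrated with a toy model. This is a sketch under a strong assumption that is not confirmed by the PR: every trigger drains the active queue completely. `OneTimeoutPerSeq` and its fields are hypothetical names, and blocked requests, real timers, and the mailbox are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: a timeout is scheduled only when a request is the oldest in the queue. */
class OneTimeoutPerSeq {
    long currentTriggerSeq = 0;
    int activeQueueSize = 0;
    /** Sequence numbers for which a timeout was scheduled, in scheduling order. */
    final List<Long> scheduledSeqs = new ArrayList<>();

    /** Enqueues one request; triggers when the batch is full, schedules when it is the first. */
    void enqueue(int batchSize) {
        activeQueueSize++;
        if (activeQueueSize >= batchSize) {
            trigger();
        } else if (activeQueueSize == 1) {
            scheduledSeqs.add(currentTriggerSeq);
        }
    }

    /** Assumed behavior: a trigger empties the active queue and advances the sequence number. */
    void trigger() {
        activeQueueSize = 0;
        currentTriggerSeq++;
    }
}
```

In this model the queue size can only return to 1 after a trigger, and each trigger advances the sequence number, so no sequence number is scheduled twice.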





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "Zakelly (via GitHub)" <gi...@apache.org>.
Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1579142292


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,26 +97,39 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
-
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
-        this.stateRequestsBuffer = new StateRequestBuffer<>();
+
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.stateRequestsBuffer =
+                new StateRequestBuffer<>(
+                        bufferTimeout,
+                        () ->
+                                mailboxExecutor.execute(
+                                        () -> {
+                                            if (stateRequestsBuffer.currentTriggerSeq.get()
+                                                    == stateRequestsBuffer.scheduledTriggerSeq

Review Comment:
   This condition is always true. I suggest you pass the `triggered seq` into the handler lambda as a parameter.
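
One possible reading of this suggestion, sketched with JDK types only. `SeqGuardedTimeout`, `scheduleTimeout`, and `onTimeout` here are hypothetical stand-ins, not the PR's code: the sequence number is captured when the timeout is scheduled and handed to the handler, so the handler compares a fixed past value against the live counter instead of reading two live fields at run time.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/** Hypothetical, simplified stand-in for the buffer's timeout bookkeeping. */
class SeqGuardedTimeout {
    final AtomicLong currentTriggerSeq = new AtomicLong(0);
    int timeoutsFired = 0;

    /** Called on every trigger (batch full or timeout); invalidates older timeouts. */
    void advanceTriggerSeq() {
        currentTriggerSeq.incrementAndGet();
    }

    /** Captures the seq at scheduling time and returns the task a timer would later run. */
    Runnable scheduleTimeout(LongConsumer handler) {
        final long scheduledSeq = currentTriggerSeq.get();
        return () -> handler.accept(scheduledSeq);
    }

    /** Handler: acts only if no trigger has happened since the timeout was scheduled. */
    void onTimeout(long scheduledSeq) {
        if (scheduledSeq != currentTriggerSeq.get()) {
            return; // a newer trigger already happened, so this timeout is stale
        }
        timeoutsFired++;
        advanceTriggerSeq();
    }
}
```

A timeout scheduled before a batch-size trigger becomes a no-op, while one scheduled afterwards still fires.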



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java:
##########
@@ -53,17 +72,75 @@ public class StateRequestBuffer<K> {
     /** The number of state requests in blocking queue. */
     int blockingQueueSize;
 
-    public StateRequestBuffer() {
+    /** The timeout of {@link #activeQueue} triggering in milliseconds. */
+    final long bufferTimeout;
+
+    /** The handler to trigger when {@link #activeQueue} size is 1. */
+    final Runnable timeoutHandler;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    /**
+     * The current scheduled future, when the next scheduling occurs, the previous one that has not
+     * yet been executed will be canceled.
+     */
+    ScheduledFuture<Void> currentScheduledFuture;
+
+    /**
+     * The current scheduled trigger sequence number, a timeout trigger is scheduled only if {@code
+     * scheduledTriggerSeq} is less than {@code currentTriggerSeq}.
+     */
+    AtomicLong scheduledTriggerSeq;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by one.
+     */
+    AtomicLong currentTriggerSeq;
+
+    public StateRequestBuffer(long bufferTimeout, Runnable timeoutHandler) {
         this.activeQueue = new LinkedList<>();
         this.blockingQueue = new HashMap<>();
         this.blockingQueueSize = 0;
+        this.bufferTimeout = bufferTimeout;
+        this.timeoutHandler = timeoutHandler;
+        this.scheduledTriggerSeq = new AtomicLong(-1);
+        this.currentTriggerSeq = new AtomicLong(0);
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor = DELAYER;
+        }
+    }
+
+    void advanceTriggerSeq() {
+        currentTriggerSeq.incrementAndGet();
     }
 
     void enqueueToActive(StateRequest<K, ?, ?> request) {
         if (request.getRequestType() == StateRequestType.SYNC_POINT) {
             request.getFuture().complete(null);
         } else {
             activeQueue.add(request);
+            // if the active queue size is 1, it means that the current request is the oldest one in

Review Comment:
   nit. remove this?





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "Zakelly (via GitHub)" <gi...@apache.org>.
Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1578767927


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +106,80 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    ScheduledFuture<Void> currentScheduledFuture;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by one.
+     */
+    AtomicLong currentTriggerSeq;
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.currentTriggerSeq = new AtomicLong(0);
+
+        // ----------------- initialize buffer timeout -------------------
+        this.currentScheduledFuture = null;
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new ExecutorThreadFactory("AEC-timeout-scheduler"));
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
+            // make sure shutdown removes all pending tasks
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    .setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    .setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        } else {
+            this.scheduledExecutor = null;
+        }
+
         LOG.info(
-                "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
+                "Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordsNum {}",
                 batchSize,
+                bufferTimeout,
                 maxInFlightRecords);
     }
 
+    void scheduleTimeout(long triggerSeq) {
+        if (bufferTimeout > 0) {
+            if (currentScheduledFuture != null
+                    && !currentScheduledFuture.isDone()
+                    && !currentScheduledFuture.isCancelled()) {
+                currentScheduledFuture.cancel(false);
+            }
+            currentScheduledFuture =
+                    (ScheduledFuture<Void>)
+                            scheduledExecutor.schedule(
+                                    () -> {
+                                        if (triggerSeq != currentTriggerSeq.get()) {
+                                            // if any new trigger occurs, skip this schedule
+                                            return;
+                                        }
+                                        mailboxExecutor.execute(
+                                                () -> triggerIfNeeded(true), "AEC-timeout");

Review Comment:
   Well, I mean we keep the `if (triggerSeq != currentTriggerSeq.get()) { return; }` before this, only change the mailbox processing part.





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "Zakelly (via GitHub)" <gi...@apache.org>.
Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1578769079


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -220,12 +283,17 @@ <IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) {
      * @param force whether to trigger requests in force.
      */
     void triggerIfNeeded(boolean force) {
-        // TODO: introduce a timeout mechanism for triggering.
         if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+            // if the active queue size is 1, it means that the current request is the oldest one in
+            // the active queue.
+            if (stateRequestsBuffer.activeQueueSize() == 1) {
+                scheduleTimeout(currentTriggerSeq.get());
+            }

Review Comment:
   Ah... I misspoke... Is it possible that `scheduleTimeout` is called multiple times for one seq?



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "Zakelly (via GitHub)" <gi...@apache.org>.
Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1571771428


##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##########
@@ -181,4 +182,73 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    /**
+     * A flag to enable or disable async mode related components when tasks initialize. As long as
+     * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+     * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+     * state APIs, the state access is always executed synchronously, enable this option would bring
+     * some overhead.
+     *
+     * <p>Note: This is an experimental feature(FLIP-425) under evaluation.
+     */
+    @Experimental
+    public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED =
+            ConfigOptions.key("execution.async-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "A flag to enable or disable async mode related components when tasks initialize."
+                                    + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
+                                    + " Otherwise, the state access of Async state APIs will be executed synchronously."
+                                    + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"
+                                    + " Note: This is an experimental feature under evaluation.");
+
+    /**
+     * The max limit of in-flight records number in async execution mode, 'in-flight' refers to the
+     * records that have entered the operator but have not yet been processed and emitted to the
+     * downstream. If the in-flight records number exceeds the limit, the newly records entering
+     * will be blocked until the in-flight records number drops below the limit.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_INFLIGHT_RECORDS_LIMIT =
+            ConfigOptions.key("execution.async-mode.in-flight-records-limit")
+                    .intType()
+                    .defaultValue(6000)
+                    .withDescription(
+                            "The max limit of in-flight records number in async execution mode, 'in-flight' refers"
+                                    + " to the records that have entered the operator but have not yet been processed and"
+                                    + " emitted to the downstream. If the in-flight records number exceeds the limit,"
+                                    + " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+    /**
+     * The size of buffer under async execution mode. Async execution mode provides a buffer
+     * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+     * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+     * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control
+     * the frequency of triggering.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_BUFFER_SIZE =
+            ConfigOptions.key("execution.async-mode.buffer-size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access."
+                                    + " When the number of state requests in the active buffer exceeds the batch size,"
+                                    + " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
+                                    + " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");
+
+    /**
+     * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link
+     * #ASYNC_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform actively.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_BUFFER_TIMEOUT =
+            ConfigOptions.key("execution.async-state.buffer-timeout")

Review Comment:
   Sorry guys, I'm afraid I prefer the original proposal of `async-state` in the FLIP, since this is only for stateful operators, and even the `record-order` problem is caused by async state processing...



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "Zakelly (via GitHub)" <gi...@apache.org>.
Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1574559188


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +108,91 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
+    /**
+     * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+     * perform actively when the next state request arrives even if the activeQueue has not reached
+     * the {@link #batchSize}.
+     */
+    final AtomicBoolean timeoutFlag;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    final ScheduledThreadPoolExecutor scheduledExecutor;
+
+    ScheduledFuture<Void> currentScheduledFuture;
+
     public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+        this(
+                mailboxExecutor,
+                stateExecutor,
+                DEFAULT_BATCH_SIZE,
+                DEFAULT_BUFFER_TIMEOUT,
+                DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
     }
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeOut,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeOut = bufferTimeOut;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.timeoutFlag = new AtomicBoolean(false);
+
+        // ----------------- initialize buffer timeout -------------------
+        this.currentScheduledFuture = null;
+        if (bufferTimeOut > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1,
+                            new ThreadFactory() {
+                                @Override
+                                public Thread newThread(Runnable r) {
+                                    return new Thread(r, "AEC-scheduler");
+                                }
+                            });
+            this.scheduledExecutor.setRemoveOnCancelPolicy(true);
+
+            // make sure shutdown removes all pending tasks
+            this.scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+            this.scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        } else {
+            this.scheduledExecutor = null;
+        }
+
         LOG.info(
                 "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
                 batchSize,
                 maxInFlightRecords);
     }
 
+    void scheduleTimeout() {
+        if (bufferTimeOut > 0) {
+            if (currentScheduledFuture != null
+                    && !currentScheduledFuture.isDone()
+                    && !currentScheduledFuture.isCancelled()) {
+                currentScheduledFuture.cancel(false);
+            }
+            currentScheduledFuture =
+                    (ScheduledFuture<Void>)
+                            scheduledExecutor.schedule(
+                                    () -> {
+                                        timeoutFlag.set(true);
+                                        mailboxExecutor.execute(
+                                                () -> triggerIfNeeded(false), "AEC-timeout");

Review Comment:
   I'd suggest a `sequence number` for each buffer to make sure the timeout triggers the intended one.
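
One way to read this suggestion, as a JDK-only sketch rather than the PR's code: each scheduled timeout captures the buffer's sequence number at scheduling time and does nothing if the buffer has been triggered in the meantime. `SeqGuardedTimeout`, `onBufferTriggered`, and `timeoutTriggers` are hypothetical names, and the hand-off to the mailbox executor is left out.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the controller; this is not the Flink class. */
class SeqGuardedTimeout {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    // Incremented every time the buffer is triggered, for any reason.
    private final AtomicLong currentSeq = new AtomicLong(0);
    // Counts how many triggers were caused by a timeout (for illustration only).
    private final AtomicInteger timeoutTriggers = new AtomicInteger(0);

    /** Schedules a timeout bound to the buffer generation that is current right now. */
    void scheduleTimeout(long delayMs) {
        final long seqAtSchedule = currentSeq.get();
        scheduler.schedule(
                () -> {
                    // A stale timeout fails the compare-and-set and is skipped.
                    if (currentSeq.compareAndSet(seqAtSchedule, seqAtSchedule + 1)) {
                        timeoutTriggers.incrementAndGet();
                    }
                },
                delayMs,
                TimeUnit.MILLISECONDS);
    }

    /** Called when the buffer is triggered for another reason, e.g. the batch size is reached. */
    void onBufferTriggered() {
        currentSeq.incrementAndGet();
    }

    int timeoutTriggers() {
        return timeoutTriggers.get();
    }

    /** Lets already scheduled timeouts run, then waits for the scheduler to finish. */
    void shutdownAndWait() {
        scheduler.shutdown();
        try {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        SeqGuardedTimeout aec = new SeqGuardedTimeout();
        aec.scheduleTimeout(200); // bound to seq 0
        aec.onBufferTriggered(); // seq becomes 1, so the first timeout is stale
        aec.scheduleTimeout(200); // bound to seq 1
        aec.shutdownAndWait();
        System.out.println("timeout triggers: " + aec.timeoutTriggers());
    }
}
```

With a guard like this, cancelling the previous future becomes an optimization rather than a correctness requirement, since a late timeout can no longer trigger a newer buffer.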



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +108,91 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
+    /**
+     * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+     * perform actively when the next state request arrives even if the activeQueue has not reached
+     * the {@link #batchSize}.
+     */
+    final AtomicBoolean timeoutFlag;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    final ScheduledThreadPoolExecutor scheduledExecutor;
+
+    ScheduledFuture<Void> currentScheduledFuture;
+
     public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+        this(
+                mailboxExecutor,
+                stateExecutor,
+                DEFAULT_BATCH_SIZE,
+                DEFAULT_BUFFER_TIMEOUT,
+                DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
     }
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeOut,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeOut = bufferTimeOut;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.timeoutFlag = new AtomicBoolean(false);
+
+        // ----------------- initialize buffer timeout -------------------
+        this.currentScheduledFuture = null;
+        if (bufferTimeOut > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1,
+                            new ThreadFactory() {

Review Comment:
   `new ExecutorThreadFactory("AEC-timeout-scheduler")` ?
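
For comparison, a JDK-only sketch of what a named thread factory buys over the anonymous `ThreadFactory` in the diff. `NamedThreadFactory` is a hypothetical class written for illustration; it is not Flink's `ExecutorThreadFactory`, and the naming scheme and daemon flag here are assumptions of the sketch.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical JDK-only factory; it is not Flink's ExecutorThreadFactory. */
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Give every thread a recognizable, numbered name for thread dumps and logs.
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        // Daemon threads do not keep the JVM alive on their own (an assumption of this sketch).
        t.setDaemon(true);
        return t;
    }

    public static void main(String[] args) {
        Thread t = new NamedThreadFactory("AEC-timeout-scheduler").newThread(() -> {});
        System.out.println(t.getName());
    }
}
```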



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "fredia (via GitHub)" <gi...@apache.org>.
fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1574293924


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -50,15 +50,24 @@ public class AsyncExecutionController<K> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);
 
-    public static final int DEFAULT_BATCH_SIZE = 1000;
-    public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
+    private static final int DEFAULT_BATCH_SIZE = 1000;
+
+    private static final int DEFAULT_BUFFER_TIMEOUT = 1000;
+    private static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
 
     /**
      * The batch size. When the number of state requests in the active buffer exceeds the batch
      * size, a batched state execution would be triggered.
      */
     private final int batchSize;
 
+    /**
+     * The timeout of {@link StateRequestBuffer#activeQueue} triggering in milliseconds. If the
+     * activeQueue has not reached the {@link #batchSize} within 'buffer-timeout' milliseconds, a
+     * trigger will perform actively.
+     */
+    private final int bufferTimeOut;

Review Comment:
   I added an implementation in the second commit.



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "fredia (via GitHub)" <gi...@apache.org>.
fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575787928


##########
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java:
##########
@@ -346,6 +337,63 @@ public void testSyncPoint() {
         recordContext2.release();
     }
 
+    @Test
+    void testBufferTimeout() throws InterruptedException {
+        batchSize = 5;
+        timeout = 1000;
+        setup();
+        Runnable userCode = () -> valueState.asyncValue();
+
+        // ------------ basic timeout -------------------
+        for (int i = 0; i < batchSize - 1; i++) {
+            String record = String.format("key%d-r%d", i, i);
+            String key = String.format("key%d", batchSize + i);
+            RecordContext<String> recordContext = aec.buildContext(record, key);
+            aec.setCurrentContext(recordContext);
+            userCode.run();
+        }
+        assertThat(aec.timeoutFlag.get()).isFalse();
+        assertThat(aec.currentScheduledFuture.isDone()).isFalse();
+        assertThat(aec.inFlightRecordNum.get()).isEqualTo(batchSize - 1);
+        assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(batchSize - 1);
+        assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
+
+        Thread.sleep(timeout + 100);

Review Comment:
   Thanks for the suggestion. I removed `Thread.sleep` from `testBufferTimeout()` and `testBufferTimeoutSkip()`, and used `ManuallyTriggeredScheduledExecutorService` as the `ScheduledExecutor`.
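
The idea behind replacing `Thread.sleep` can be sketched with a JDK-only stand-in: the test records scheduled tasks and fires them on demand, so it never waits on wall-clock time. `ManualScheduler` below is a hypothetical, much simplified class for illustration; it is not Flink's `ManuallyTriggeredScheduledExecutorService` and it ignores the delay entirely.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical, minimal manually triggered scheduler; not Flink's test utility. */
class ManualScheduler {
    private final Queue<Runnable> pending = new ArrayDeque<>();

    /** Records the task instead of waiting for the delay to elapse. */
    void schedule(Runnable task, long delayMs) {
        pending.add(task);
    }

    /** Runs every recorded task, as if all delays had expired; returns how many ran. */
    int triggerAll() {
        int ran = 0;
        Runnable task;
        while ((task = pending.poll()) != null) {
            task.run();
            ran++;
        }
        return ran;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ManualScheduler scheduler = new ManualScheduler();
        AtomicBoolean timedOut = new AtomicBoolean(false);
        scheduler.schedule(() -> timedOut.set(true), 1000);
        System.out.println("before trigger: " + timedOut.get());
        scheduler.triggerAll();
        System.out.println("after trigger: " + timedOut.get());
    }
}
```

A test written this way can assert on the state both before and after the timeout fires without depending on timing.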



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "fredia (via GitHub)" <gi...@apache.org>.
fredia commented on PR #24667:
URL: https://github.com/apache/flink/pull/24667#issuecomment-2066106851

   After offline discussion, FLIP-425 is only used for stateful operators, hence `async-mode` was changed back to `async-state`.


Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "fredia (via GitHub)" <gi...@apache.org>.
fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1569864750


##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##########
@@ -181,4 +182,73 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    /**
+     * A flag to enable or disable async mode related components when tasks initialize. As long as
+     * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+     * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+     * state APIs, the state access is always executed synchronously, enable this option would bring
+     * some overhead.
+     *
+     * <p>Note: This is an experimental feature(FLIP-425) under evaluation.
+     */
+    @Experimental
+    public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED =
+            ConfigOptions.key("execution.async-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "A flag to enable or disable async mode related components when tasks initialize."
+                                    + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
+                                    + " Otherwise, the state access of Async state APIs will be executed synchronously."
+                                    + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"
+                                    + " Note: This is an experimental feature under evaluation.");
+
+    /**
+     * The max limit of in-flight records number in async execution mode, 'in-flight' refers to the
+     * records that have entered the operator but have not yet been processed and emitted to the
+     * downstream. If the in-flight records number exceeds the limit, the newly records entering
+     * will be blocked until the in-flight records number drops below the limit.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_INFLIGHT_RECORDS_LIMIT =
+            ConfigOptions.key("execution.async-mode.in-flight-records-limit")
+                    .intType()
+                    .defaultValue(6000)
+                    .withDescription(
+                            "The max limit of in-flight records number in async execution mode, 'in-flight' refers"
+                                    + " to the records that have entered the operator but have not yet been processed and"
+                                    + " emitted to the downstream. If the in-flight records number exceeds the limit,"
+                                    + " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+    /**
+     * The size of buffer under async execution mode. Async execution mode provides a buffer
+     * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+     * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+     * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control
+     * the frequency of triggering.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_BUFFER_SIZE =
+            ConfigOptions.key("execution.async-mode.buffer-size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access."
+                                    + " When the number of state requests in the active buffer exceeds the batch size,"
+                                    + " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
+                                    + " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");
+
+    /**
+     * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link
+     * #ASYNC_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform actively.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_BUFFER_TIMEOUT =
+            ConfigOptions.key("execution.async-state.buffer-timeout")

Review Comment:
   Good point👍, changed it to `execution.async-mode.state-buffer-timeout`.



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "yunfengzhou-hub (via GitHub)" <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1572344539


##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##########
@@ -181,4 +182,62 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    // ------------------------- Async State Execution --------------------------
+
+    /**
+     * The max limit of in-flight records number in async state execution, 'in-flight' refers to the
+     * records that have entered the operator but have not yet been processed and emitted to the
+     * downstream. If the in-flight records number exceeds the limit, the newly records entering
+     * will be blocked until the in-flight records number drops below the limit.
+     */
+    @Experimental
+    @Documentation.ExcludeFromDocumentation(
+            "This is an experimental option, internal use only for now.")
+    public static final ConfigOption<Integer> ASYNC_INFLIGHT_RECORDS_LIMIT =
+            ConfigOptions.key("execution.async-state.in-flight-records-limit")
+                    .intType()
+                    .defaultValue(6000)
+                    .withDescription(
+                            "The max limit of in-flight records number in async state execution, 'in-flight' refers"
+                                    + " to the records that have entered the operator but have not yet been processed and"
+                                    + " emitted to the downstream. If the in-flight records number exceeds the limit,"
+                                    + " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+    /**
+     * The size of buffer under async state execution. Async state execution provides a buffer
+     * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+     * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+     * higher end-to-end latency, this option works with {@link #ASYNC_STATE_BUFFER_TIMEOUT} to
+     * control the frequency of triggering.
+     */
+    @Experimental
+    @Documentation.ExcludeFromDocumentation(
+            "This is an experimental option, internal use only for now.")
+    public static final ConfigOption<Integer> ASYNC_STATE_BUFFER_SIZE =
+            ConfigOptions.key("execution.async-state.buffer-size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The size of buffer under async state execution. Async state execution provides a buffer mechanism to reduce state access."
+                                    + " When the number of state requests in the active buffer exceeds the batch size,"
+                                    + " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
+                                    + " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");
+
+    /**
+     * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link
+     * #ASYNC_STATE_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform
+     * actively.
+     */
+    @Experimental
+    @Documentation.ExcludeFromDocumentation(
+            "This is an experimental option, internal use only for now.")
+    public static final ConfigOption<Integer> ASYNC_STATE_BUFFER_TIMEOUT =

Review Comment:
   It might be better to declare time configurations as `long` instead of `int`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -50,15 +50,24 @@ public class AsyncExecutionController<K> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);
 
-    public static final int DEFAULT_BATCH_SIZE = 1000;
-    public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
+    private static final int DEFAULT_BATCH_SIZE = 1000;
+
+    private static final int DEFAULT_BUFFER_TIMEOUT = 1000;
+    private static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
 
     /**
      * The batch size. When the number of state requests in the active buffer exceeds the batch
      * size, a batched state execution would be triggered.
      */
     private final int batchSize;
 
+    /**
+     * The timeout of {@link StateRequestBuffer#activeQueue} triggering in milliseconds. If the
+     * activeQueue has not reached the {@link #batchSize} within 'buffer-timeout' milliseconds, a
+     * trigger will perform actively.
+     */
+    private final int bufferTimeOut;

Review Comment:
   There is still a TODO in AsyncExecutionController#triggerIfNeeded that says
   ```
   // TODO: introduce a timeout mechanism for triggering.
   ```
   Given that the configuration bufferTimeout is introduced in this PR, shall we provide the implementation for this configuration as well?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,19 +103,26 @@ public class AsyncExecutionController<K> {
     final AtomicInteger inFlightRecordNum;
 
     public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+        this(
+                mailboxExecutor,
+                stateExecutor,
+                DEFAULT_BATCH_SIZE,
+                DEFAULT_BUFFER_TIMEOUT,
+                DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);

Review Comment:
   How about removing the constants and using `ASYNC_STATE_BUFFER_SIZE.defaultValue()` from the ExecutionConfig? This would avoid maintaining the same value in two places.



##########
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##########
@@ -1085,6 +1085,54 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) {
         configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression);
     }
 
+    // --------------------------------------------------------------------------------------------
+    //  Asynchronous execution configurations
+    // --------------------------------------------------------------------------------------------
+
+    @Internal

Review Comment:
   Hmm, it seems that they are still `@Internal` rather than `@Experimental` in this version.



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "yunfengzhou-hub (via GitHub)" <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1566744878


##########
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##########
@@ -1085,6 +1085,54 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) {
         configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression);
     }
 
+    // --------------------------------------------------------------------------------------------
+    //  Asynchronous execution configurations
+    // --------------------------------------------------------------------------------------------
+
+    @Internal

Review Comment:
   It might be better to mark them as `@Experimental` instead of `@Internal`.
   
   - Internal: Annotation to mark methods within **stable**, public APIs as an internal developer API.
   - Experimental: Classes with this annotation are neither battle-tested **nor stable**, and may be changed or removed in future versions.



##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##########
@@ -181,4 +182,73 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    /**
+     * A flag to enable or disable async mode related components when tasks initialize. As long as
+     * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+     * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+     * state APIs, the state access is always executed synchronously, enable this option would bring
+     * some overhead.
+     *
+     * <p>Note: This is an experimental feature(FLIP-425) under evaluation.
+     */
+    @Experimental
+    public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED =
+            ConfigOptions.key("execution.async-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "A flag to enable or disable async mode related components when tasks initialize."
+                                    + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
+                                    + " Otherwise, the state access of Async state APIs will be executed synchronously."
+                                    + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"

Review Comment:
   It might be better to remove this configuration for now and add it in the future when such use cases are found.
   
   - Whether to enable async state access can be inferred automatically by the Flink infrastructure, depending on the location of state backends and the state API used in operators.
   - It is better not to expose implementation details, like the sync/async modes mentioned here, to end users. So long as the order of same-key records and the order of async state callbacks are guaranteed, that is enough for users.
       - In particular, the implementation of the sync mode might be altered in the future to improve performance when the sync state API is used with a remote state backend. In that case, the statement "the state access is always executed synchronously" would cause deprecation issues.



##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##########
@@ -181,4 +182,73 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    /**
+     * A flag to enable or disable async mode related components when tasks initialize. As long as
+     * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+     * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+     * state APIs, the state access is always executed synchronously, enable this option would bring
+     * some overhead.
+     *
+     * <p>Note: This is an experimental feature(FLIP-425) under evaluation.
+     */
+    @Experimental
+    public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED =
+            ConfigOptions.key("execution.async-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "A flag to enable or disable async mode related components when tasks initialize."
+                                    + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
+                                    + " Otherwise, the state access of Async state APIs will be executed synchronously."
+                                    + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"
+                                    + " Note: This is an experimental feature under evaluation.");
+
+    /**
+     * The max limit of in-flight records number in async execution mode, 'in-flight' refers to the
+     * records that have entered the operator but have not yet been processed and emitted to the
+     * downstream. If the in-flight records number exceeds the limit, the newly records entering
+     * will be blocked until the in-flight records number drops below the limit.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_INFLIGHT_RECORDS_LIMIT =
+            ConfigOptions.key("execution.async-mode.in-flight-records-limit")
+                    .intType()
+                    .defaultValue(6000)
+                    .withDescription(
+                            "The max limit of in-flight records number in async execution mode, 'in-flight' refers"
+                                    + " to the records that have entered the operator but have not yet been processed and"
+                                    + " emitted to the downstream. If the in-flight records number exceeds the limit,"
+                                    + " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+    /**
+     * The size of buffer under async execution mode. Async execution mode provides a buffer
+     * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+     * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+     * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control
+     * the frequency of triggering.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_BUFFER_SIZE =
+            ConfigOptions.key("execution.async-mode.buffer-size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access."
+                                    + " When the number of state requests in the active buffer exceeds the batch size,"
+                                    + " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
+                                    + " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");
+
+    /**
+     * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link
+     * #ASYNC_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform actively.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_BUFFER_TIMEOUT =
+            ConfigOptions.key("execution.async-state.buffer-timeout")

Review Comment:
   It might be better to change the `execution.async-mode` above to `execution.async-state`.



##########
docs/layouts/shortcodes/generated/execution_configuration.html:
##########
@@ -8,6 +8,31 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>execution.async-mode.buffer-size</h5></td>

Review Comment:
   Given that these configurations are only for internal experimental use, it might be better not to expose them in the documentation.





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "fredia (via GitHub)" <gi...@apache.org>.
fredia commented on PR #24667:
URL: https://github.com/apache/flink/pull/24667#issuecomment-2060999083

   @yunfengzhou-hub @Zakelly Thanks for the detailed review. I have rebased this PR and addressed some comments; PTAL when you are free.




Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "fredia (via GitHub)" <gi...@apache.org>.
fredia merged PR #24667:
URL: https://github.com/apache/flink/pull/24667




Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "Zakelly (via GitHub)" <gi...@apache.org>.
Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1578825952


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -220,12 +283,17 @@ <IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) {
      * @param force whether to trigger requests in force.
      */
     void triggerIfNeeded(boolean force) {
-        // TODO: introduce a timeout mechanism for triggering.
         if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+            // if the active queue size is 1, it means that the current request is the oldest one in
+            // the active queue.
+            if (stateRequestsBuffer.activeQueueSize() == 1) {
+                scheduleTimeout(currentTriggerSeq.get());
+            }

Review Comment:
   I mean, `triggerIfNeeded(false)` may be called from somewhere else, right? Even `enqueueToActive` has two entry points. I'd suggest moving the `scheduleTimeout` part into `StateRequestBuffer`, while the handler and the seq maintenance stay in `AEC`.





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "fredia (via GitHub)" <gi...@apache.org>.
fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1578997828


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -220,12 +283,17 @@ <IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) {
      * @param force whether to trigger requests in force.
      */
     void triggerIfNeeded(boolean force) {
-        // TODO: introduce a timeout mechanism for triggering.
         if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+            // if the active queue size is 1, it means that the current request is the oldest one in
+            // the active queue.
+            if (stateRequestsBuffer.activeQueueSize() == 1) {
+                scheduleTimeout(currentTriggerSeq.get());
+            }

Review Comment:
   Currently, `triggerIfNeeded(false)` is only called in `handleRequest()`. I added a `schedulingCount==0` condition to avoid triggering multiple times for one seq in the future.
   
   I also moved `scheduleTimeout` into `StateRequestBuffer`. Thanks for the suggestion. 👍





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "yunfengzhou-hub (via GitHub)" <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575604054


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +108,91 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
+    /**
+     * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+     * perform actively when the next state request arrives even if the activeQueue has not reached
+     * the {@link #batchSize}.
+     */
+    final AtomicBoolean timeoutFlag;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    final ScheduledThreadPoolExecutor scheduledExecutor;
+
+    ScheduledFuture<Void> currentScheduledFuture;
+
     public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+        this(
+                mailboxExecutor,
+                stateExecutor,
+                DEFAULT_BATCH_SIZE,
+                DEFAULT_BUFFER_TIMEOUT,
+                DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
     }
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeOut,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeOut = bufferTimeOut;

Review Comment:
   The "O" in `bufferTimeOut` is upper-case while the "o" in `timeoutFlag` is lower-case. It might be better to unify them under the same convention.





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "yunfengzhou-hub (via GitHub)" <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1569768663


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,19 +103,26 @@ public class AsyncExecutionController<K> {
     final AtomicInteger inFlightRecordNum;
 
     public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+        this(
+                mailboxExecutor,
+                stateExecutor,
+                DEFAULT_BATCH_SIZE,
+                DEFAULT_BUFFER_TIMEOUT,
+                DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);

Review Comment:
   Given that #24657 has been merged, it might be better to verify that the introduced configurations can pass the configured values into AEC through operator setups now.





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "fredia (via GitHub)" <gi...@apache.org>.
fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575735682


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +108,91 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
+    /**
+     * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+     * perform actively when the next state request arrives even if the activeQueue has not reached
+     * the {@link #batchSize}.
+     */
+    final AtomicBoolean timeoutFlag;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    final ScheduledThreadPoolExecutor scheduledExecutor;
+
+    ScheduledFuture<Void> currentScheduledFuture;
+
     public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+        this(
+                mailboxExecutor,
+                stateExecutor,
+                DEFAULT_BATCH_SIZE,
+                DEFAULT_BUFFER_TIMEOUT,
+                DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
     }
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeOut,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeOut = bufferTimeOut;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.timeoutFlag = new AtomicBoolean(false);
+
+        // ----------------- initialize buffer timeout -------------------
+        this.currentScheduledFuture = null;
+        if (bufferTimeOut > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1,
+                            new ThreadFactory() {
+                                @Override
+                                public Thread newThread(Runnable r) {
+                                    return new Thread(r, "AEC-scheduler");
+                                }
+                            });
+            this.scheduledExecutor.setRemoveOnCancelPolicy(true);
+
+            // make sure shutdown removes all pending tasks
+            this.scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+            this.scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        } else {
+            this.scheduledExecutor = null;
+        }
+
         LOG.info(
                 "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
                 batchSize,
                 maxInFlightRecords);
     }
 
+    void scheduleTimeout() {
+        if (bufferTimeOut > 0) {
+            if (currentScheduledFuture != null
+                    && !currentScheduledFuture.isDone()
+                    && !currentScheduledFuture.isCancelled()) {
+                currentScheduledFuture.cancel(false);
+            }
+            currentScheduledFuture =
+                    (ScheduledFuture<Void>)
+                            scheduledExecutor.schedule(
+                                    () -> {
+                                        timeoutFlag.set(true);
+                                        mailboxExecutor.execute(
+                                                () -> triggerIfNeeded(false), "AEC-timeout");

Review Comment:
   Added. `testBufferTimeoutSkip` is introduced to test it.





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "yunfengzhou-hub (via GitHub)" <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575609479


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +108,91 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
+    /**
+     * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+     * perform actively when the next state request arrives even if the activeQueue has not reached
+     * the {@link #batchSize}.
+     */
+    final AtomicBoolean timeoutFlag;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    final ScheduledThreadPoolExecutor scheduledExecutor;

Review Comment:
   Would it be better to reuse existing utility methods like `FutureUtils.delay()`? This way AEC won't need to maintain such resources.
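
A minimal sketch of running a delayed trigger without owning a scheduler thread, using only the JDK. It uses `CompletableFuture.delayedExecutor` (Java 9+) as a stand-in; it is not Flink's `FutureUtils.delay()`, whose signature is not shown in this thread. `DelayedTriggerSketch` and `triggerAfter` are hypothetical names for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DelayedTriggerSketch {
    // Hypothetical helper: runs the action after delayMillis on a JDK-managed
    // delayed executor, so the caller does not create or shut down a thread pool.
    static CompletableFuture<Void> triggerAfter(long delayMillis, Runnable action) {
        Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        return CompletableFuture.runAsync(action, delayed);
    }

    public static void main(String[] args) {
        AtomicBoolean fired = new AtomicBoolean(false);
        CompletableFuture<Void> f = triggerAfter(50, () -> fired.set(true));
        f.join(); // wait until the delayed action has run
        System.out.println(fired.get()); // prints "true"
    }
}
```

The trade-off in this sketch is that the delay runs on a shared JDK executor rather than a dedicated thread, so the caller has no lifecycle to manage but also less control over thread naming and shutdown.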





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "fredia (via GitHub)" <gi...@apache.org>.
fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1568659638


##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##########
@@ -181,4 +182,73 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    /**
+     * A flag to enable or disable async mode related components when tasks initialize. As long as
+     * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+     * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+     * state APIs, the state access is always executed synchronously, enable this option would bring
+     * some overhead.
+     *
+     * <p>Note: This is an experimental feature(FLIP-425) under evaluation.
+     */
+    @Experimental
+    public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED =
+            ConfigOptions.key("execution.async-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "A flag to enable or disable async mode related components when tasks initialize."
+                                    + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
+                                    + " Otherwise, the state access of Async state APIs will be executed synchronously."
+                                    + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"
+                                    + " Note: This is an experimental feature under evaluation.");
+
+    /**
+     * The max limit of in-flight records number in async execution mode, 'in-flight' refers to the
+     * records that have entered the operator but have not yet been processed and emitted to the
+     * downstream. If the in-flight records number exceeds the limit, the newly records entering
+     * will be blocked until the in-flight records number drops below the limit.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_INFLIGHT_RECORDS_LIMIT =
+            ConfigOptions.key("execution.async-mode.in-flight-records-limit")
+                    .intType()
+                    .defaultValue(6000)
+                    .withDescription(
+                            "The max limit of in-flight records number in async execution mode, 'in-flight' refers"
+                                    + " to the records that have entered the operator but have not yet been processed and"
+                                    + " emitted to the downstream. If the in-flight records number exceeds the limit,"
+                                    + " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+    /**
+     * The size of buffer under async execution mode. Async execution mode provides a buffer
+     * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+     * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+     * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control
+     * the frequency of triggering.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_BUFFER_SIZE =
+            ConfigOptions.key("execution.async-mode.buffer-size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access."
+                                    + " When the number of state requests in the active buffer exceeds the batch size,"
+                                    + " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
+                                    + " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");
+
+    /**
+     * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link
+     * #ASYNC_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform actively.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_BUFFER_TIMEOUT =
+            ConfigOptions.key("execution.async-state.buffer-timeout")

Review Comment:
   I unified these options under `execution.async-mode`, because we also provide a `Record-order` mode to preserve the order of records that have no state access.
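
A minimal sketch of keeping the option keys consistent by deriving them from one prefix constant. It assumes a plain `Map<String, String>` in place of Flink's `ConfigOption` API; `AsyncOptionKeysSketch` and `getInt` are hypothetical names, while the key suffixes and the defaults 1000 and 6000 are taken from the diff above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AsyncOptionKeysSketch {
    // Single source of truth for the prefix, so the keys cannot drift apart.
    static final String PREFIX = "execution.async-mode.";

    static final String ENABLED = PREFIX + "enabled";
    static final String IN_FLIGHT_RECORDS_LIMIT = PREFIX + "in-flight-records-limit";
    static final String BUFFER_SIZE = PREFIX + "buffer-size";
    static final String BUFFER_TIMEOUT = PREFIX + "buffer-timeout";

    // Hypothetical helper: reads an int option from a plain map,
    // falling back to the given default when the key is absent.
    static int getInt(Map<String, String> conf, String key, int defaultValue) {
        String raw = conf.get(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put(BUFFER_SIZE, "500");
        System.out.println(getInt(conf, BUFFER_SIZE, 1000)); // prints 500
        System.out.println(getInt(conf, IN_FLIGHT_RECORDS_LIMIT, 6000)); // prints 6000
    }
}
```

With a shared prefix, a description string that mentions another option can reference the constant instead of a hand-typed key, which avoids mismatches such as one key using a different prefix from the rest.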





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "Zakelly (via GitHub)" <gi...@apache.org>.
Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1577589826


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +106,80 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    ScheduledFuture<Void> currentScheduledFuture;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by one.
+     */
+    AtomicLong currentTriggerSeq;
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.currentTriggerSeq = new AtomicLong(0);
+
+        // ----------------- initialize buffer timeout -------------------
+        this.currentScheduledFuture = null;
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new ExecutorThreadFactory("AEC-timeout-scheduler"));
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
+            // make sure shutdown removes all pending tasks
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    .setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    .setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        } else {
+            this.scheduledExecutor = null;
+        }
+
         LOG.info(
-                "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
+                "Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordsNum {}",
                 batchSize,
+                bufferTimeout,
                 maxInFlightRecords);
     }
 
+    void scheduleTimeout(long triggerSeq) {
+        if (bufferTimeout > 0) {
+            if (currentScheduledFuture != null
+                    && !currentScheduledFuture.isDone()
+                    && !currentScheduledFuture.isCancelled()) {
+                currentScheduledFuture.cancel(false);
+            }
+            currentScheduledFuture =
+                    (ScheduledFuture<Void>)
+                            scheduledExecutor.schedule(
+                                    () -> {
+                                        if (triggerSeq != currentTriggerSeq.get()) {
+                                            // if any new trigger occurs, skip this schedule
+                                            return;
+                                        }
+                                        mailboxExecutor.execute(
+                                                () -> triggerIfNeeded(true), "AEC-timeout");

Review Comment:
   how about
   ```
    mailboxExecutor.execute(
            () -> { if (triggerSeq == currentTriggerSeq.get()) { triggerIfNeeded(true); } }, "AEC-timeout");
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -220,12 +283,17 @@ <IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) {
      * @param force whether to trigger requests in force.
      */
     void triggerIfNeeded(boolean force) {
-        // TODO: introduce a timeout mechanism for triggering.
         if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+            // if the active queue size is 1, it means that the current request is the oldest one in
+            // the active queue.
+            if (stateRequestsBuffer.activeQueueSize() == 1) {
+                scheduleTimeout(currentTriggerSeq.get());
+            }

Review Comment:
   Is it possible to trigger multiple times for one seq?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "fredia (via GitHub)" <gi...@apache.org>.
fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1568656329


##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##########
@@ -181,4 +182,73 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    /**
+     * A flag to enable or disable async mode related components when tasks initialize. As long as
+     * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+     * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+     * state APIs, the state access is always executed synchronously, enable this option would bring
+     * some overhead.
+     *
+     * <p>Note: This is an experimental feature(FLIP-425) under evaluation.
+     */
+    @Experimental
+    public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED =
+            ConfigOptions.key("execution.async-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "A flag to enable or disable async mode related components when tasks initialize."
+                                    + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
+                                    + " Otherwise, the state access of Async state APIs will be executed synchronously."
+                                    + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"

Review Comment:
   Thanks for your insight. After discussing offline, we think it would be better to expose this on the DataStream API in some way, so that users can enable async execution in a more fine-grained manner.
   So this option has been removed.



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "fredia (via GitHub)" <gi...@apache.org>.
fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575573086


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +108,91 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
+    /**
+     * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+     * perform actively when the next state request arrives even if the activeQueue has not reached
+     * the {@link #batchSize}.
+     */
+    final AtomicBoolean timeoutFlag;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    final ScheduledThreadPoolExecutor scheduledExecutor;

Review Comment:
   👍 I added a `close()` method that does two things:
   1. drains all in-flight records
   2. shuts down `scheduledExecutor`
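
A minimal sketch of what such a `close()` could look like, assuming the controller owns a single-threaded `ScheduledThreadPoolExecutor` configured as in the diff above. The class name `ClosableControllerSketch` and the `drainInFlightRecords()` helper are hypothetical; the PR's actual draining logic is not shown in this thread.

```java
import java.io.Closeable;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the controller; not the PR's code. */
final class ClosableControllerSketch implements Closeable {
    final AtomicInteger inFlightRecordNum = new AtomicInteger(0);
    final ScheduledThreadPoolExecutor scheduledExecutor;

    ClosableControllerSketch(long bufferTimeout) {
        if (bufferTimeout > 0) {
            scheduledExecutor = new ScheduledThreadPoolExecutor(1);
            scheduledExecutor.setRemoveOnCancelPolicy(true);
            // Pending timers must not fire once shutdown has started.
            scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
            scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        } else {
            scheduledExecutor = null;
        }
    }

    /** Assumption: a real controller would run pending callbacks until this reaches zero. */
    void drainInFlightRecords() {
        while (inFlightRecordNum.get() > 0) {
            inFlightRecordNum.decrementAndGet();
        }
    }

    @Override
    public void close() {
        // 1. Drain all in-flight records.
        drainInFlightRecords();
        // 2. Shut down the timer thread, if one was created.
        if (scheduledExecutor != null) {
            scheduledExecutor.shutdownNow();
        }
    }
}
```

Draining first means no record is left waiting on a timeout that will never fire after the executor is gone.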



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "Zakelly (via GitHub)" <gi...@apache.org>.
Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1580464018


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,26 +97,38 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
-
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
-        this.stateRequestsBuffer = new StateRequestBuffer<>();
+
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.stateRequestsBuffer =
+                new StateRequestBuffer<>(
+                        bufferTimeout,
+                        (scheduledTriggerSeq) ->
+                                mailboxExecutor.execute(
+                                        () -> {
+                                            if (stateRequestsBuffer.currentTriggerSeq.get()

Review Comment:
   How about providing a function `boolean stateRequestsBuffer#checkCurrentSeq(long seq)`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java:
##########
@@ -53,17 +73,74 @@ public class StateRequestBuffer<K> {
     /** The number of state requests in blocking queue. */
     int blockingQueueSize;
 
-    public StateRequestBuffer() {
+    /** The timeout of {@link #activeQueue} triggering in milliseconds. */
+    final long bufferTimeout;
+
+    /** The handler to trigger when timeout. */
+    final Consumer<Long> timeoutHandler;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    /**
+     * The current scheduled future, when the next scheduling occurs, the previous one that has not
+     * yet been executed will be canceled.
+     */
+    ScheduledFuture<Void> currentScheduledFuture;
+
+    /**
+     * The current scheduled trigger sequence number, a timeout trigger is scheduled only if {@code
+     * scheduledTriggerSeq} is less than {@code currentTriggerSeq}.
+     */
+    AtomicLong scheduledTriggerSeq;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by 1.
+     */
+    AtomicLong currentTriggerSeq;
+
+    public StateRequestBuffer(long bufferTimeout, Consumer<Long> timeoutHandler) {
         this.activeQueue = new LinkedList<>();
         this.blockingQueue = new HashMap<>();
         this.blockingQueueSize = 0;
+        this.bufferTimeout = bufferTimeout;
+        this.timeoutHandler = timeoutHandler;
+        this.scheduledTriggerSeq = new AtomicLong(-1);
+        this.currentTriggerSeq = new AtomicLong(0);
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor = DELAYER;
+        }
+    }
+
+    void advanceTriggerSeq() {
+        currentTriggerSeq.incrementAndGet();
     }
 
     void enqueueToActive(StateRequest<K, ?, ?> request) {
         if (request.getRequestType() == StateRequestType.SYNC_POINT) {
             request.getFuture().complete(null);
         } else {
             activeQueue.add(request);
+            if (bufferTimeout > 0 && currentTriggerSeq.get() > scheduledTriggerSeq.get()) {
+                if (currentScheduledFuture != null
+                        && !currentScheduledFuture.isDone()
+                        && !currentScheduledFuture.isCancelled()) {
+                    currentScheduledFuture.cancel(false);
+                }
+                scheduledTriggerSeq.set(currentTriggerSeq.get());

Review Comment:
   I was thinking of adding `final long thisScheduledSeq = scheduledTriggerSeq.get();` here and using the condition `thisScheduledSeq == currentTriggerSeq.get()` at line 136.
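
To illustrate the two suggestions in this review (a `checkCurrentSeq(long)` helper and capturing the seq in a `final` local when the timer is scheduled), here is a simplified, hypothetical buffer. It is not the PR's code: timers are collected in a list and run by hand instead of going through a `ScheduledExecutorService`, and the class and field names are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical, simplified model of the buffer's seq bookkeeping. */
final class BufferSeqSketch {
    private final AtomicLong currentSeq = new AtomicLong(0);
    private final AtomicLong scheduledSeq = new AtomicLong(-1);
    private final Consumer<Long> timeoutHandler;

    /** Stand-in for the scheduled executor: pending timer tasks, run by hand. */
    final List<Runnable> pendingTimers = new ArrayList<>();

    BufferSeqSketch(Consumer<Long> timeoutHandler) {
        this.timeoutHandler = timeoutHandler;
    }

    /** True only if no trigger has happened since {@code seq} was captured. */
    boolean checkCurrentSeq(long seq) {
        return currentSeq.get() == seq;
    }

    /** Called whenever a batch is triggered. */
    void advanceSeq() {
        currentSeq.incrementAndGet();
    }

    void enqueueToActive() {
        // Schedule at most one timer per seq.
        if (currentSeq.get() > scheduledSeq.get()) {
            scheduledSeq.set(currentSeq.get());
            // Capture the value once so the timer task sees a fixed seq.
            final long thisScheduledSeq = scheduledSeq.get();
            pendingTimers.add(() -> timeoutHandler.accept(thisScheduledSeq));
        }
    }
}
```

The handler passed to the constructor can then call `checkCurrentSeq(seq)` and do nothing when the batch it was scheduled for has already been triggered.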



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java:
##########
@@ -53,17 +73,74 @@ public class StateRequestBuffer<K> {
     /** The number of state requests in blocking queue. */
     int blockingQueueSize;
 
-    public StateRequestBuffer() {
+    /** The timeout of {@link #activeQueue} triggering in milliseconds. */
+    final long bufferTimeout;
+
+    /** The handler to trigger when timeout. */
+    final Consumer<Long> timeoutHandler;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    /**
+     * The current scheduled future, when the next scheduling occurs, the previous one that has not
+     * yet been executed will be canceled.
+     */
+    ScheduledFuture<Void> currentScheduledFuture;
+
+    /**
+     * The current scheduled trigger sequence number, a timeout trigger is scheduled only if {@code
+     * scheduledTriggerSeq} is less than {@code currentTriggerSeq}.
+     */
+    AtomicLong scheduledTriggerSeq;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by 1.
+     */
+    AtomicLong currentTriggerSeq;

Review Comment:
   How about naming this `currentSeq`, since each batch is distinguished by a seq?



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "fredia (via GitHub)" <gi...@apache.org>.
fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1577700082


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +106,80 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    ScheduledFuture<Void> currentScheduledFuture;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by one.
+     */
+    AtomicLong currentTriggerSeq;
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.currentTriggerSeq = new AtomicLong(0);
+
+        // ----------------- initialize buffer timeout -------------------
+        this.currentScheduledFuture = null;
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new ExecutorThreadFactory("AEC-timeout-scheduler"));
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
+            // make sure shutdown removes all pending tasks
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    .setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    .setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        } else {
+            this.scheduledExecutor = null;
+        }
+
         LOG.info(
-                "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
+                "Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordsNum {}",
                 batchSize,
+                bufferTimeout,
                 maxInFlightRecords);
     }
 
+    void scheduleTimeout(long triggerSeq) {
+        if (bufferTimeout > 0) {
+            if (currentScheduledFuture != null
+                    && !currentScheduledFuture.isDone()
+                    && !currentScheduledFuture.isCancelled()) {
+                currentScheduledFuture.cancel(false);
+            }
+            currentScheduledFuture =
+                    (ScheduledFuture<Void>)
+                            scheduledExecutor.schedule(
+                                    () -> {
+                                        if (triggerSeq != currentTriggerSeq.get()) {
+                                            // if any new trigger occurs, skip this schedule
+                                            return;
+                                        }
+                                        mailboxExecutor.execute(
+                                                () -> triggerIfNeeded(true), "AEC-timeout");

Review Comment:
   This approach may create an extra mail and put it in the mailbox. I lean toward skipping it directly, before it is submitted to the mailbox.
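
The trade-off discussed here can be sketched as follows, under simplifying assumptions: a plain queue stands in for the `MailboxExecutor`, the timer callback is invoked by hand, and all class and method names are hypothetical. The stale check runs on the timer thread so that an outdated timeout never enqueues a mail, and runs once more inside the mail in case a trigger happened while the mail was waiting. Combining both checks is an illustration, not necessarily what the PR ended up doing.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the controller; not Flink code. */
final class SeqGuardSketch {
    final AtomicLong currentTriggerSeq = new AtomicLong(0);
    /** Stand-in for the mailbox: mails are run later by drainMailbox(). */
    final Queue<Runnable> mailbox = new ArrayDeque<>();
    int triggers = 0;

    /** Body of the timer task that was scheduled for the batch {@code triggerSeq}. */
    void onTimeout(long triggerSeq) {
        // Check on the timer thread: a stale timeout never produces a mail.
        if (triggerSeq != currentTriggerSeq.get()) {
            return;
        }
        mailbox.add(() -> {
            // Check again inside the mail: a trigger may have run while it was queued.
            if (triggerSeq == currentTriggerSeq.get()) {
                trigger();
            }
        });
    }

    /** Triggers the current batch and moves on to the next seq. */
    void trigger() {
        triggers++;
        currentTriggerSeq.incrementAndGet();
    }

    void drainMailbox() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }
}
```

With only the in-mail check, the stale case would still cost one mail per timeout; with only the timer-thread check, a mail queued just before a size-based trigger would force an extra trigger.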



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "fredia (via GitHub)" <gi...@apache.org>.
fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1578815645


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -220,12 +283,17 @@ <IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) {
      * @param force whether to trigger requests in force.
      */
     void triggerIfNeeded(boolean force) {
-        // TODO: introduce a timeout mechanism for triggering.
         if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+            // if the active queue size is 1, it means that the current request is the oldest one in
+            // the active queue.
+            if (stateRequestsBuffer.activeQueueSize() == 1) {
+                scheduleTimeout(currentTriggerSeq.get());
+            }

Review Comment:
   No. There will always be a trigger between two occurrences of `stateRequestsBuffer.activeQueueSize() == 1`, so by the time the condition is met a second time, the seq number must be different.
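
A small simulation of this argument, assuming that requests arrive one at a time through `handleRequest()` and that a trigger empties the active queue and increments the seq. The class below is a hypothetical model, not the controller itself; it records the seq each time the `activeQueueSize() == 1` branch is taken.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the size-based trigger; not the PR's code. */
final class OneSchedulePerSeqSketch {
    final int batchSize;
    int activeQueueSize = 0;
    long currentTriggerSeq = 0;
    /** Seq numbers for which a timeout would have been scheduled. */
    final List<Long> scheduledSeqs = new ArrayList<>();

    OneSchedulePerSeqSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Assumption: this is the only caller of triggerIfNeeded(false). */
    void handleRequest() {
        activeQueueSize++;
        triggerIfNeeded(false);
    }

    void triggerIfNeeded(boolean force) {
        if (!force && activeQueueSize < batchSize) {
            if (activeQueueSize == 1) {
                // First request of a new batch: schedule its timeout.
                scheduledSeqs.add(currentTriggerSeq);
            }
            return;
        }
        // Trigger: the batch leaves the active queue and the seq advances.
        activeQueueSize = 0;
        currentTriggerSeq++;
    }
}
```

In this model the queue can only return to size 1 after a trigger has reset it, so every recorded seq is distinct. Calling `triggerIfNeeded(false)` a second time without a new request would record the same seq again, which is the case the assumption excludes.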



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #24667:
URL: https://github.com/apache/flink/pull/24667#issuecomment-2056402383

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "aea56668e33e7062e6b18dac4086a2f05dc36fc1",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "aea56668e33e7062e6b18dac4086a2f05dc36fc1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * aea56668e33e7062e6b18dac4086a2f05dc36fc1 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "fredia (via GitHub)" <gi...@apache.org>.
fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1578997828


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -220,12 +283,17 @@ <IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) {
      * @param force whether to trigger requests in force.
      */
     void triggerIfNeeded(boolean force) {
-        // TODO: introduce a timeout mechanism for triggering.
         if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+            // if the active queue size is 1, it means that the current request is the oldest one in
+            // the active queue.
+            if (stateRequestsBuffer.activeQueueSize() == 1) {
+                scheduleTimeout(currentTriggerSeq.get());
+            }

Review Comment:
   Currently, `triggerIfNeeded(false)` is only called in `handleRequest()`. I added a `schedulingCount==1` condition to guard against triggering multiple times for one seq in the future.
   
   I also moved `scheduleTimeout` into `StateRequestBuffer`. Thanks for the suggestion. 👍



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "fredia (via GitHub)" <gi...@apache.org>.
fredia commented on PR #24667:
URL: https://github.com/apache/flink/pull/24667#issuecomment-2058174500

   @Zakelly @ljz2051 would you please take a look?


Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "fredia (via GitHub)" <gi...@apache.org>.
fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1569863724


##########
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##########
@@ -1085,6 +1085,54 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) {
         configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression);
     }
 
+    // --------------------------------------------------------------------------------------------
+    //  Asynchronous execution configurations
+    // --------------------------------------------------------------------------------------------
+
+    @Internal

Review Comment:
   Ah, I misread the comment and changed them from `@Experimental` to `@Internal`. I have corrected them now.



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "masteryhx (via GitHub)" <gi...@apache.org>.
masteryhx commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1574501889


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +108,91 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
+    /**
+     * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+     * perform actively when the next state request arrives even if the activeQueue has not reached
+     * the {@link #batchSize}.
+     */
+    final AtomicBoolean timeoutFlag;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    final ScheduledThreadPoolExecutor scheduledExecutor;

Review Comment:
   Should AEC become `Closeable`, since it maintains resources like this internally?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +108,91 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
+    /**
+     * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+     * perform actively when the next state request arrives even if the activeQueue has not reached
+     * the {@link #batchSize}.
+     */
+    final AtomicBoolean timeoutFlag;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    final ScheduledThreadPoolExecutor scheduledExecutor;
+
+    ScheduledFuture<Void> currentScheduledFuture;
+
     public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+        this(
+                mailboxExecutor,
+                stateExecutor,
+                DEFAULT_BATCH_SIZE,
+                DEFAULT_BUFFER_TIMEOUT,
+                DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
     }
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeOut,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeOut = bufferTimeOut;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.timeoutFlag = new AtomicBoolean(false);
+
+        // ----------------- initialize buffer timeout -------------------
+        this.currentScheduledFuture = null;
+        if (bufferTimeOut > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1,
+                            new ThreadFactory() {
+                                @Override
+                                public Thread newThread(Runnable r) {
+                                    return new Thread(r, "AEC-scheduler");
+                                }
+                            });
+            this.scheduledExecutor.setRemoveOnCancelPolicy(true);
+
+            // make sure shutdown removes all pending tasks
+            this.scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+            this.scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        } else {
+            this.scheduledExecutor = null;
+        }
+
         LOG.info(
                 "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",

Review Comment:
   nit: Also log `bufferTimeOut` here.
   BTW, rename `bufferTimeOut` -> `bufferTimeout`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +108,91 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
+    /**
+     * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+     * perform actively when the next state request arrives even if the activeQueue has not reached
+     * the {@link #batchSize}.
+     */
+    final AtomicBoolean timeoutFlag;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    final ScheduledThreadPoolExecutor scheduledExecutor;
+
+    ScheduledFuture<Void> currentScheduledFuture;
+
     public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {

Review Comment:
   It seems this constructor is not used, so it and the default values in this class could be removed.



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

Posted by "yunfengzhou-hub (via GitHub)" <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575603596


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +108,91 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
+    /**
+     * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+     * perform actively when the next state request arrives even if the activeQueue has not reached
+     * the {@link #batchSize}.
+     */
+    final AtomicBoolean timeoutFlag;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    final ScheduledThreadPoolExecutor scheduledExecutor;
+
+    ScheduledFuture<Void> currentScheduledFuture;
+
     public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+        this(
+                mailboxExecutor,
+                stateExecutor,
+                DEFAULT_BATCH_SIZE,
+                DEFAULT_BUFFER_TIMEOUT,
+                DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
     }
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeOut,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeOut = bufferTimeOut;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.timeoutFlag = new AtomicBoolean(false);
+
+        // ----------------- initialize buffer timeout -------------------
+        this.currentScheduledFuture = null;
+        if (bufferTimeOut > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1,
+                            new ThreadFactory() {
+                                @Override
+                                public Thread newThread(Runnable r) {
+                                    return new Thread(r, "AEC-scheduler");
+                                }
+                            });
+            this.scheduledExecutor.setRemoveOnCancelPolicy(true);
+
+            // make sure shutdown removes all pending tasks
+            this.scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+            this.scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        } else {
+            this.scheduledExecutor = null;
+        }
+
         LOG.info(
                 "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
                 batchSize,
                 maxInFlightRecords);
     }
 
+    void scheduleTimeout() {
+        if (bufferTimeOut > 0) {
+            if (currentScheduledFuture != null
+                    && !currentScheduledFuture.isDone()
+                    && !currentScheduledFuture.isCancelled()) {
+                currentScheduledFuture.cancel(false);
+            }
+            currentScheduledFuture =
+                    (ScheduledFuture<Void>)
+                            scheduledExecutor.schedule(
+                                    () -> {
+                                        timeoutFlag.set(true);
+                                        mailboxExecutor.execute(
+                                                () -> triggerIfNeeded(false), "AEC-timeout");

Review Comment:
   It might be simpler to remove `timeoutFlag` and invoke `triggerIfNeeded(true)` directly.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -51,15 +56,24 @@ public class AsyncExecutionController<K> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);
 
-    public static final int DEFAULT_BATCH_SIZE = 1000;
-    public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
+    private static final int DEFAULT_BATCH_SIZE = 1000;
+
+    private static final int DEFAULT_BUFFER_TIMEOUT = 1000;

Review Comment:
   How about removing the constants and using `ASYNC_STATE_BUFFER_SIZE.defaultValue()` from the `ExecutionConfig`? This would avoid maintaining the same value in two places.
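
   A rough sketch of the single-source-of-truth pattern, under stated assumptions: `OptionSketch`, `AsyncOptionsSketch`, `ControllerSketch` and the option keys are hypothetical and only imitate a typed config option with a `defaultValue()` accessor. The value 1000 is taken from the constants in the diff above.

```java
/** Hypothetical, simplified typed option; not Flink's ConfigOption. */
final class OptionSketch<T> {
    private final String key;
    private final T defaultValue;

    OptionSketch(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }

    String key() {
        return key;
    }

    T defaultValue() {
        return defaultValue;
    }
}

/** Hypothetical option holder; the keys are made up for illustration. */
final class AsyncOptionsSketch {
    static final OptionSketch<Integer> BUFFER_SIZE =
            new OptionSketch<>("sketch.async-state.buffer-size", 1000);
    static final OptionSketch<Long> BUFFER_TIMEOUT_MS =
            new OptionSketch<>("sketch.async-state.buffer-timeout", 1000L);

    private AsyncOptionsSketch() {}
}

/** The controller keeps no default constants of its own. */
final class ControllerSketch {
    final int batchSize;
    final long bufferTimeout;

    ControllerSketch() {
        // Defaults are read from the options, so they live in exactly one place.
        this(
                AsyncOptionsSketch.BUFFER_SIZE.defaultValue(),
                AsyncOptionsSketch.BUFFER_TIMEOUT_MS.defaultValue());
    }

    ControllerSketch(int batchSize, long bufferTimeout) {
        this.batchSize = batchSize;
        this.bufferTimeout = bufferTimeout;
    }

    public static void main(String[] args) {
        ControllerSketch c = new ControllerSketch();
        System.out.println(c.batchSize + " " + c.bufferTimeout);
    }
}
```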



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +108,91 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
+    /**
+     * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+     * perform actively when the next state request arrives even if the activeQueue has not reached
+     * the {@link #batchSize}.
+     */
+    final AtomicBoolean timeoutFlag;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    final ScheduledThreadPoolExecutor scheduledExecutor;
+
+    ScheduledFuture<Void> currentScheduledFuture;
+
     public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+        this(
+                mailboxExecutor,
+                stateExecutor,
+                DEFAULT_BATCH_SIZE,
+                DEFAULT_BUFFER_TIMEOUT,
+                DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
     }
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeOut,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeOut = bufferTimeOut;

Review Comment:
   The "O" in `bufferTimeOut` is upper-case while the "o" in `timeoutFlag` is lower-case. It might be better to unify them under the same convention.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java:
##########
@@ -346,6 +337,63 @@ public void testSyncPoint() {
         recordContext2.release();
     }
 
+    @Test
+    void testBufferTimeout() throws InterruptedException {
+        batchSize = 5;
+        timeout = 1000;
+        setup();
+        Runnable userCode = () -> valueState.asyncValue();
+
+        // ------------ basic timeout -------------------
+        for (int i = 0; i < batchSize - 1; i++) {
+            String record = String.format("key%d-r%d", i, i);
+            String key = String.format("key%d", batchSize + i);
+            RecordContext<String> recordContext = aec.buildContext(record, key);
+            aec.setCurrentContext(recordContext);
+            userCode.run();
+        }
+        assertThat(aec.timeoutFlag.get()).isFalse();
+        assertThat(aec.currentScheduledFuture.isDone()).isFalse();
+        assertThat(aec.inFlightRecordNum.get()).isEqualTo(batchSize - 1);
+        assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(batchSize - 1);
+        assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
+
+        Thread.sleep(timeout + 100);

Review Comment:
   It might be better to avoid using `Thread.sleep` in test cases, as it may increase the duration of CI and make the tests flaky. How about introducing a `TestScheduledThreadPoolExecutor` so we can control when each step is triggered? An example of this is `org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor`.
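
   To illustrate the general shape of such a test double, here is a minimal sketch. `ManualSchedulerSketch` is hypothetical and much simpler than the Flink class named above: it records delayed tasks and runs them only when the test asks, so no wall-clock waiting is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical test double: delayed tasks run only when triggered manually. */
class ManualSchedulerSketch {
    private final List<Runnable> pending = new ArrayList<>();

    /** Records the task; the delay is ignored because the test controls time. */
    void schedule(Runnable task, long delayMillis) {
        pending.add(task);
    }

    int pendingCount() {
        return pending.size();
    }

    /** Runs every recorded task, as if all delays had elapsed. */
    void triggerAll() {
        List<Runnable> toRun = new ArrayList<>(pending);
        pending.clear();
        toRun.forEach(Runnable::run);
    }

    /** Returns {fired before trigger, fired after trigger}. */
    static boolean[] demo() {
        ManualSchedulerSketch scheduler = new ManualSchedulerSketch();
        AtomicBoolean fired = new AtomicBoolean(false);
        scheduler.schedule(() -> fired.set(true), 1000);
        boolean before = fired.get();
        scheduler.triggerAll();
        return new boolean[] {before, fired.get()};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]);
    }
}
```

   With a scheduler like this injected into the controller, the assertions before and after the timeout could run back to back instead of waiting on `Thread.sleep(timeout + 100)`.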



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +108,91 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
+    /**
+     * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+     * perform actively when the next state request arrives even if the activeQueue has not reached
+     * the {@link #batchSize}.
+     */
+    final AtomicBoolean timeoutFlag;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    final ScheduledThreadPoolExecutor scheduledExecutor;

Review Comment:
   Would it be better to reuse existing utility methods like `FutureUtils.delay()`? This way AEC won't need to maintain such resources.
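
   As a rough illustration of not owning the timer resources, the sketch below uses the JDK's `CompletableFuture.delayedExecutor` (Java 9+) as a stand-in. This is an assumption for illustration only and is not the Flink utility mentioned above; `SharedDelaySketch` and `runAfter` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: delays an action without creating its own thread pool. */
class SharedDelaySketch {

    /** Runs the action after the delay; the caller creates and shuts down no executor. */
    static CompletableFuture<Void> runAfter(long delayMillis, Runnable action) {
        Executor delayed =
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        return CompletableFuture.runAsync(action, delayed);
    }

    /** Schedules a short delay and waits for the action to complete. */
    static boolean demo() {
        AtomicBoolean fired = new AtomicBoolean(false);
        runAfter(10, () -> fired.set(true)).join();
        return fired.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```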


