Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/11 17:32:40 UTC

[GitHub] [flink] hlteoh37 opened a new pull request, #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

hlteoh37 opened a new pull request, #20245:
URL: https://github.com/apache/flink/pull/20245

   … for AsyncSinkWriter
   
   ## What is the purpose of the change
   This pull request makes the RateLimitingStrategy used in AsyncSinkWriter configurable by the sink implementer, so that a sink implementer can supply a custom RateLimitingStrategy.
   
   
   ## Brief change log
   - Introduced RateLimitingStrategy interface
   - Implemented a CongestionControlRateLimitingStrategy
   - Migrated tracking of inFlightMessages from AsyncSinkWriter into the RateLimitingStrategy
   - Introduced logic within AsyncSinkWriter that consults the RateLimitingStrategy on whether the next request should be blocked.
   - Introduced a new AsyncSinkWriter constructor and deprecated the old AsyncSinkWriter constructors.
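   As a rough illustration of the contract described in the change log, the sketch below uses simplified stand-ins for the types. The method names `shouldBlock`, `registerInFlightRequest`, `registerCompletedRequest` and `getMaxBatchSize` are taken from the diffs quoted in this thread; the plain `RequestInfo` class and the `FixedConcurrencyRateLimitingStrategy` implementation are hypothetical and are not Flink's API.

   ```java
   // Simplified stand-ins for the types discussed in this PR; these are not the Flink classes.
   final class RequestInfo {
       final int batchSize;
       final long requestStartTime;

       RequestInfo(int batchSize, long requestStartTime) {
           this.batchSize = batchSize;
           this.requestStartTime = requestStartTime;
       }
   }

   interface RateLimitingStrategy {
       void registerInFlightRequest(RequestInfo requestInfo);

       void registerCompletedRequest(RequestInfo requestInfo);

       boolean shouldBlock(RequestInfo requestInfo);

       int getMaxBatchSize();
   }

   // Hypothetical custom strategy: a fixed cap on concurrent requests, with no adaptation.
   final class FixedConcurrencyRateLimitingStrategy implements RateLimitingStrategy {
       private final int maxInFlightRequests;
       private final int maxBatchSize;
       private int currentInFlightRequests;

       FixedConcurrencyRateLimitingStrategy(int maxInFlightRequests, int maxBatchSize) {
           this.maxInFlightRequests = maxInFlightRequests;
           this.maxBatchSize = maxBatchSize;
       }

       @Override
       public void registerInFlightRequest(RequestInfo requestInfo) {
           currentInFlightRequests++;
       }

       @Override
       public void registerCompletedRequest(RequestInfo requestInfo) {
           // Never let the counter go negative if completions are over-reported.
           currentInFlightRequests = Math.max(0, currentInFlightRequests - 1);
       }

       @Override
       public boolean shouldBlock(RequestInfo requestInfo) {
           // Block the next request once the concurrency cap has been reached.
           return currentInFlightRequests >= maxInFlightRequests;
       }

       @Override
       public int getMaxBatchSize() {
           return maxBatchSize;
       }
   }
   ```

   In this shape, a writer would call `shouldBlock` before each flush, `registerInFlightRequest` just before submitting a batch, and `registerCompletedRequest` from the completion callback.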
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   - Added a test that verifies AsyncSinkWriter respects the RateLimitingStrategy passed into the constructor
   - Added a test that verifies AIMDScalingStrategy follows an additive increase and multiplicative decrease
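   The additive-increase/multiplicative-decrease behaviour that the new test checks can be sketched as follows. The parameters (`increaseRate`, `decreaseFactor`, `rateThreshold`) mirror the builder setters used in `AIMDScalingStrategyTest`; the class name `AimdSketch` and the floor of 1 in `scaleDown` are assumptions for illustration, not necessarily what `AIMDScalingStrategy` does.

   ```java
   // Hypothetical AIMD sketch; the parameters mirror those used in AIMDScalingStrategyTest.
   final class AimdSketch {
       private final int increaseRate;      // added to the rate after a success
       private final double decreaseFactor; // multiplied into the rate after a failure, in (0, 1)
       private final int rateThreshold;     // upper bound enforced by scaleUp

       AimdSketch(int increaseRate, double decreaseFactor, int rateThreshold) {
           if (increaseRate <= 0 || rateThreshold <= 0) {
               throw new IllegalArgumentException("increaseRate and rateThreshold must be positive");
           }
           if (!(decreaseFactor > 0.0 && decreaseFactor < 1.0)) {
               throw new IllegalArgumentException("decreaseFactor must be in (0, 1)");
           }
           this.increaseRate = increaseRate;
           this.decreaseFactor = decreaseFactor;
           this.rateThreshold = rateThreshold;
       }

       /** Additive increase, capped at rateThreshold (long arithmetic avoids int overflow). */
       int scaleUp(int currentRate) {
           return (int) Math.min((long) currentRate + increaseRate, rateThreshold);
       }

       /** Multiplicative decrease, rounded down and kept at 1 or more (an assumption). */
       int scaleDown(int currentRate) {
           return Math.max(1, (int) Math.floor(currentRate * decreaseFactor));
       }
   }
   ```

   With `increaseRate = 10`, `decreaseFactor = 0.5` and `rateThreshold = 20`, `scaleUp(4)` returns 14, `scaleUp(14)` is capped at 20, and `scaleDown(10)` returns 5.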
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? [FLIP-242: Introduce configurable RateLimitingStrategy for Async Sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-242%3A+Introduce+configurable+RateLimitingStrategy+for+Async+Sink)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r918857597


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -344,44 +342,45 @@ public void write(InputT element, Context context) throws IOException, Interrupt
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        while (!isInFlightRequestOrMessageLimitExceeded()
+        while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
                         || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
             flush();
         }
     }
 
-    /**
-     * Determines if the sink should block and complete existing in flight requests before it may
-     * prudently create any new ones. This is exactly determined by if the number of requests
-     * currently in flight exceeds the maximum supported by the sink OR if the number of in flight
-     * messages exceeds the maximum determined to be appropriate by the rate limiting strategy.
-     */
-    private boolean isInFlightRequestOrMessageLimitExceeded() {
-        return inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    private RequestInfo createRequestInfo() {
+        int batchSize = getNextBatchSize();
+        long requestStartTime = System.currentTimeMillis();

Review Comment:
   Good point. Will change it.





[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r924509131


##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java:
##########
@@ -19,6 +19,8 @@
 
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 
 import org.junit.jupiter.api.BeforeEach;

Review Comment:
   Yes, there are no functional changes to AIMD.





[GitHub] [flink] dannycranmer commented on pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on PR #20245:
URL: https://github.com/apache/flink/pull/20245#issuecomment-1209258678

   @pnowojski any final comments?




[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r939060670


##########
flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/resources/send-orders.sql:
##########
@@ -29,7 +29,7 @@ CREATE TABLE orders (
   'aws.credentials.basic.secretkey' = 'secretAccessKey',
   'aws.trust.all.certificates' = 'true',
   'sink.http-client.protocol.version' = 'HTTP1_1',
-  'sink.batch.max-size' = '1',
+  'sink.batch.max-size' = '2',

Review Comment:
   Was testing the impact on the ITCase. Reverted.





[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r940289412


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/config/BasicAsyncSinkWriterConfiguration.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.config;
+
+import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
+
+/**
+ * BasicAsyncSinkWriterConfiguration with defaults that work well for streaming sources/sinks like
+ * Kinesis Data Streams and Kinesis Firehose.
+ */

Review Comment:
   OK, removed.





[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r940280413


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/BasicRequestInfo.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+/** Dataclass to encapsulate information about starting requests. */
+public class BasicRequestInfo implements RequestInfo {

Review Comment:
   OK, done.





[GitHub] [flink] foxus commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
foxus commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r918661244


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -344,44 +342,45 @@ public void write(InputT element, Context context) throws IOException, Interrupt
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        while (!isInFlightRequestOrMessageLimitExceeded()
+        while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
                         || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
             flush();
         }
     }
 
-    /**
-     * Determines if the sink should block and complete existing in flight requests before it may
-     * prudently create any new ones. This is exactly determined by if the number of requests
-     * currently in flight exceeds the maximum supported by the sink OR if the number of in flight
-     * messages exceeds the maximum determined to be appropriate by the rate limiting strategy.
-     */
-    private boolean isInFlightRequestOrMessageLimitExceeded() {
-        return inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    private RequestInfo createRequestInfo() {
+        int batchSize = getNextBatchSize();
+        long requestStartTime = System.currentTimeMillis();

Review Comment:
   Should `requestStartTime` be set when we're about to submit the request entries on line 382 rather than when we create the `RequestInfo`?
   
   If we're using this timestamp to collect statistics for request durations, it would be more accurate to set the timestamp as close to sending the request as possible.
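   One way to act on this suggestion is to capture the timestamp immediately before the submit call rather than when the `RequestInfo` is built. In the sketch below, `submitRequestEntries` is replaced by a hypothetical `Submitter` callback, and `TimedSubmit` is illustrative only. It also uses `System.nanoTime()`, which is monotonic and therefore better suited to measuring durations than `System.currentTimeMillis()`.

   ```java
   import java.util.List;

   // Hypothetical sketch: the start time is captured right before the request is sent.
   final class TimedSubmit {
       interface Submitter<T> {
           void submit(List<T> batch, Runnable onComplete);
       }

       // Written from the completion callback, which may run on another thread.
       volatile long lastDurationNanos = -1;

       <T> void send(List<T> batch, Submitter<T> submitter) {
           // Take the timestamp as late as possible so batching work is not counted.
           final long requestStartNanos = System.nanoTime();
           submitter.submit(batch, () -> lastDurationNanos = System.nanoTime() - requestStartNanos);
       }
   }
   ```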



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -344,44 +342,45 @@ public void write(InputT element, Context context) throws IOException, Interrupt
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        while (!isInFlightRequestOrMessageLimitExceeded()
+        while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
                         || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
             flush();
         }
     }
 
-    /**
-     * Determines if the sink should block and complete existing in flight requests before it may
-     * prudently create any new ones. This is exactly determined by if the number of requests
-     * currently in flight exceeds the maximum supported by the sink OR if the number of in flight
-     * messages exceeds the maximum determined to be appropriate by the rate limiting strategy.
-     */
-    private boolean isInFlightRequestOrMessageLimitExceeded() {
-        return inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    private RequestInfo createRequestInfo() {
+        int batchSize = getNextBatchSize();
+        long requestStartTime = System.currentTimeMillis();
+        return RequestInfo.builder()
+                .setBatchSize(batchSize)
+                .setRequestStartTime(requestStartTime)
+                .build();
     }
 
     /**
      * Persists buffered RequestsEntries into the destination by invoking {@code
      * submitRequestEntries} with batches according to the user specified buffering hints.
      *
-     * <p>The method blocks if too many async requests are in flight.
+     * <p>The method checks with the {@code rateLimitingStrategy} to see if it should block the
+     * request.
      */
     private void flush() throws InterruptedException {
-        while (isInFlightRequestOrMessageLimitExceeded()) {
+        RequestInfo requestInfo = createRequestInfo();
+        while (rateLimitingStrategy.shouldBlock(requestInfo)) {
             mailboxExecutor.yield();
+            requestInfo = createRequestInfo();
         }
 
-        List<RequestEntryT> batch = createNextAvailableBatch();
-        int batchSize = batch.size();
-
-        if (batch.size() == 0) {
+        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
+        int batchSize = requestInfo.batchSize;
+        if (batchSize == 0) {
             return;
         }
+        requestInfo.setBatchSize(batchSize);

Review Comment:
   Since we already set `batchSize` on line 376, is this line a no-op?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/RateLimitingStrategy.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Controls the rate of requests being made in the {@code AsyncSinkWriter}.
+ *
+ * <p>The RateLimitingStrategy is consulted in the {@code AsyncSinkWriter} before sending a request.

Review Comment:
   ```suggestion
    * <p>The RateLimitingStrategy is consulted by the {@code AsyncSinkWriter} before sending a request.
   ```



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -344,44 +342,45 @@ public void write(InputT element, Context context) throws IOException, Interrupt
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        while (!isInFlightRequestOrMessageLimitExceeded()
+        while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
                         || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
             flush();
         }
     }
 
-    /**
-     * Determines if the sink should block and complete existing in flight requests before it may
-     * prudently create any new ones. This is exactly determined by if the number of requests
-     * currently in flight exceeds the maximum supported by the sink OR if the number of in flight
-     * messages exceeds the maximum determined to be appropriate by the rate limiting strategy.
-     */
-    private boolean isInFlightRequestOrMessageLimitExceeded() {
-        return inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    private RequestInfo createRequestInfo() {
+        int batchSize = getNextBatchSize();
+        long requestStartTime = System.currentTimeMillis();
+        return RequestInfo.builder()
+                .setBatchSize(batchSize)
+                .setRequestStartTime(requestStartTime)
+                .build();
     }
 
     /**
      * Persists buffered RequestsEntries into the destination by invoking {@code
      * submitRequestEntries} with batches according to the user specified buffering hints.
      *
-     * <p>The method blocks if too many async requests are in flight.
+     * <p>The method checks with the {@code rateLimitingStrategy} to see if it should block the
+     * request.
      */
     private void flush() throws InterruptedException {
-        while (isInFlightRequestOrMessageLimitExceeded()) {
+        RequestInfo requestInfo = createRequestInfo();
+        while (rateLimitingStrategy.shouldBlock(requestInfo)) {
             mailboxExecutor.yield();
+            requestInfo = createRequestInfo();
         }
 
-        List<RequestEntryT> batch = createNextAvailableBatch();
-        int batchSize = batch.size();
-
-        if (batch.size() == 0) {
+        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
+        int batchSize = requestInfo.batchSize;

Review Comment:
   Should we rely on `batch.size()` rather than `requestInfo`'s `batchSize`, in case `createNextAvailableBatch` has mutated it?



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/strategy/AIMDScalingStrategyTest.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/** Test class for {@link AIMDScalingStrategy}. */
+public class AIMDScalingStrategyTest {
+
+    @Test
+    public void testScaleUpAdditively() {
+        final int increaseRate = 10;
+        final int currentRate = 4;
+
+        AIMDScalingStrategy scalingStrategy =
+                AIMDScalingStrategy.builder()
+                        .setIncreaseRate(increaseRate)
+                        .setDecreaseFactor(0.5)
+                        .setRateThreshold(1000)
+                        .build();
+
+        assertThat(scalingStrategy.scaleUp(currentRate)).isEqualTo(increaseRate + currentRate);
+    }
+
+    @Test
+    public void testScaleUpRespectsRateThreshold() {
+        final int increaseRate = 10;
+        final int currentRate = 14;
+        final int rateThreshold = 20;
+
+        AIMDScalingStrategy scalingStrategy =
+                AIMDScalingStrategy.builder()
+                        .setIncreaseRate(increaseRate)
+                        .setDecreaseFactor(0.5)
+                        .setRateThreshold(rateThreshold)
+                        .build();
+
+        assertThat(scalingStrategy.scaleUp(increaseRate + currentRate)).isEqualTo(rateThreshold);
+    }
+
+    @Test
+    public void testScaleDownByFactor() {
+        final double decreaseFactor = 0.4;
+
+        AIMDScalingStrategy scalingStrategy =
+                AIMDScalingStrategy.builder()
+                        .setIncreaseRate(10)
+                        .setDecreaseFactor(decreaseFactor)
+                        .setRateThreshold(1000)
+                        .build();
+
+        // roundDown(10 * 0.4 = 4) = 4
+        assertThat(scalingStrategy.scaleDown(10)).isEqualTo(4);
+
+        // roundDown(314 * 0.4 = 125.6) = 125

Review Comment:
   This comment does not match the assertion below.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -391,22 +390,24 @@ private void flush() throws InterruptedException {
                                                 timestampOfRequest),
                                 "Mark in-flight request as completed and requeue %d request entries",
                                 failedRequestEntries.size());
-
+        rateLimitingStrategy.registerInFlightRequest(requestInfo);
         inFlightRequestsCount++;
-        inFlightMessages += batchSize;
-        submitRequestEntries(batch, requestResult);
+        submitRequestEntries(batch, requestResultCallback);
+    }
+
+    private int getNextBatchSize() {
+        return Math.min(rateLimitingStrategy.getMaxBatchSize(), bufferedRequestEntries.size());
     }
 
     /**
      * Creates the next batch of request entries while respecting the {@code maxBatchSize} and
      * {@code maxBatchSizeInBytes}. Also adds these to the metrics counters.
      */
-    private List<RequestEntryT> createNextAvailableBatch() {
-        int batchSize = Math.min(getNextBatchSizeLimit(), bufferedRequestEntries.size());
-        List<RequestEntryT> batch = new ArrayList<>(batchSize);
+    private List<RequestEntryT> createNextAvailableBatch(RequestInfo requestInfo) {
+        List<RequestEntryT> batch = new ArrayList<>(requestInfo.batchSize);
 
         int batchSizeBytes = 0;

Review Comment:
   Should this be a long to avoid potential overflow when adding the `requestEntrySize` long to it later?
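   The overflow concern is real in Java because a compound assignment such as `int += long` compiles and silently narrows the result back to `int`. The snippet below demonstrates this with arbitrary values; `batchSizeBytes` and `requestEntrySize` only echo the names in the diff, and `AccumulatorOverflowDemo` is a hypothetical class.

   ```java
   // Demonstrates why an int accumulator is risky when adding long sizes.
   final class AccumulatorOverflowDemo {
       /** Accumulates into an int; `+=` inserts an implicit (int) cast and can wrap around. */
       static int sumAsInt(long[] requestEntrySizes) {
           int batchSizeBytes = 0;
           for (long requestEntrySize : requestEntrySizes) {
               batchSizeBytes += requestEntrySize; // compiles, but narrows to int
           }
           return batchSizeBytes;
       }

       /** Accumulates into a long, so realistic totals cannot wrap. */
       static long sumAsLong(long[] requestEntrySizes) {
           long batchSizeBytes = 0L;
           for (long requestEntrySize : requestEntrySizes) {
               batchSizeBytes += requestEntrySize;
           }
           return batchSizeBytes;
       }
   }
   ```

   For sizes `{Integer.MAX_VALUE, 1}`, the `int` version wraps to `Integer.MIN_VALUE`, while the `long` version returns 2147483648.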



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/CongestionControlRateLimitingStrategy.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@code RateLimitingStrategy} implementation that does the following:
+ *
+ * <ul>
+ *   <li>Scales up when any request is successful.
+ *   <li>Scales down when any message in a request is unsuccessful.
+ *   <li>Uses Additive Increase / Multiplicative Decrease (AIMD) strategy to scale up/down.
+ * </ul>
+ *
+ * <p>This strategy works well for throughput-limited record-based sinks (e.g. Kinesis, Kafka).
+ */
+public class CongestionControlRateLimitingStrategy implements RateLimitingStrategy {
+
+    private final int maxInFlightRequests;
+    private final AIMDScalingStrategy aimdScalingStrategy;
+    private int maxInFlightMessages;
+
+    private int currentInFlightRequests;
+    private int currentInFlightMessages;
+
+    private CongestionControlRateLimitingStrategy(
+            int maxInFlightRequests,
+            int initialMaxInFlightMessages,
+            AIMDScalingStrategy aimdScalingStrategy) {
+        Preconditions.checkArgument(
+                maxInFlightRequests > 0, "maxInFlightRequests must be a positive integer.");
+        Preconditions.checkArgument(
+                initialMaxInFlightMessages > 0,
+                "initialMaxInFlightMessages must be a positive integer.");
+        Preconditions.checkNotNull(aimdScalingStrategy, "aimdScalingStrategy must be provided.");
+
+        this.maxInFlightRequests = maxInFlightRequests;
+        this.maxInFlightMessages = initialMaxInFlightMessages;
+        this.aimdScalingStrategy = aimdScalingStrategy;
+    }
+
+    @Override
+    public void registerInFlightRequest(RequestInfo requestInfo) {
+        currentInFlightRequests++;
+        currentInFlightMessages += requestInfo.batchSize;
+    }
+
+    @Override
+    public void registerCompletedRequest(RequestInfo requestInfo) {
+        currentInFlightRequests--;

Review Comment:
   Should we protect against this value going below zero?
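   If the answer is yes, the decrement can either fail fast or clamp at zero. A minimal sketch of both options follows; `InFlightCounter` and its methods are hypothetical and not part of the PR. Failing fast surfaces unbalanced register/complete calls (inside Flink, `Preconditions.checkState` would be a natural way to express it), while clamping keeps the sink running but masks the bug.

   ```java
   // Hypothetical counter showing two ways to guard a decrement against going negative.
   final class InFlightCounter {
       private int currentInFlightRequests;
       private int currentInFlightMessages;

       void registerInFlight(int batchSize) {
           currentInFlightRequests++;
           currentInFlightMessages += batchSize;
       }

       /** Option 1: fail fast, which exposes unbalanced register/complete calls. */
       void registerCompletedStrict(int batchSize) {
           if (currentInFlightRequests <= 0 || currentInFlightMessages < batchSize) {
               throw new IllegalStateException("Completed a request that was never registered");
           }
           currentInFlightRequests--;
           currentInFlightMessages -= batchSize;
       }

       /** Option 2: clamp at zero, which keeps running but hides the mismatch. */
       void registerCompletedClamped(int batchSize) {
           currentInFlightRequests = Math.max(0, currentInFlightRequests - 1);
           currentInFlightMessages = Math.max(0, currentInFlightMessages - batchSize);
       }

       int requests() {
           return currentInFlightRequests;
       }

       int messages() {
           return currentInFlightMessages;
       }
   }
   ```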





[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r939059196


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/RequestInfo.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Dataclass to encapsulate information about starting requests. */
+@PublicEvolving
+public class RequestInfo {
+    private final int batchSize;
+    private final long requestStartTime;

Review Comment:
   Changed.





[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r924493155


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -344,69 +351,69 @@ public void write(InputT element, Context context) throws IOException, Interrupt
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        while (!isInFlightRequestOrMessageLimitExceeded()
+        while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
                         || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
             flush();
         }
     }
 
-    /**
-     * Determines if the sink should block and complete existing in flight requests before it may
-     * prudently create any new ones. This is exactly determined by if the number of requests
-     * currently in flight exceeds the maximum supported by the sink OR if the number of in flight
-     * messages exceeds the maximum determined to be appropriate by the rate limiting strategy.
-     */
-    private boolean isInFlightRequestOrMessageLimitExceeded() {
-        return inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    private RequestInfo createRequestInfo() {
+        int batchSize = getNextBatchSize();
+        long requestStartTime = System.currentTimeMillis();
+        return RequestInfo.builder()
+                .setBatchSize(batchSize)
+                .setRequestStartTime(requestStartTime)
+                .build();
     }
 
     /**
      * Persists buffered RequestsEntries into the destination by invoking {@code
      * submitRequestEntries} with batches according to the user specified buffering hints.
      *
-     * <p>The method blocks if too many async requests are in flight.
+     * <p>The method checks with the {@code rateLimitingStrategy} to see if it should block the
+     * request.
      */
     private void flush() throws InterruptedException {
-        while (isInFlightRequestOrMessageLimitExceeded()) {
+        RequestInfo requestInfo = createRequestInfo();
+        while (rateLimitingStrategy.shouldBlock(requestInfo)) {
             mailboxExecutor.yield();
+            requestInfo = createRequestInfo();
         }
 
-        List<RequestEntryT> batch = createNextAvailableBatch();
-        int batchSize = batch.size();
-
+        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
         if (batch.size() == 0) {
             return;
         }
 
-        long timestampOfRequest = System.currentTimeMillis();
-        Consumer<List<RequestEntryT>> requestResult =
+        int batchSize = requestInfo.getBatchSize();

Review Comment:
   `batch.size()` is the better one to use, but we need to be consistent across the `registerInFlightRequest` and `registerCompletedRequest` calls.
   
   Should we create a new instance of `RequestInfo`?
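A minimal sketch of the "new instance" option, under the assumption that the in-flight bookkeeping simply adds and subtracts the batch size. `SentRequest`, `InFlightTracker` and `ConsistentFlush` are hypothetical stand-ins, not Flink classes. The idea is to build an immutable request record from `batch.size()` once the batch exists, and then pass that same instance to both register calls so the counts always cancel out.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for illustration only; not Flink's classes.
final class SentRequest {
    final int batchSize;
    final Instant requestStartTime;

    SentRequest(int batchSize, Instant requestStartTime) {
        this.batchSize = batchSize;
        this.requestStartTime = requestStartTime;
    }
}

final class InFlightTracker {
    int inFlightMessages = 0;

    void registerInFlightRequest(SentRequest request) {
        inFlightMessages += request.batchSize;
    }

    void registerCompletedRequest(SentRequest request) {
        inFlightMessages -= request.batchSize;
    }
}

final class ConsistentFlush {
    /** Takes up to requestedBatchSize entries; returns null if nothing was buffered. */
    static SentRequest submit(List<String> buffer, int requestedBatchSize, InFlightTracker tracker) {
        List<String> batch = new ArrayList<>();
        while (!buffer.isEmpty() && batch.size() < requestedBatchSize) {
            batch.add(buffer.remove(0));
        }
        if (batch.isEmpty()) {
            return null;
        }
        // Record what was actually sent (batch.size()), not what was requested.
        SentRequest sent = new SentRequest(batch.size(), Instant.now());
        tracker.registerInFlightRequest(sent);
        // The caller hands this same instance to registerCompletedRequest when the
        // asynchronous request finishes, so both calls see identical sizes.
        return sent;
    }
}
```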





[GitHub] [flink] dannycranmer commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r924174639


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/RequestInfo.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Dataclass to encapsulate information about starting requests. */
+@PublicEvolving
+public class RequestInfo {
+    private final int batchSize;
+    private final long requestStartTime;

Review Comment:
   It is not clear here what the unit is. I suggest using `Instant` instead of `long`.
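To illustrate the suggestion, here is a hedged sketch of a request dataclass whose start time is an `java.time.Instant`. `TimedRequestInfo` and its `latencyUntil` helper are hypothetical names, not part of the PR. With an `Instant`, elapsed time comes from `Duration.between`, so nobody has to guess whether a `long` holds milliseconds or nanoseconds.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch, not the PR's final class: typing the start time as
// Instant makes the unit explicit, and latency is derived with Duration.
final class TimedRequestInfo {
    private final int batchSize;
    private final Instant requestStartTime;

    TimedRequestInfo(int batchSize, Instant requestStartTime) {
        this.batchSize = batchSize;
        this.requestStartTime = requestStartTime;
    }

    int getBatchSize() {
        return batchSize;
    }

    Instant getRequestStartTime() {
        return requestStartTime;
    }

    /** Time elapsed between the request start and the given completion instant. */
    Duration latencyUntil(Instant completedAt) {
        return Duration.between(requestStartTime, completedAt);
    }
}
```

In a writer, the start time would typically be captured with `Instant.now()` at the moment the request is built.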



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -344,69 +351,69 @@ public void write(InputT element, Context context) throws IOException, Interrupt
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        while (!isInFlightRequestOrMessageLimitExceeded()
+        while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
                         || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
             flush();
         }
     }
 
-    /**
-     * Determines if the sink should block and complete existing in flight requests before it may
-     * prudently create any new ones. This is exactly determined by if the number of requests
-     * currently in flight exceeds the maximum supported by the sink OR if the number of in flight
-     * messages exceeds the maximum determined to be appropriate by the rate limiting strategy.
-     */
-    private boolean isInFlightRequestOrMessageLimitExceeded() {
-        return inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    private RequestInfo createRequestInfo() {
+        int batchSize = getNextBatchSize();
+        long requestStartTime = System.currentTimeMillis();
+        return RequestInfo.builder()
+                .setBatchSize(batchSize)
+                .setRequestStartTime(requestStartTime)
+                .build();
     }
 
     /**
      * Persists buffered RequestsEntries into the destination by invoking {@code
      * submitRequestEntries} with batches according to the user specified buffering hints.
      *
-     * <p>The method blocks if too many async requests are in flight.
+     * <p>The method checks with the {@code rateLimitingStrategy} to see if it should block the
+     * request.
      */
     private void flush() throws InterruptedException {
-        while (isInFlightRequestOrMessageLimitExceeded()) {
+        RequestInfo requestInfo = createRequestInfo();
+        while (rateLimitingStrategy.shouldBlock(requestInfo)) {
             mailboxExecutor.yield();
+            requestInfo = createRequestInfo();
         }
 
-        List<RequestEntryT> batch = createNextAvailableBatch();
-        int batchSize = batch.size();
-
+        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
         if (batch.size() == 0) {
             return;
         }
 
-        long timestampOfRequest = System.currentTimeMillis();
-        Consumer<List<RequestEntryT>> requestResult =
+        int batchSize = requestInfo.getBatchSize();

Review Comment:
   At this point, is `requestInfo.getBatchSize()` guaranteed to equal `batch.size()`? I am guessing not, based on line 385. In that case, is this the right one to use?
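One way the two values can diverge, sketched under assumptions: if batch creation is bounded both by the requested entry count and by a byte budget (the writer does track `maxBatchSizeInBytes`), the returned batch can be shorter than the size the `RequestInfo` was built with. `BatchCutter.nextBatch` below is a hypothetical helper, not Flink's `createNextAvailableBatch`, and it assumes every individual entry fits within the byte budget on its own.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: entries are represented only by their size in bytes.
final class BatchCutter {
    static List<Long> nextBatch(
            Deque<Long> bufferedSizesInBytes, int requestedBatchSize, long maxBatchSizeInBytes) {
        List<Long> batch = new ArrayList<>();
        long bytes = 0;
        while (batch.size() < requestedBatchSize && !bufferedSizesInBytes.isEmpty()) {
            long next = bufferedSizesInBytes.peekFirst();
            if (bytes + next > maxBatchSizeInBytes) {
                break; // adding the next entry would exceed the byte budget
            }
            bytes += next;
            batch.add(bufferedSizesInBytes.pollFirst());
        }
        // May be smaller than requestedBatchSize, so batch.size() is the value to report.
        return batch;
    }
}
```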



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/RateLimitingStrategy.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Controls the rate of requests being made in the {@code AsyncSinkWriter}.

Review Comment:
   nit: It is a code smell for this interface to know about `AsyncSinkWriter`.
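A hedged sketch of what a writer-agnostic contract could look like: the interface talks only about request metadata and never mentions the caller. The method names follow those used in this PR (`shouldBlock`, `registerInFlightRequest`, `registerCompletedRequest`), but the `int` parameters and the `SimpleRateLimiter` and `FixedConcurrencyLimiter` names are simplifications invented for illustration.

```java
// Hypothetical sketch: a rate-limiting contract expressed only in terms of
// request metadata, with no reference to the writer that calls it.
interface SimpleRateLimiter {
    /** Returns true if a request of the given size should wait. */
    boolean shouldBlock(int nextBatchSize);

    void registerInFlightRequest(int batchSize);

    void registerCompletedRequest(int batchSize);
}

/** Example implementation that only caps the number of concurrent requests. */
final class FixedConcurrencyLimiter implements SimpleRateLimiter {
    private final int maxInFlightRequests;
    private int inFlightRequests;

    FixedConcurrencyLimiter(int maxInFlightRequests) {
        if (maxInFlightRequests <= 0) {
            throw new IllegalArgumentException("maxInFlightRequests must be positive");
        }
        this.maxInFlightRequests = maxInFlightRequests;
    }

    @Override
    public boolean shouldBlock(int nextBatchSize) {
        return inFlightRequests >= maxInFlightRequests;
    }

    @Override
    public void registerInFlightRequest(int batchSize) {
        inFlightRequests++;
    }

    @Override
    public void registerCompletedRequest(int batchSize) {
        inFlightRequests--;
    }
}
```

Because nothing here depends on a sink writer, the same strategy could in principle be reused by any component that issues batched requests.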



##########
flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/resources/send-orders.sql:
##########
@@ -29,7 +29,7 @@ CREATE TABLE orders (
   'aws.credentials.basic.secretkey' = 'secretAccessKey',
   'aws.trust.all.certificates' = 'true',
   'sink.http-client.protocol.version' = 'HTTP1_1',
-  'sink.batch.max-size' = '1',
+  'sink.batch.max-size' = '2',

Review Comment:
   Why did you need to change this?



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java:
##########
@@ -19,6 +19,8 @@
 
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 
 import org.junit.jupiter.api.BeforeEach;

Review Comment:
   There are no new tests in here. Are these changes adequately captured by the existing tests? I suppose this is a non-functional change for AIMD.
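For reference, the behaviour such a test would pin down (linear increase, multiplicative decrease) can be sketched as below. `AimdScaler` and its parameters are hypothetical and not the signature of Flink's `AIMDScalingStrategy`; the floor of 1 on scale-down is an assumption that keeps the writer able to make progress.

```java
// Hypothetical sketch of additive-increase / multiplicative-decrease scaling.
final class AimdScaler {
    private final int increaseRate; // added to the rate after a success
    private final double decreaseFactor; // multiplies the rate after a failure, in (0, 1)
    private final int rateThreshold; // the rate never grows beyond this

    AimdScaler(int increaseRate, double decreaseFactor, int rateThreshold) {
        if (increaseRate <= 0 || rateThreshold <= 0) {
            throw new IllegalArgumentException("increaseRate and rateThreshold must be positive");
        }
        if (decreaseFactor <= 0.0 || decreaseFactor >= 1.0) {
            throw new IllegalArgumentException("decreaseFactor must be in (0, 1)");
        }
        this.increaseRate = increaseRate;
        this.decreaseFactor = decreaseFactor;
        this.rateThreshold = rateThreshold;
    }

    /** Linear increase, capped at rateThreshold. */
    int scaleUp(int currentRate) {
        return Math.min(currentRate + increaseRate, rateThreshold);
    }

    /** Multiplicative decrease, never below 1. */
    int scaleDown(int currentRate) {
        return Math.max(1, (int) Math.round(currentRate * decreaseFactor));
    }
}
```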





[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r940414614


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/config/AsyncSinkWriterConfiguration.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
+
+/**
+ * AsyncSinkWriterConfiguration configures the {@link
+ * org.apache.flink.connector.base.sink.writer.AsyncSinkWriter}.
+ */
+@PublicEvolving
+public interface AsyncSinkWriterConfiguration {

Review Comment:
   Done, removed the interface



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/config/AsyncSinkWriterConfiguration.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
+
+/**
+ * AsyncSinkWriterConfiguration configures the {@link
+ * org.apache.flink.connector.base.sink.writer.AsyncSinkWriter}.
+ */
+@PublicEvolving
+public interface AsyncSinkWriterConfiguration {

Review Comment:
   Changed to AsyncSinkConfiguration instead





[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r939907643


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/ResultInfo.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Dataclass to encapsulate results from completed requests. */
+@PublicEvolving
+public class ResultInfo {

Review Comment:
   Refactored to remove this





[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r939059448


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/RateLimitingStrategy.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Controls the rate of requests being made in the {@code AsyncSinkWriter}.

Review Comment:
   Changed the description to remove reference to `AsyncSinkWriter`





[GitHub] [flink] hlteoh37 commented on pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on PR #20245:
URL: https://github.com/apache/flink/pull/20245#issuecomment-1208062797

   
   @flinkbot run azure
   
   




[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r927824434


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/RequestInfo.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.time.Instant;
+
+/** Dataclass to encapsulate information about starting requests. */
+@PublicEvolving
+public class RequestInfo {
+    private final int batchSize;
+    private final Instant requestStartTime;
+
+    private RequestInfo(int batchSize, Instant requestStartTime) {
+        this.batchSize = batchSize;
+        this.requestStartTime = requestStartTime;
+    }
+
+    @PublicEvolving
+    public static RequestInfoBuilder builder() {
+        return new RequestInfoBuilder();
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public Instant getRequestStartTime() {
+        return requestStartTime;
+    }
+
+    /** Builder for {@link RequestInfo} dataclass. */
+    public static class RequestInfoBuilder {
+        private int batchSize;
+        private Instant requestStartTime;
+
+        public RequestInfoBuilder setBatchSize(final int batchSize) {
+            this.batchSize = batchSize;
+            return this;
+        }
+
+        public RequestInfoBuilder setRequestStartTime(final Instant requestStartTime) {
+            this.requestStartTime = requestStartTime;
+            return this;
+        }
+
+        public RequestInfo build() {
+            return new RequestInfo(batchSize, requestStartTime);
+        }
+    }

Review Comment:
   I think a fluent builder makes the code more readable, especially for a dataclass like this, which can be extended to have more information in the future.





[GitHub] [flink] dannycranmer commented on pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on PR #20245:
URL: https://github.com/apache/flink/pull/20245#issuecomment-1182835559

   Please squash commits before the final push.




[GitHub] [flink] dannycranmer commented on pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on PR #20245:
URL: https://github.com/apache/flink/pull/20245#issuecomment-1182837228

   Your CI failed with compile errors; it looks like `spotless` failures. You should run a build locally to ensure all tests and quality checks pass. When spotless fails, the command to fix it is printed to the terminal.




[GitHub] [flink] pnowojski commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r940225133


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -256,43 +242,54 @@ public AsyncSinkWriter(
             long maxTimeInBufferMS,
             long maxRecordSizeInBytes,
             Collection<BufferedRequestState<RequestEntryT>> states) {
+        this(
+                elementConverter,
+                context,
+                BasicAsyncSinkWriterConfiguration.builder()
+                        .setMaxBatchSize(maxBatchSize)
+                        .setMaxInFlightRequests(maxInFlightRequests)
+                        .setMaxBufferedRequests(maxBufferedRequests)
+                        .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
+                        .setMaxTimeInBufferMS(maxTimeInBufferMS)
+                        .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+                        .build(),
+                states);
+    }
+
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            BasicAsyncSinkWriterConfiguration configuration,

Review Comment:
   Adding to @dannycranmer's comments.
   
   Why do we have this split between `BasicAsyncSinkWriterConfiguration` and `AsyncSinkWriterConfiguration`? If `BasicAsyncSinkWriterConfiguration` is supposed to be an internal class, then how are users supposed to construct `AsyncSinkWriter`?
   
   If we indeed need/want `AsyncSinkWriterConfiguration`, then it seems to me that its builder and concrete implementation should also be publicly available classes, and we could basically squash them into one?
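A minimal sketch of the "squash them into one" option: a single immutable configuration class with a nested fluent builder, so users construct it directly without an interface/implementation split. `SinkWriterConfig` is a hypothetical name; the setter names mirror those in the diff, and the "must be positive" validation is an assumption for illustration.

```java
// Hypothetical sketch: one immutable configuration class with a nested builder.
final class SinkWriterConfig {
    private final int maxBatchSize;
    private final int maxInFlightRequests;
    private final int maxBufferedRequests;
    private final long maxBatchSizeInBytes;
    private final long maxTimeInBufferMS;
    private final long maxRecordSizeInBytes;

    private SinkWriterConfig(Builder b) {
        this.maxBatchSize = b.maxBatchSize;
        this.maxInFlightRequests = b.maxInFlightRequests;
        this.maxBufferedRequests = b.maxBufferedRequests;
        this.maxBatchSizeInBytes = b.maxBatchSizeInBytes;
        this.maxTimeInBufferMS = b.maxTimeInBufferMS;
        this.maxRecordSizeInBytes = b.maxRecordSizeInBytes;
    }

    static Builder builder() {
        return new Builder();
    }

    int getMaxBatchSize() { return maxBatchSize; }
    int getMaxInFlightRequests() { return maxInFlightRequests; }
    int getMaxBufferedRequests() { return maxBufferedRequests; }
    long getMaxBatchSizeInBytes() { return maxBatchSizeInBytes; }
    long getMaxTimeInBufferMS() { return maxTimeInBufferMS; }
    long getMaxRecordSizeInBytes() { return maxRecordSizeInBytes; }

    static final class Builder {
        private int maxBatchSize;
        private int maxInFlightRequests;
        private int maxBufferedRequests;
        private long maxBatchSizeInBytes;
        private long maxTimeInBufferMS;
        private long maxRecordSizeInBytes;

        Builder setMaxBatchSize(int v) { maxBatchSize = v; return this; }
        Builder setMaxInFlightRequests(int v) { maxInFlightRequests = v; return this; }
        Builder setMaxBufferedRequests(int v) { maxBufferedRequests = v; return this; }
        Builder setMaxBatchSizeInBytes(long v) { maxBatchSizeInBytes = v; return this; }
        Builder setMaxTimeInBufferMS(long v) { maxTimeInBufferMS = v; return this; }
        Builder setMaxRecordSizeInBytes(long v) { maxRecordSizeInBytes = v; return this; }

        SinkWriterConfig build() {
            // Assumed rule for this sketch: every limit must be set to a positive value.
            requirePositive(maxBatchSize, "maxBatchSize");
            requirePositive(maxInFlightRequests, "maxInFlightRequests");
            requirePositive(maxBufferedRequests, "maxBufferedRequests");
            requirePositive(maxBatchSizeInBytes, "maxBatchSizeInBytes");
            requirePositive(maxTimeInBufferMS, "maxTimeInBufferMS");
            requirePositive(maxRecordSizeInBytes, "maxRecordSizeInBytes");
            return new SinkWriterConfig(this);
        }

        private static void requirePositive(long value, String name) {
            if (value <= 0) {
                throw new IllegalArgumentException(name + " must be positive");
            }
        }
    }
}
```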



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/BasicResultInfo.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+/** Dataclass to encapsulate results from completed requests. */
+public class BasicResultInfo implements ResultInfo {

Review Comment:
   `@Internal` to avoid confusion?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/BasicRequestInfo.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+/** Dataclass to encapsulate information about starting requests. */
+public class BasicRequestInfo implements RequestInfo {

Review Comment:
   `@Internal` to avoid confusion?





[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r918859145


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -344,44 +342,45 @@ public void write(InputT element, Context context) throws IOException, Interrupt
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        while (!isInFlightRequestOrMessageLimitExceeded()
+        while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
                         || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
             flush();
         }
     }
 
-    /**
-     * Determines if the sink should block and complete existing in flight requests before it may
-     * prudently create any new ones. This is exactly determined by if the number of requests
-     * currently in flight exceeds the maximum supported by the sink OR if the number of in flight
-     * messages exceeds the maximum determined to be appropriate by the rate limiting strategy.
-     */
-    private boolean isInFlightRequestOrMessageLimitExceeded() {
-        return inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    private RequestInfo createRequestInfo() {
+        int batchSize = getNextBatchSize();
+        long requestStartTime = System.currentTimeMillis();
+        return RequestInfo.builder()
+                .setBatchSize(batchSize)
+                .setRequestStartTime(requestStartTime)
+                .build();
     }
 
     /**
      * Persists buffered RequestsEntries into the destination by invoking {@code
      * submitRequestEntries} with batches according to the user specified buffering hints.
      *
-     * <p>The method blocks if too many async requests are in flight.
+     * <p>The method checks with the {@code rateLimitingStrategy} to see if it should block the
+     * request.
      */
     private void flush() throws InterruptedException {
-        while (isInFlightRequestOrMessageLimitExceeded()) {
+        RequestInfo requestInfo = createRequestInfo();
+        while (rateLimitingStrategy.shouldBlock(requestInfo)) {
             mailboxExecutor.yield();
+            requestInfo = createRequestInfo();
         }
 
-        List<RequestEntryT> batch = createNextAvailableBatch();
-        int batchSize = batch.size();
-
-        if (batch.size() == 0) {
+        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
+        int batchSize = requestInfo.batchSize;
+        if (batchSize == 0) {
             return;
         }
+        requestInfo.setBatchSize(batchSize);

Review Comment:
   I meant to set the batch size to `batch.size()`.





[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r922648622


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/CongestionControlRateLimitingStrategy.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@code RateLimitingStrategy} implementation that does the following:
+ *
+ * <ul>
+ *   <li>Scales up when any request is successful.
+ *   <li>Scales down when any message in a request is unsuccessful.
+ *   <li>Uses Additive Increase / Multiplicative Decrease (AIMD) strategy to scale up/down.
+ * </ul>
+ *
+ * <p>This strategy works well for throughput-limited record-based sinks (e.g. Kinesis, Kafka).
+ */
+public class CongestionControlRateLimitingStrategy implements RateLimitingStrategy {
+
+    private final int maxInFlightRequests;
+    private final AIMDScalingStrategy aimdScalingStrategy;

Review Comment:
   I think in this case we can follow the customer requirement, since this coupling is with `CongestionControlRateLimitingStrategy` rather than the generic `RateLimitingStrategy`. At the moment, this strategy is modeled after TCP congestion control, which should work well with AIMD. https://en.wikipedia.org/wiki/TCP_congestion_control
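To illustrate the TCP-style coupling described above, here is a minimal sketch of an AIMD-controlled limit on in-flight messages. `AimdInFlightLimiter` and its methods are hypothetical names, not the PR's `CongestionControlRateLimitingStrategy` API. The sketch assumes, as in the old `isInFlightRequestOrMessageLimitExceeded` check, that the writer blocks once either the in-flight request cap or the current message limit is reached.

```java
/**
 * Hypothetical sketch (not the Flink API): an in-flight message limit that grows additively
 * after fully successful requests and shrinks multiplicatively after any failure, similar in
 * spirit to TCP congestion control.
 */
final class AimdInFlightLimiter {
    private final int maxInFlightRequests; // hard cap on concurrent requests
    private final int increaseRate; // additive step after a fully successful request
    private final double decreaseFactor; // multiplicative factor after a partial failure
    private final int rateThreshold; // upper bound for the message limit
    private int messageLimit; // current limit on in-flight messages
    private int inFlightRequests;
    private int inFlightMessages;

    AimdInFlightLimiter(
            int maxInFlightRequests,
            int initialMessageLimit,
            int increaseRate,
            double decreaseFactor,
            int rateThreshold) {
        if (maxInFlightRequests <= 0
                || initialMessageLimit <= 0
                || increaseRate <= 0
                || !(decreaseFactor > 0.0 && decreaseFactor < 1.0)
                || rateThreshold < increaseRate) {
            throw new IllegalArgumentException("invalid limiter parameters");
        }
        this.maxInFlightRequests = maxInFlightRequests;
        this.increaseRate = increaseRate;
        this.decreaseFactor = decreaseFactor;
        this.rateThreshold = rateThreshold;
        this.messageLimit = Math.min(initialMessageLimit, rateThreshold);
    }

    /** Blocks once either the request cap or the current message limit is reached. */
    boolean shouldBlock() {
        return inFlightRequests >= maxInFlightRequests || inFlightMessages >= messageLimit;
    }

    void onRequestSent(int batchSize) {
        inFlightRequests++;
        inFlightMessages += batchSize;
    }

    /** Additive increase if every message succeeded, multiplicative decrease otherwise. */
    void onRequestCompleted(int batchSize, int failedMessages) {
        inFlightRequests--;
        inFlightMessages -= batchSize;
        if (failedMessages > 0) {
            messageLimit = Math.max(1, (int) (messageLimit * decreaseFactor));
        } else {
            messageLimit = Math.min(messageLimit + increaseRate, rateThreshold);
        }
    }

    int getMessageLimit() {
        return messageLimit;
    }
}
```

The decrease in this sketch truncates toward zero, while the quoted `AIMDScalingStrategy.scaleDown` uses `Math.round`, so the two can differ by one.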




[GitHub] [flink] dannycranmer commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r940199902


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/config/AsyncSinkWriterConfiguration.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
+
+/**
+ * AsyncSinkWriterConfiguration configures the {@link
+ * org.apache.flink.connector.base.sink.writer.AsyncSinkWriter}.
+ */
+@PublicEvolving
+public interface AsyncSinkWriterConfiguration {

Review Comment:
   Should this be a more general `AsyncSinkConfiguration` that `AsyncSinkBase` also uses, since the configs are duplicated between the two?
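A minimal sketch of what the suggestion could look like: one configuration object declared once and handed from the sink to every writer it creates. `SharedAsyncSinkConfiguration`, `SketchSink` and `SketchWriter` are hypothetical names; the interface extends `Serializable` on the assumption that the configuration travels with the sink, since Flink sinks are serialized and shipped to the task managers.

```java
import java.io.Serializable;

/** Hypothetical shared configuration holding the limits both sink and writer would declare. */
interface SharedAsyncSinkConfiguration extends Serializable {
    int getMaxBatchSize();

    int getMaxInFlightRequests();

    long getMaxTimeInBufferMS();
}

/** Hypothetical sink-side holder: passes the same configuration to every writer it creates. */
final class SketchSink implements Serializable {
    private final SharedAsyncSinkConfiguration configuration;

    SketchSink(SharedAsyncSinkConfiguration configuration) {
        this.configuration = configuration;
    }

    SketchWriter createWriter() {
        return new SketchWriter(configuration);
    }
}

/** Hypothetical writer-side consumer of the shared configuration. */
final class SketchWriter {
    private final SharedAsyncSinkConfiguration configuration;

    SketchWriter(SharedAsyncSinkConfiguration configuration) {
        this.configuration = configuration;
    }

    int maxBatchSize() {
        return configuration.getMaxBatchSize();
    }
}
```

The design point is only that the limits live in one place; the constructors of the base sink and the writer then take a single argument instead of repeating every field.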




[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r940274362


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/config/BasicAsyncSinkWriterConfiguration.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.config;
+
+import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
+
+/**
+ * BasicAsyncSinkWriterConfiguration with defaults that work well for streaming sources/sinks like
+ * Kinesis Data Streams and Kinesis Firehose.
+ */
+public class BasicAsyncSinkWriterConfiguration implements AsyncSinkWriterConfiguration {
+    private final int maxBatchSize;
+    private final int maxInFlightRequests;
+    private final int maxBufferedRequests;
+    private final long maxBatchSizeInBytes;
+    private final long maxTimeInBufferMS;
+    private final long maxRecordSizeInBytes;
+    private final RateLimitingStrategy rateLimitingStrategy;
+
+    private BasicAsyncSinkWriterConfiguration(
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            RateLimitingStrategy rateLimitingStrategy) {
+        this.maxBatchSize = maxBatchSize;
+        this.maxInFlightRequests = maxInFlightRequests;
+        this.maxBufferedRequests = maxBufferedRequests;
+        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
+        this.maxTimeInBufferMS = maxTimeInBufferMS;
+        this.maxRecordSizeInBytes = maxRecordSizeInBytes;
+        this.rateLimitingStrategy = rateLimitingStrategy;
+    }
+
+    public int getMaxBatchSize() {
+        return maxBatchSize;
+    }
+
+    public int getMaxInFlightRequests() {
+        return maxInFlightRequests;
+    }
+
+    public int getMaxBufferedRequests() {
+        return maxBufferedRequests;
+    }
+
+    public long getMaxBatchSizeInBytes() {
+        return maxBatchSizeInBytes;
+    }
+
+    public long getMaxTimeInBufferMS() {
+        return maxTimeInBufferMS;
+    }
+
+    public long getMaxRecordSizeInBytes() {
+        return maxRecordSizeInBytes;
+    }
+
+    public RateLimitingStrategy getRateLimitingStrategy() {
+        return rateLimitingStrategy;
+    }
+
+    public static BasicAsyncSinkWriterConfigurationBuilder builder() {
+        return new BasicAsyncSinkWriterConfigurationBuilder();
+    }
+
+    /** Builder for {@link BasicAsyncSinkWriterConfiguration}. */
+    public static class BasicAsyncSinkWriterConfigurationBuilder {
+
+        private int maxBatchSize = 500;
+        private int maxInFlightRequests = 50;
+        private int maxBufferedRequests = 10_000;
+        private long maxBatchSizeInBytes = 5 * 1024 * 1024;
+        private long maxTimeInBufferMS = 5_000;
+        private long maxRecordSizeInBytes = 1024 * 1024;
+        private RateLimitingStrategy rateLimitingStrategy;
+
+        public BasicAsyncSinkWriterConfigurationBuilder setMaxBatchSize(int maxBatchSize) {
+            this.maxBatchSize = maxBatchSize;
+            return this;
+        }
+
+        public BasicAsyncSinkWriterConfigurationBuilder setMaxInFlightRequests(
+                int maxInFlightRequests) {
+            this.maxInFlightRequests = maxInFlightRequests;
+            return this;
+        }
+
+        public BasicAsyncSinkWriterConfigurationBuilder setMaxBufferedRequests(
+                int maxBufferedRequests) {
+            this.maxBufferedRequests = maxBufferedRequests;
+            return this;
+        }
+
+        public BasicAsyncSinkWriterConfigurationBuilder setMaxBatchSizeInBytes(
+                long maxBatchSizeInBytes) {
+            this.maxBatchSizeInBytes = maxBatchSizeInBytes;
+            return this;
+        }
+
+        public BasicAsyncSinkWriterConfigurationBuilder setMaxTimeInBufferMS(
+                long maxTimeInBufferMS) {
+            this.maxTimeInBufferMS = maxTimeInBufferMS;
+            return this;
+        }
+
+        public BasicAsyncSinkWriterConfigurationBuilder setMaxRecordSizeInBytes(
+                long maxRecordSizeInBytes) {
+            this.maxRecordSizeInBytes = maxRecordSizeInBytes;
+            return this;
+        }
+
+        public BasicAsyncSinkWriterConfigurationBuilder setRateLimitingStrategy(
+                RateLimitingStrategy rateLimitingStrategy) {
+            this.rateLimitingStrategy = rateLimitingStrategy;
+            return this;
+        }
+
+        public BasicAsyncSinkWriterConfiguration build() {
+            if (rateLimitingStrategy == null) {

Review Comment:
   Yes, we would use a no-op strategy. I will add one.
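A minimal sketch of that fallback, assuming a no-op strategy that never blocks and simply reports the configured `maxBatchSize`. All names below are hypothetical stand-ins for the PR's `RateLimitingStrategy` and builder; whether the writer should still enforce `maxInFlightRequests` on its own when the no-op strategy is in use is left open here.

```java
/** Hypothetical, trimmed-down stand-in for the PR's RateLimitingStrategy interface. */
interface SketchRateLimitingStrategy {
    boolean shouldBlock(int nextBatchSize);

    int getMaxBatchSize();
}

/** Hypothetical no-op strategy: never blocks and only reports a fixed maximum batch size. */
final class NoOpRateLimitingStrategy implements SketchRateLimitingStrategy {
    private final int maxBatchSize;

    NoOpRateLimitingStrategy(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    @Override
    public boolean shouldBlock(int nextBatchSize) {
        return false;
    }

    @Override
    public int getMaxBatchSize() {
        return maxBatchSize;
    }
}

/** Hypothetical builder fragment showing the null fallback discussed in the review. */
final class SketchConfigBuilder {
    private int maxBatchSize = 500;
    private SketchRateLimitingStrategy rateLimitingStrategy;

    SketchConfigBuilder setMaxBatchSize(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
        return this;
    }

    SketchConfigBuilder setRateLimitingStrategy(SketchRateLimitingStrategy strategy) {
        this.rateLimitingStrategy = strategy;
        return this;
    }

    /** Uses the configured strategy, or a no-op one sized from maxBatchSize if none was set. */
    SketchRateLimitingStrategy resolveRateLimitingStrategy() {
        return rateLimitingStrategy != null
                ? rateLimitingStrategy
                : new NoOpRateLimitingStrategy(maxBatchSize);
    }
}
```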




[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r940275230


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/config/AsyncSinkWriterConfiguration.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
+
+/**
+ * AsyncSinkWriterConfiguration configures the {@link
+ * org.apache.flink.connector.base.sink.writer.AsyncSinkWriter}.
+ */
+@PublicEvolving
+public interface AsyncSinkWriterConfiguration {

Review Comment:
   OK, can do




[GitHub] [flink] hlteoh37 commented on pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on PR #20245:
URL: https://github.com/apache/flink/pull/20245#issuecomment-1208983984

   @flinkbot run azure



[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r918860086


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -391,22 +390,24 @@ private void flush() throws InterruptedException {
                                                 timestampOfRequest),
                                 "Mark in-flight request as completed and requeue %d request entries",
                                 failedRequestEntries.size());
-
+        rateLimitingStrategy.registerInFlightRequest(requestInfo);
         inFlightRequestsCount++;
-        inFlightMessages += batchSize;
-        submitRequestEntries(batch, requestResult);
+        submitRequestEntries(batch, requestResultCallback);
+    }
+
+    private int getNextBatchSize() {
+        return Math.min(rateLimitingStrategy.getMaxBatchSize(), bufferedRequestEntries.size());
     }
 
     /**
      * Creates the next batch of request entries while respecting the {@code maxBatchSize} and
      * {@code maxBatchSizeInBytes}. Also adds these to the metrics counters.
      */
-    private List<RequestEntryT> createNextAvailableBatch() {
-        int batchSize = Math.min(getNextBatchSizeLimit(), bufferedRequestEntries.size());
-        List<RequestEntryT> batch = new ArrayList<>(batchSize);
+    private List<RequestEntryT> createNextAvailableBatch(RequestInfo requestInfo) {
+        List<RequestEntryT> batch = new ArrayList<>(requestInfo.batchSize);
 
         int batchSizeBytes = 0;

Review Comment:
   Sure, sounds good.
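The diff above builds each batch from `requestInfo.batchSize` while still respecting `maxBatchSizeInBytes`. A minimal sketch of such a loop follows. `BatchSketch`, the `sizeOf` function and the rule that the first entry is always taken are assumptions of this sketch; in the PR, `maxRecordSizeInBytes` is presumably what keeps single entries below the byte limit.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.ToLongFunction;

final class BatchSketch {
    /**
     * Hypothetical sketch: takes entries from the head of the buffer until the batch holds
     * batchSize entries or the next entry would push it past maxBatchSizeInBytes. The first
     * entry is always taken so that a single oversized entry cannot stall the buffer.
     */
    static <T> List<T> createNextAvailableBatch(
            Deque<T> buffer, int batchSize, long maxBatchSizeInBytes, ToLongFunction<T> sizeOf) {
        List<T> batch = new ArrayList<>(Math.min(batchSize, buffer.size()));
        long batchSizeBytes = 0;
        while (batch.size() < batchSize && !buffer.isEmpty()) {
            long entrySize = sizeOf.applyAsLong(buffer.peekFirst());
            if (!batch.isEmpty() && batchSizeBytes + entrySize > maxBatchSizeInBytes) {
                break; // leave the entry buffered for the next batch
            }
            batch.add(buffer.pollFirst());
            batchSizeBytes += entrySize;
        }
        return batch;
    }
}
```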




[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r918861002


##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/strategy/AIMDScalingStrategyTest.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/** Test class for {@link AIMDScalingStrategy}. */
+public class AIMDScalingStrategyTest {
+
+    @Test
+    public void testScaleUpAdditively() {
+        final int increaseRate = 10;
+        final int currentRate = 4;
+
+        AIMDScalingStrategy scalingStrategy =
+                AIMDScalingStrategy.builder()
+                        .setIncreaseRate(increaseRate)
+                        .setDecreaseFactor(0.5)
+                        .setRateThreshold(1000)
+                        .build();
+
+        assertThat(scalingStrategy.scaleUp(currentRate)).isEqualTo(increaseRate + currentRate);
+    }
+
+    @Test
+    public void testScaleUpRespectsRateThreshold() {
+        final int increaseRate = 10;
+        final int currentRate = 14;
+        final int rateThreshold = 20;
+
+        AIMDScalingStrategy scalingStrategy =
+                AIMDScalingStrategy.builder()
+                        .setIncreaseRate(increaseRate)
+                        .setDecreaseFactor(0.5)
+                        .setRateThreshold(rateThreshold)
+                        .build();
+
+        assertThat(scalingStrategy.scaleUp(currentRate)).isEqualTo(rateThreshold);
+    }
+
+    @Test
+    public void testScaleDownByFactor() {
+        final double decreaseFactor = 0.4;
+
+        AIMDScalingStrategy scalingStrategy =
+                AIMDScalingStrategy.builder()
+                        .setIncreaseRate(10)
+                        .setDecreaseFactor(decreaseFactor)
+                        .setRateThreshold(1000)
+                        .build();
+
+        // roundDown(10 * 0.4 = 4) = 4
+        assertThat(scalingStrategy.scaleDown(10)).isEqualTo(4);
+
+        // roundDown(314 * 0.4 = 125.6) = 125

Review Comment:
   Good catch
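The `roundDown` comments in the test do not match `scaleDown` as quoted in the `AIMDScalingStrategy` diff, which uses `Math.round`: for 314 * 0.4 the quoted expression yields 126, not 125. A small check of both readings; `ScaleDownRounding` is a hypothetical helper that copies the quoted expression and adds a floor-based alternative.

```java
final class ScaleDownRounding {
    /** Same expression as the quoted scaleDown: rounds to the nearest integer, minimum 1. */
    static int scaleDownRounded(int currentRate, double decreaseFactor) {
        return Math.max(1, (int) Math.round(currentRate * decreaseFactor));
    }

    /** Alternative matching the test comments' "roundDown": floors, minimum 1. */
    static int scaleDownFloored(int currentRate, double decreaseFactor) {
        return Math.max(1, (int) Math.floor(currentRate * decreaseFactor));
    }
}
```

Either behaviour is defensible; the point is only that the test comments and the implementation should agree.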




[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r939058598


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/AIMDScalingStrategy.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * AIMDScalingStrategy scales up linearly and scales down multiplicatively. See
+ * https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease for more details.
+ */
+@PublicEvolving
+public class AIMDScalingStrategy {
+    private final int increaseRate;
+    private final double decreaseFactor;
+    private final int rateThreshold;
+
+    public AIMDScalingStrategy(int increaseRate, double decreaseFactor, int rateThreshold) {
+        Preconditions.checkArgument(increaseRate > 0, "increaseRate must be a positive integer.");
+        Preconditions.checkArgument(
+                decreaseFactor < 1.0 && decreaseFactor > 0.0,
+                "decreaseFactor must be strictly between 0.0 and 1.0.");
+        Preconditions.checkArgument(rateThreshold > 0, "rateThreshold must be a positive integer.");
+        Preconditions.checkArgument(
+                rateThreshold >= increaseRate,
+                "rateThreshold must be greater than or equal to increaseRate.");
+        this.increaseRate = increaseRate;
+        this.decreaseFactor = decreaseFactor;
+        this.rateThreshold = rateThreshold;
+    }
+
+    public int scaleUp(int currentRate) {
+        return Math.min(currentRate + increaseRate, rateThreshold);
+    }
+
+    public int scaleDown(int currentRate) {
+        return Math.max(1, (int) Math.round(currentRate * decreaseFactor));
+    }
+
+    @PublicEvolving
+    public static AIMDScalingStrategyBuilder builder() {
+        return new AIMDScalingStrategyBuilder();
+    }
+
+    /** Builder for {@link AIMDScalingStrategy}. */
+    public static class AIMDScalingStrategyBuilder {
+
+        private int increaseRate = 10;
+        private double decreaseFactor = 0.5;
+        private int rateThreshold;

Review Comment:
   OK, made `rateThreshold` a required field.
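A minimal sketch of the required-field approach, assuming the builder signals a missing `rateThreshold` with an `IllegalStateException` at `build()` time. `AimdParameters` is a hypothetical class; its validation mirrors the preconditions in the quoted `AIMDScalingStrategy` constructor, and the defaults mirror the quoted builder (`increaseRate = 10`, `decreaseFactor = 0.5`).

```java
/** Hypothetical AIMD parameters whose builder has no default for rateThreshold. */
final class AimdParameters {
    final int increaseRate;
    final double decreaseFactor;
    final int rateThreshold;

    private AimdParameters(int increaseRate, double decreaseFactor, int rateThreshold) {
        this.increaseRate = increaseRate;
        this.decreaseFactor = decreaseFactor;
        this.rateThreshold = rateThreshold;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private int increaseRate = 10;
        private double decreaseFactor = 0.5;
        private Integer rateThreshold; // null until set: required, no sensible default

        Builder setIncreaseRate(int increaseRate) {
            this.increaseRate = increaseRate;
            return this;
        }

        Builder setDecreaseFactor(double decreaseFactor) {
            this.decreaseFactor = decreaseFactor;
            return this;
        }

        Builder setRateThreshold(int rateThreshold) {
            this.rateThreshold = rateThreshold;
            return this;
        }

        AimdParameters build() {
            if (rateThreshold == null) {
                throw new IllegalStateException("rateThreshold must be set.");
            }
            if (increaseRate <= 0) {
                throw new IllegalArgumentException("increaseRate must be a positive integer.");
            }
            if (!(decreaseFactor > 0.0 && decreaseFactor < 1.0)) {
                throw new IllegalArgumentException(
                        "decreaseFactor must be strictly between 0.0 and 1.0.");
            }
            if (rateThreshold < increaseRate) {
                throw new IllegalArgumentException(
                        "rateThreshold must be greater than or equal to increaseRate.");
            }
            return new AimdParameters(increaseRate, decreaseFactor, rateThreshold);
        }
    }
}
```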




[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r940278843


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/BasicResultInfo.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+/** Dataclass to encapsulate results from completed requests. */
+public class BasicResultInfo implements ResultInfo {

Review Comment:
   OK, added.




[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r927823525


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -344,69 +352,69 @@ public void write(InputT element, Context context) throws IOException, Interrupt
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        while (!isInFlightRequestOrMessageLimitExceeded()
+        while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
                         || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
             flush();
         }
     }
 
-    /**
-     * Determines if the sink should block and complete existing in flight requests before it may
-     * prudently create any new ones. This is exactly determined by if the number of requests
-     * currently in flight exceeds the maximum supported by the sink OR if the number of in flight
-     * messages exceeds the maximum determined to be appropriate by the rate limiting strategy.
-     */
-    private boolean isInFlightRequestOrMessageLimitExceeded() {
-        return inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    private RequestInfo createRequestInfo() {
+        int batchSize = getNextBatchSize();
+        Instant requestStartTime = Instant.now();
+        return RequestInfo.builder()
+                .setBatchSize(batchSize)
+                .setRequestStartTime(requestStartTime)
+                .build();

Review Comment:
   Happy to remove `requestStartTime`. It is not used by the current `RateLimitingStrategy` implementation.
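With `requestStartTime` dropped, the request info only needs to carry the next batch size. A minimal sketch of the resulting `nonBlockingFlush` loop follows; `SketchRequestInfo`, `InFlightPolicy`, `MaxInFlightRequestsPolicy` and `FlushSketch` are hypothetical stand-ins for the PR's types, request completions are not modelled, and the byte-size trigger (`bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes`) is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical request metadata carrying only the size of the next batch. */
final class SketchRequestInfo {
    final int batchSize;

    SketchRequestInfo(int batchSize) {
        this.batchSize = batchSize;
    }
}

/** Hypothetical, trimmed-down rate limiting contract used by the loop below. */
interface InFlightPolicy {
    boolean shouldBlock(SketchRequestInfo requestInfo);

    void registerInFlightRequest(SketchRequestInfo requestInfo);
}

/** Blocks once maxInFlightRequests requests are outstanding. */
final class MaxInFlightRequestsPolicy implements InFlightPolicy {
    private final int maxInFlightRequests;
    private int inFlightRequests;

    MaxInFlightRequestsPolicy(int maxInFlightRequests) {
        this.maxInFlightRequests = maxInFlightRequests;
    }

    @Override
    public boolean shouldBlock(SketchRequestInfo requestInfo) {
        return inFlightRequests >= maxInFlightRequests;
    }

    @Override
    public void registerInFlightRequest(SketchRequestInfo requestInfo) {
        inFlightRequests++;
    }
}

final class FlushSketch {
    /** Sends full batches until the policy blocks or fewer than maxBatchSize entries remain. */
    static <T> List<List<T>> nonBlockingFlush(
            Deque<T> buffer, int maxBatchSize, InFlightPolicy policy) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        List<List<T>> sent = new ArrayList<>();
        while (!policy.shouldBlock(nextRequestInfo(buffer, maxBatchSize))
                && buffer.size() >= maxBatchSize) {
            SketchRequestInfo requestInfo = nextRequestInfo(buffer, maxBatchSize);
            List<T> batch = new ArrayList<>(requestInfo.batchSize);
            for (int i = 0; i < requestInfo.batchSize; i++) {
                batch.add(buffer.pollFirst());
            }
            policy.registerInFlightRequest(requestInfo);
            sent.add(batch); // a real writer would submit the batch asynchronously here
        }
        return sent;
    }

    private static SketchRequestInfo nextRequestInfo(Deque<?> buffer, int maxBatchSize) {
        return new SketchRequestInfo(Math.min(maxBatchSize, buffer.size()));
    }
}
```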




[GitHub] [flink] pnowojski commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r927747602


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/AIMDScalingStrategy.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * AIMDScalingStrategy scales up linearly and scales down multiplicatively. See
+ * https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease for more details
+ */
+@PublicEvolving
+public class AIMDScalingStrategy {
+    private final int increaseRate;
+    private final double decreaseFactor;
+    private final int rateThreshold;
+
+    public AIMDScalingStrategy(int increaseRate, double decreaseFactor, int rateThreshold) {
+        Preconditions.checkArgument(increaseRate > 0, "increaseRate must be positive integer.");
+        Preconditions.checkArgument(
+                decreaseFactor < 1.0 && decreaseFactor > 0.0,
+                "decreaseFactor must be strictly between 0.0 and 1.0.");
+        Preconditions.checkArgument(rateThreshold > 0, "rateThreshold must be a positive integer.");
+        Preconditions.checkArgument(
+                rateThreshold >= increaseRate, "rateThreshold must be larger than increaseRate.");
+        this.increaseRate = increaseRate;
+        this.decreaseFactor = decreaseFactor;
+        this.rateThreshold = rateThreshold;
+    }
+
+    public int scaleUp(int currentRate) {
+        return Math.min(currentRate + increaseRate, rateThreshold);
+    }
+
+    public int scaleDown(int currentRate) {
+        return Math.max(1, (int) Math.round(currentRate * decreaseFactor));
+    }
+
+    @PublicEvolving
+    public static AIMDScalingStrategyBuilder builder() {
+        return new AIMDScalingStrategyBuilder();
+    }
+
+    /** Builder for {@link AIMDScalingStrategy}. */
+    public static class AIMDScalingStrategyBuilder {
+
+        private int increaseRate = 10;
+        private double decreaseFactor = 0.5;
+        private int rateThreshold;

Review Comment:
   nit: why does it have a default value of `0` if you later do `checkArgument(rateThreshold > 0)` in the constructor? If it's an obligatory parameter without a good default value, then I think it would be cleaner to pass it through to the `AIMDScalingStrategyBuilder` constructor:
   
   ```
   AIMDScalingStrategy
     .builder(myRateThreshold)
     .setOptionalParam1(foo)
     .setOptionalParam2(bar)
     .build();
   ```

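A minimal sketch of the reviewer's suggestion, applied to the AIMD logic from the diff above. The class name `AimdScalingSketch` and its setters are hypothetical, not the merged Flink API. `scaleUp`/`scaleDown` copy the arithmetic of `AIMDScalingStrategy`, and the optional defaults (increase rate 10, decrease factor 0.5) are the ones in `AIMDScalingStrategyBuilder`. Because `rateThreshold` is an argument of `builder(...)`, forgetting it becomes a compile-time error instead of a runtime `checkArgument` failure.

```java
/**
 * Hypothetical sketch (not the merged Flink API): AIMD scaling whose mandatory
 * rateThreshold is passed to builder(), while optional parameters keep defaults.
 */
final class AimdScalingSketch {
    private final int increaseRate;
    private final double decreaseFactor;
    private final int rateThreshold;

    private AimdScalingSketch(int increaseRate, double decreaseFactor, int rateThreshold) {
        if (increaseRate <= 0) {
            throw new IllegalArgumentException("increaseRate must be a positive integer.");
        }
        if (!(decreaseFactor > 0.0 && decreaseFactor < 1.0)) {
            throw new IllegalArgumentException("decreaseFactor must be strictly between 0.0 and 1.0.");
        }
        // Together with increaseRate > 0, this also guarantees rateThreshold > 0.
        if (rateThreshold < increaseRate) {
            throw new IllegalArgumentException("rateThreshold must be >= increaseRate.");
        }
        this.increaseRate = increaseRate;
        this.decreaseFactor = decreaseFactor;
        this.rateThreshold = rateThreshold;
    }

    /** Additive increase, capped at rateThreshold. */
    int scaleUp(int currentRate) {
        return Math.min(currentRate + increaseRate, rateThreshold);
    }

    /** Multiplicative decrease, never dropping below 1. */
    int scaleDown(int currentRate) {
        return Math.max(1, (int) Math.round(currentRate * decreaseFactor));
    }

    /** The mandatory parameter is a builder argument, so it cannot be omitted. */
    static Builder builder(int rateThreshold) {
        return new Builder(rateThreshold);
    }

    static final class Builder {
        private final int rateThreshold;
        private int increaseRate = 10; // optional, same default as in the diff
        private double decreaseFactor = 0.5; // optional, same default as in the diff

        private Builder(int rateThreshold) {
            this.rateThreshold = rateThreshold;
        }

        Builder setIncreaseRate(int increaseRate) {
            this.increaseRate = increaseRate;
            return this;
        }

        Builder setDecreaseFactor(double decreaseFactor) {
            this.decreaseFactor = decreaseFactor;
            return this;
        }

        AimdScalingSketch build() {
            return new AimdScalingSketch(increaseRate, decreaseFactor, rateThreshold);
        }
    }
}
```

For example, `AimdScalingSketch.builder(500).setDecreaseFactor(0.75).build()` sets only what differs from the defaults, and `builder()` with no threshold does not compile.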


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/RequestInfo.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.time.Instant;
+
+/** Dataclass to encapsulate information about starting requests. */
+@PublicEvolving
+public class RequestInfo {
+    private final int batchSize;
+    private final Instant requestStartTime;
+
+    private RequestInfo(int batchSize, Instant requestStartTime) {
+        this.batchSize = batchSize;
+        this.requestStartTime = requestStartTime;
+    }
+
+    @PublicEvolving
+    public static RequestInfoBuilder builder() {
+        return new RequestInfoBuilder();
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public Instant getRequestStartTime() {
+        return requestStartTime;
+    }
+
+    /** Builder for {@link RequestInfo} dataclass. */
+    public static class RequestInfoBuilder {
+        private int batchSize;
+        private Instant requestStartTime;
+
+        public RequestInfoBuilder setBatchSize(final int batchSize) {
+            this.batchSize = batchSize;
+            return this;
+        }
+
+        public RequestInfoBuilder setRequestStartTime(final Instant requestStartTime) {
+            this.requestStartTime = requestStartTime;
+            return this;
+        }
+
+        public RequestInfo build() {
+            return new RequestInfo(batchSize, requestStartTime);
+        }
+    }

Review Comment:
   Why do we need a builder for this class, especially given that both of its parameters seem to be obligatory?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -344,69 +352,69 @@ public void write(InputT element, Context context) throws IOException, Interrupt
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        while (!isInFlightRequestOrMessageLimitExceeded()
+        while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
                         || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
             flush();
         }
     }
 
-    /**
-     * Determines if the sink should block and complete existing in flight requests before it may
-     * prudently create any new ones. This is exactly determined by if the number of requests
-     * currently in flight exceeds the maximum supported by the sink OR if the number of in flight
-     * messages exceeds the maximum determined to be appropriate by the rate limiting strategy.
-     */
-    private boolean isInFlightRequestOrMessageLimitExceeded() {
-        return inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    private RequestInfo createRequestInfo() {
+        int batchSize = getNextBatchSize();
+        Instant requestStartTime = Instant.now();
+        return RequestInfo.builder()
+                .setBatchSize(batchSize)
+                .setRequestStartTime(requestStartTime)
+                .build();

Review Comment:
   I have some doubts about whether creating a new `RequestInfo` for every record is a good idea.
   
   1. The sheer fact of creating a new object might increase GC pressure and add a bit of overhead.
   2. I would expect `Instant.now()` to be very costly to invoke. Isn't this a syscall underneath? Why do we even need `requestStartTime` in the `RequestInfo`?
   
   And to me it looks like this object is reconstructed over and over again, at least once per `AsyncSinkWriter#write` call?

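For reference, the removed `isInFlightRequestOrMessageLimitExceeded()` check needs only two counters and a limit, not a start time. The sketch below is a hedged illustration of a strategy that owns those counters (the PR moves in-flight tracking out of `AsyncSinkWriter`) and adapts the message limit with AIMD. It is not the PR's `CongestionControlRateLimitingStrategy`, whose code is not shown in this thread. The names `CongestionControlSketch`, `registerInFlightRequest`, and `registerCompletedRequest` are hypothetical, and the no-argument `shouldBlock()` is one way to avoid the per-call allocation raised above. The PR instead passes a `RequestInfo`, which is the trade-off under discussion.

```java
/**
 * Hypothetical sketch, not Flink's CongestionControlRateLimitingStrategy: it owns the
 * in-flight counters and adapts the message limit with AIMD
 * (additive increase on success, multiplicative decrease on failure).
 */
final class CongestionControlSketch {
    private final int maxInFlightRequests;
    private final int increaseRate;
    private final double decreaseFactor;
    private final int rateThreshold;

    private int inFlightRequests;
    private int inFlightMessages;
    private int maxInFlightMessages; // current AIMD-controlled limit

    CongestionControlSketch(
            int maxInFlightRequests,
            int initialMaxInFlightMessages,
            int increaseRate,
            double decreaseFactor,
            int rateThreshold) {
        if (maxInFlightRequests <= 0 || initialMaxInFlightMessages <= 0) {
            throw new IllegalArgumentException("limits must be positive");
        }
        this.maxInFlightRequests = maxInFlightRequests;
        this.maxInFlightMessages = initialMaxInFlightMessages;
        this.increaseRate = increaseRate;
        this.decreaseFactor = decreaseFactor;
        this.rateThreshold = rateThreshold;
    }

    /** Same condition as the removed isInFlightRequestOrMessageLimitExceeded(); allocates nothing. */
    boolean shouldBlock() {
        return inFlightRequests >= maxInFlightRequests
                || inFlightMessages >= maxInFlightMessages;
    }

    void registerInFlightRequest(int batchSize) {
        inFlightRequests++;
        inFlightMessages += batchSize;
    }

    void registerCompletedRequest(int batchSize, int failedMessages) {
        inFlightRequests--;
        inFlightMessages -= batchSize;
        if (failedMessages > 0) {
            // Multiplicative decrease, never below 1.
            maxInFlightMessages = Math.max(1, (int) Math.round(maxInFlightMessages * decreaseFactor));
        } else {
            // Additive increase, capped at rateThreshold.
            maxInFlightMessages = Math.min(maxInFlightMessages + increaseRate, rateThreshold);
        }
    }

    int getMaxInFlightMessages() {
        return maxInFlightMessages;
    }
}
```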


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/ResultInfo.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Dataclass to encapsulate results from completed requests. */
+@PublicEvolving
+public class ResultInfo {

Review Comment:
   Same two questions as for `RequestInfo`:
   
   1. Having a builder with optional setter methods but without sane default values doesn't make sense to me at first glance.
   2. We shouldn't be exposing the concrete class and the builder as `PublicEvolving` API (unless I'm missing something).



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -256,14 +243,42 @@ public AsyncSinkWriter(
             long maxTimeInBufferMS,
             long maxRecordSizeInBytes,
             Collection<BufferedRequestState<RequestEntryT>> states) {
+        this(
+                elementConverter,
+                context,
+                maxBatchSize,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes,
+                states,
+                CongestionControlRateLimitingStrategy.builder()
+                        .setMaxInFlightRequests(maxInFlightRequests)
+                        .setInitialMaxInFlightMessages(maxBatchSize)
+                        .setAimdScalingStrategy(
+                                AIMDScalingStrategy.builder()
+                                        .setRateThreshold(maxBatchSize * maxInFlightRequests)
+                                        .build())
+                        .build());
+    }
+
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            Collection<BufferedRequestState<RequestEntryT>> states,
+            RateLimitingStrategy rateLimitingStrategy) {

Review Comment:
   Does the `AsyncSinkWriter` have any documentation? How are users supposed to know that it exists, and especially what kind of `RateLimitingStrategy` they can use?

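The deprecated constructor in this hunk keeps the old signature working by deriving a default `CongestionControlRateLimitingStrategy` from the old parameters: the initial in-flight message limit is `maxBatchSize`, and the AIMD rate threshold is `maxBatchSize * maxInFlightRequests`. Below is a hypothetical sketch of that delegating-constructor pattern. `WriterSketch` and `LimitSettings` are illustrative stand-ins, not Flink classes.

```java
/** Hypothetical stand-in for the rate-limiting settings the old constructor derives. */
final class LimitSettings {
    final int maxInFlightRequests;
    final int initialMaxInFlightMessages;
    final int rateThreshold;

    LimitSettings(int maxInFlightRequests, int initialMaxInFlightMessages, int rateThreshold) {
        this.maxInFlightRequests = maxInFlightRequests;
        this.initialMaxInFlightMessages = initialMaxInFlightMessages;
        this.rateThreshold = rateThreshold;
    }
}

/** Hypothetical writer showing how an old constructor delegates with derived defaults. */
final class WriterSketch {
    final int maxBatchSize;
    final LimitSettings limits;

    /** Old-style constructor: kept for compatibility, delegates to the new one. */
    @Deprecated
    WriterSketch(int maxBatchSize, int maxInFlightRequests) {
        this(
                maxBatchSize,
                new LimitSettings(
                        maxInFlightRequests,
                        maxBatchSize, // initial in-flight message limit
                        maxBatchSize * maxInFlightRequests)); // AIMD rate threshold
    }

    /** New constructor: the caller supplies the rate-limiting settings explicitly. */
    WriterSketch(int maxBatchSize, LimitSettings limits) {
        this.maxBatchSize = maxBatchSize;
        this.limits = limits;
    }
}
```

Existing sinks keep compiling against the old signature, while new sinks can pass their own settings, mirroring how the diff passes a `RateLimitingStrategy`.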


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/RequestInfo.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.time.Instant;
+
+/** Dataclass to encapsulate information about starting requests. */
+@PublicEvolving
+public class RequestInfo {
+    private final int batchSize;
+    private final Instant requestStartTime;
+
+    private RequestInfo(int batchSize, Instant requestStartTime) {
+        this.batchSize = batchSize;
+        this.requestStartTime = requestStartTime;
+    }
+
+    @PublicEvolving
+    public static RequestInfoBuilder builder() {
+        return new RequestInfoBuilder();
+    }

Review Comment:
   Does the user need to know how to construct this class? Shouldn't only a `RequestInfo` interface be `@PublicEvolving`, while the concrete implementation is `@Internal` to the `AsyncSinkWriter`?





[GitHub] [flink] flinkbot commented on pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20245:
URL: https://github.com/apache/flink/pull/20245#issuecomment-1180685284

   ## CI report:
   
   * c342f9dc454ef0fdd2e4b8666c59778ad960696a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r940104680


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -256,14 +243,42 @@ public AsyncSinkWriter(
             long maxTimeInBufferMS,
             long maxRecordSizeInBytes,
             Collection<BufferedRequestState<RequestEntryT>> states) {
+        this(
+                elementConverter,
+                context,
+                maxBatchSize,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes,
+                states,
+                CongestionControlRateLimitingStrategy.builder()
+                        .setMaxInFlightRequests(maxInFlightRequests)
+                        .setInitialMaxInFlightMessages(maxBatchSize)
+                        .setAimdScalingStrategy(
+                                AIMDScalingStrategy.builder()
+                                        .setRateThreshold(maxBatchSize * maxInFlightRequests)
+                                        .build())
+                        .build());
+    }
+
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            Collection<BufferedRequestState<RequestEntryT>> states,
+            RateLimitingStrategy rateLimitingStrategy) {

Review Comment:
   We're writing a blog post about it instead (for now) https://issues.apache.org/jira/browse/FLINK-25208
   
   





[GitHub] [flink] hlteoh37 commented on pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on PR #20245:
URL: https://github.com/apache/flink/pull/20245#issuecomment-1207972056

   
   @flinkbot run azure
   
   




[GitHub] [flink] dannycranmer merged pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
dannycranmer merged PR #20245:
URL: https://github.com/apache/flink/pull/20245




[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r939909307


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -344,44 +343,42 @@ public void write(InputT element, Context context) throws IOException, Interrupt
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        while (!isInFlightRequestOrMessageLimitExceeded()
+        while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
-                        || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
+                || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
             flush();
         }
     }
 
-    /**
-     * Determines if the sink should block and complete existing in flight requests before it may
-     * prudently create any new ones. This is exactly determined by if the number of requests
-     * currently in flight exceeds the maximum supported by the sink OR if the number of in flight
-     * messages exceeds the maximum determined to be appropriate by the rate limiting strategy.
-     */
-    private boolean isInFlightRequestOrMessageLimitExceeded() {
-        return inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    private RequestInfo createRequestInfo() {
+        int batchSize = getNextBatchSize();
+        return RequestInfo.builder().setBatchSize(batchSize).build();
     }
 
     /**
      * Persists buffered RequestsEntries into the destination by invoking {@code
      * submitRequestEntries} with batches according to the user specified buffering hints.
      *
-     * <p>The method blocks if too many async requests are in flight.
+     * <p>The method checks with the {@code rateLimitingStrategy} to see if it should block the
+     * request.
      */
     private void flush() throws InterruptedException {
-        while (isInFlightRequestOrMessageLimitExceeded()) {
+        RequestInfo requestInfo = createRequestInfo();
+        while (rateLimitingStrategy.shouldBlock(requestInfo)) {
             mailboxExecutor.yield();
+            requestInfo = createRequestInfo();
         }
 
-        List<RequestEntryT> batch = createNextAvailableBatch();
+        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
         int batchSize = batch.size();
-
-        if (batch.size() == 0) {
+        if (batchSize == 0) {
             return;
         }
+        requestInfo.setBatchSize(batchSize);

Review Comment:
   Refactored to make `batchSize` immutable

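The revised `flush()` above re-evaluates the rate limit after every `mailboxExecutor.yield()`, building a fresh immutable `RequestInfo` each time instead of mutating one. Below is a self-contained sketch of that yield-until-unblocked loop. `FlushLoopSketch`, its nested `RequestInfo`, and `RateLimiter` are hypothetical stand-ins for the Flink types, and the `Runnable` stands in for `mailboxExecutor.yield()`, which in Flink runs pending mails such as request completions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of the yield-until-unblocked loop in the revised flush(). */
final class FlushLoopSketch {
    /** Immutable per-attempt info; rebuilt on every check rather than mutated. */
    static final class RequestInfo {
        final int batchSize;

        RequestInfo(int batchSize) {
            this.batchSize = batchSize;
        }
    }

    interface RateLimiter {
        boolean shouldBlock(RequestInfo info);
    }

    private final Deque<String> buffer = new ArrayDeque<>();
    private final int maxBatchSize;
    private final RateLimiter rateLimiter;
    private final Runnable yieldToMailbox; // stands in for mailboxExecutor.yield()
    final List<List<String>> submitted = new ArrayList<>();

    FlushLoopSketch(int maxBatchSize, RateLimiter rateLimiter, Runnable yieldToMailbox) {
        this.maxBatchSize = maxBatchSize;
        this.rateLimiter = rateLimiter;
        this.yieldToMailbox = yieldToMailbox;
    }

    void add(String element) {
        buffer.add(element);
    }

    private RequestInfo createRequestInfo() {
        return new RequestInfo(Math.min(maxBatchSize, buffer.size()));
    }

    void flush() {
        RequestInfo info = createRequestInfo();
        while (rateLimiter.shouldBlock(info)) {
            yieldToMailbox.run(); // let in-flight completions make progress
            info = createRequestInfo(); // re-check with fresh, immutable info
        }
        List<String> batch = new ArrayList<>();
        while (batch.size() < info.batchSize && !buffer.isEmpty()) {
            batch.add(buffer.poll());
        }
        if (batch.isEmpty()) {
            return;
        }
        submitted.add(batch); // in a real writer: submitRequestEntries(batch, ...)
    }
}
```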




[GitHub] [flink] hlteoh37 commented on pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on PR #20245:
URL: https://github.com/apache/flink/pull/20245#issuecomment-1207882565

   @flinkbot run azure




[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r940289892


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -256,43 +242,54 @@ public AsyncSinkWriter(
             long maxTimeInBufferMS,
             long maxRecordSizeInBytes,
             Collection<BufferedRequestState<RequestEntryT>> states) {
+        this(
+                elementConverter,
+                context,
+                BasicAsyncSinkWriterConfiguration.builder()
+                        .setMaxBatchSize(maxBatchSize)
+                        .setMaxInFlightRequests(maxInFlightRequests)
+                        .setMaxBufferedRequests(maxBufferedRequests)
+                        .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
+                        .setMaxTimeInBufferMS(maxTimeInBufferMS)
+                        .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+                        .build(),
+                states);
+    }
+
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            BasicAsyncSinkWriterConfiguration configuration,

Review Comment:
   Yes, changed





[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r940415649


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/config/BasicAsyncSinkWriterConfiguration.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.config;
+
+import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
+
+/**
+ * BasicAsyncSinkWriterConfiguration with defaults that work well for streaming sources/sinks like
+ * Kinesis Data Streams and Kinesis Firehose.
+ */
+public class BasicAsyncSinkWriterConfiguration implements AsyncSinkWriterConfiguration {
+    private final int maxBatchSize;
+    private final int maxInFlightRequests;
+    private final int maxBufferedRequests;
+    private final long maxBatchSizeInBytes;
+    private final long maxTimeInBufferMS;
+    private final long maxRecordSizeInBytes;
+    private final RateLimitingStrategy rateLimitingStrategy;
+
+    private BasicAsyncSinkWriterConfiguration(
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            RateLimitingStrategy rateLimitingStrategy) {
+        this.maxBatchSize = maxBatchSize;
+        this.maxInFlightRequests = maxInFlightRequests;
+        this.maxBufferedRequests = maxBufferedRequests;
+        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
+        this.maxTimeInBufferMS = maxTimeInBufferMS;
+        this.maxRecordSizeInBytes = maxRecordSizeInBytes;
+        this.rateLimitingStrategy = rateLimitingStrategy;
+    }
+
+    public int getMaxBatchSize() {
+        return maxBatchSize;
+    }
+
+    public int getMaxInFlightRequests() {
+        return maxInFlightRequests;
+    }
+
+    public int getMaxBufferedRequests() {
+        return maxBufferedRequests;
+    }
+
+    public long getMaxBatchSizeInBytes() {
+        return maxBatchSizeInBytes;
+    }
+
+    public long getMaxTimeInBufferMS() {
+        return maxTimeInBufferMS;
+    }
+
+    public long getMaxRecordSizeInBytes() {
+        return maxRecordSizeInBytes;
+    }
+
+    public RateLimitingStrategy getRateLimitingStrategy() {
+        return rateLimitingStrategy;
+    }
+
+    public static BasicAsyncSinkWriterConfigurationBuilder builder() {
+        return new BasicAsyncSinkWriterConfigurationBuilder();
+    }
+
+    /** Builder for {@link BasicAsyncSinkWriterConfiguration}. */
+    public static class BasicAsyncSinkWriterConfigurationBuilder {
+
+        private int maxBatchSize = 500;
+        private int maxInFlightRequests = 50;
+        private int maxBufferedRequests = 10_000;
+        private long maxBatchSizeInBytes = 5 * 1024 * 1024;
+        private long maxTimeInBufferMS = 5_000;
+        private long maxRecordSizeInBytes = 1024 * 1024;

Review Comment:
   Removed the defaults and made these parameters mandatory in the builder by implementing an interface.

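This thread does not show exactly how the interface enforces the mandatory parameters. One common technique that fits the description is a staged ("step") builder: each mandatory setter returns the interface for the next stage, so `build()` only becomes reachable once every mandatory value is set. The sketch below assumes that reading and covers only two of the six fields. `SinkConfigSketch`, `NeedsMaxBatchSize`, `NeedsMaxInFlightRequests`, and `Buildable` are hypothetical names.

```java
/** Hypothetical staged-builder sketch; not the merged BasicAsyncSinkWriterConfiguration API. */
final class SinkConfigSketch {
    private final int maxBatchSize;
    private final int maxInFlightRequests;

    private SinkConfigSketch(int maxBatchSize, int maxInFlightRequests) {
        this.maxBatchSize = maxBatchSize;
        this.maxInFlightRequests = maxInFlightRequests;
    }

    int getMaxBatchSize() {
        return maxBatchSize;
    }

    int getMaxInFlightRequests() {
        return maxInFlightRequests;
    }

    // Each stage exposes only the next mandatory setter.
    interface NeedsMaxBatchSize {
        NeedsMaxInFlightRequests setMaxBatchSize(int maxBatchSize);
    }

    interface NeedsMaxInFlightRequests {
        Buildable setMaxInFlightRequests(int maxInFlightRequests);
    }

    interface Buildable {
        SinkConfigSketch build();
    }

    static NeedsMaxBatchSize builder() {
        return new Builder();
    }

    /** One mutable object implements every stage; callers only ever see the interfaces. */
    private static final class Builder
            implements NeedsMaxBatchSize, NeedsMaxInFlightRequests, Buildable {
        private int maxBatchSize;
        private int maxInFlightRequests;

        @Override
        public NeedsMaxInFlightRequests setMaxBatchSize(int maxBatchSize) {
            this.maxBatchSize = maxBatchSize;
            return this;
        }

        @Override
        public Buildable setMaxInFlightRequests(int maxInFlightRequests) {
            this.maxInFlightRequests = maxInFlightRequests;
            return this;
        }

        @Override
        public SinkConfigSketch build() {
            return new SinkConfigSketch(maxBatchSize, maxInFlightRequests);
        }
    }
}
```

Skipping `setMaxInFlightRequests(...)` fails at compile time, because `NeedsMaxInFlightRequests` has no `build()` method.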




[GitHub] [flink] pnowojski commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r928553687


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/RequestInfo.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.time.Instant;
+
+/** Dataclass to encapsulate information about starting requests. */
+@PublicEvolving
+public class RequestInfo {
+    private final int batchSize;
+    private final Instant requestStartTime;
+
+    private RequestInfo(int batchSize, Instant requestStartTime) {
+        this.batchSize = batchSize;
+        this.requestStartTime = requestStartTime;
+    }
+
+    @PublicEvolving
+    public static RequestInfoBuilder builder() {
+        return new RequestInfoBuilder();
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public Instant getRequestStartTime() {
+        return requestStartTime;
+    }
+
+    /** Builder for {@link RequestInfo} dataclass. */
+    public static class RequestInfoBuilder {
+        private int batchSize;
+        private Instant requestStartTime;
+
+        public RequestInfoBuilder setBatchSize(final int batchSize) {
+            this.batchSize = batchSize;
+            return this;
+        }
+
+        public RequestInfoBuilder setRequestStartTime(final Instant requestStartTime) {
+            this.requestStartTime = requestStartTime;
+            return this;
+        }
+
+        public RequestInfo build() {
+            return new RequestInfo(batchSize, requestStartTime);
+        }
+    }

Review Comment:
   The builder pattern loses most of its usefulness for parameters that are obligatory. What's the point of giving the flexibility to call
   ```
   RequestInfo.builder().build();
   ```
   if the only thing it does is throw an exception saying that some parameters were not set?
   
   On top of that, for a trivial two-parameter POJO that doesn't have to be constructed through the `@Public`/`@PublicEvolving` API, a builder is overkill IMO. As I suggested elsewhere, you could make `RequestInfo` a `@PublicEvolving` interface with only the getters, and just have an `@Internal` `class RequestInfoImpl` without any builder. Construction would be simpler, while the public API would be smaller and more flexible for the future.

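Here is a minimal sketch of that split. It assumes only the batch size survives, since the author offered to drop `requestStartTime`. `RequestInfoSketch`, `RequestInfoImpl`, and `FixedLimitStrategy` are hypothetical names. In Flink, the interface would carry `@PublicEvolving` and the class `@Internal` (both from `org.apache.flink.annotation`); the annotations are omitted here to keep the example self-contained.

```java
/** Public-facing view: getters only, so the API surface stays small (would be @PublicEvolving). */
interface RequestInfoSketch {
    int getBatchSize();
}

/** Internal implementation constructed by the writer; no builder needed (would be @Internal). */
final class RequestInfoImpl implements RequestInfoSketch {
    private final int batchSize;

    RequestInfoImpl(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public int getBatchSize() {
        return batchSize;
    }
}

/** Hypothetical strategy that depends only on the interface, never on the concrete class. */
final class FixedLimitStrategy {
    private final int maxBatchSize;

    FixedLimitStrategy(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    boolean shouldBlock(RequestInfoSketch info) {
        return info.getBatchSize() > maxBatchSize;
    }
}
```

Strategy implementations only read through the getter, so the concrete class can change, or gain fields, without affecting them.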




[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r940415252


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -256,43 +242,54 @@ public AsyncSinkWriter(
             long maxTimeInBufferMS,
             long maxRecordSizeInBytes,
             Collection<BufferedRequestState<RequestEntryT>> states) {
+        this(
+                elementConverter,
+                context,
+                BasicAsyncSinkWriterConfiguration.builder()
+                        .setMaxBatchSize(maxBatchSize)
+                        .setMaxInFlightRequests(maxInFlightRequests)
+                        .setMaxBufferedRequests(maxBufferedRequests)
+                        .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
+                        .setMaxTimeInBufferMS(maxTimeInBufferMS)
+                        .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+                        .build(),
+                states);
+    }
+
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            BasicAsyncSinkWriterConfiguration configuration,

Review Comment:
   Changed it to an `AsyncSinkConfiguration` implementation, and made both the builder and the configuration class public.
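For illustration only, a public configuration class with a public builder could look roughly like the sketch below. The class name `WriterConfiguration` is hypothetical; the six fields mirror the setters in the diff above. One design choice worth noting: the builder stores boxed values so that `build()` can reject parameters that were never set, rather than silently defaulting them to `0`. The "must be positive" rule is an assumption of this sketch.

```java
/** Hypothetical immutable writer configuration; field names mirror the setters in the diff. */
final class WriterConfiguration {
    private final int maxBatchSize;
    private final int maxInFlightRequests;
    private final int maxBufferedRequests;
    private final long maxBatchSizeInBytes;
    private final long maxTimeInBufferMS;
    private final long maxRecordSizeInBytes;

    private WriterConfiguration(Builder b) {
        // Safe to unbox: build() has already checked that every field is non-null.
        this.maxBatchSize = b.maxBatchSize;
        this.maxInFlightRequests = b.maxInFlightRequests;
        this.maxBufferedRequests = b.maxBufferedRequests;
        this.maxBatchSizeInBytes = b.maxBatchSizeInBytes;
        this.maxTimeInBufferMS = b.maxTimeInBufferMS;
        this.maxRecordSizeInBytes = b.maxRecordSizeInBytes;
    }

    static Builder builder() {
        return new Builder();
    }

    int getMaxBatchSize() { return maxBatchSize; }
    int getMaxInFlightRequests() { return maxInFlightRequests; }
    int getMaxBufferedRequests() { return maxBufferedRequests; }
    long getMaxBatchSizeInBytes() { return maxBatchSizeInBytes; }
    long getMaxTimeInBufferMS() { return maxTimeInBufferMS; }
    long getMaxRecordSizeInBytes() { return maxRecordSizeInBytes; }

    /** Builder that fails fast in build() instead of defaulting unset values to 0. */
    static final class Builder {
        // Boxed types: null means "never set", which a primitive 0 could not express.
        private Integer maxBatchSize;
        private Integer maxInFlightRequests;
        private Integer maxBufferedRequests;
        private Long maxBatchSizeInBytes;
        private Long maxTimeInBufferMS;
        private Long maxRecordSizeInBytes;

        Builder setMaxBatchSize(int v) { this.maxBatchSize = v; return this; }
        Builder setMaxInFlightRequests(int v) { this.maxInFlightRequests = v; return this; }
        Builder setMaxBufferedRequests(int v) { this.maxBufferedRequests = v; return this; }
        Builder setMaxBatchSizeInBytes(long v) { this.maxBatchSizeInBytes = v; return this; }
        Builder setMaxTimeInBufferMS(long v) { this.maxTimeInBufferMS = v; return this; }
        Builder setMaxRecordSizeInBytes(long v) { this.maxRecordSizeInBytes = v; return this; }

        WriterConfiguration build() {
            requirePositive("maxBatchSize", maxBatchSize);
            requirePositive("maxInFlightRequests", maxInFlightRequests);
            requirePositive("maxBufferedRequests", maxBufferedRequests);
            requirePositive("maxBatchSizeInBytes", maxBatchSizeInBytes);
            requirePositive("maxTimeInBufferMS", maxTimeInBufferMS);
            requirePositive("maxRecordSizeInBytes", maxRecordSizeInBytes);
            return new WriterConfiguration(this);
        }

        private static void requirePositive(String name, Number value) {
            if (value == null) {
                throw new IllegalStateException(name + " must be set");
            }
            if (value.longValue() <= 0) {
                throw new IllegalArgumentException(name + " must be positive");
            }
        }
    }
}
```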





[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r939058926


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/RequestInfo.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.time.Instant;
+
+/** Dataclass to encapsulate information about starting requests. */
+@PublicEvolving
+public class RequestInfo {
+    private final int batchSize;
+    private final Instant requestStartTime;
+
+    private RequestInfo(int batchSize, Instant requestStartTime) {
+        this.batchSize = batchSize;
+        this.requestStartTime = requestStartTime;
+    }
+
+    @PublicEvolving
+    public static RequestInfoBuilder builder() {
+        return new RequestInfoBuilder();
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public Instant getRequestStartTime() {
+        return requestStartTime;
+    }
+
+    /** Builder for {@link RequestInfo} dataclass. */
+    public static class RequestInfoBuilder {
+        private int batchSize;
+        private Instant requestStartTime;
+
+        public RequestInfoBuilder setBatchSize(final int batchSize) {
+            this.batchSize = batchSize;
+            return this;
+        }
+
+        public RequestInfoBuilder setRequestStartTime(final Instant requestStartTime) {
+            this.requestStartTime = requestStartTime;
+            return this;
+        }
+
+        public RequestInfo build() {
+            return new RequestInfo(batchSize, requestStartTime);
+        }
+    }

Review Comment:
   OK, implemented interfaces for `RequestInfo` and `ResultInfo`
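The comment does not show the final shape of `ResultInfo`. The sketch below assumes it carries the number of failed messages and the batch size of a completed request (the values the earlier mutable `RequestInfo` mixed in through `failedMessages`). `ResultInfoImpl` and `ScalingDecision` are hypothetical names. Keeping start-time data and completion data in two separate read-only types means neither one needs a sentinel for "not set".

```java
/** Hypothetical read-only result of a completed request. */
interface ResultInfo {
    int getFailedMessages();

    int getBatchSize();
}

/** Hypothetical immutable implementation; all values are known when the request completes. */
final class ResultInfoImpl implements ResultInfo {
    private final int failedMessages;
    private final int batchSize;

    ResultInfoImpl(int failedMessages, int batchSize) {
        if (batchSize < 0 || failedMessages < 0 || failedMessages > batchSize) {
            throw new IllegalArgumentException("need 0 <= failedMessages <= batchSize");
        }
        this.failedMessages = failedMessages;
        this.batchSize = batchSize;
    }

    @Override
    public int getFailedMessages() {
        return failedMessages;
    }

    @Override
    public int getBatchSize() {
        return batchSize;
    }
}

final class ScalingDecision {
    private ScalingDecision() {}

    /** Mirrors the documented rule: scale down when any message in the request failed. */
    static boolean shouldScaleDown(ResultInfo result) {
        return result.getFailedMessages() > 0;
    }
}
```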





[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r918864106


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/CongestionControlRateLimitingStrategy.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@code RateLimitingStrategy} implementation that does the following:
+ *
+ * <ul>
+ *   <li>Scales up when any request is successful.
+ *   <li>Scales down when any message in a request is unsuccessful.
+ *   <li>Uses Additive Increase / Multiplicative Decrease (AIMD) strategy to scale up/down.
+ * </ul>
+ *
+ * <p>This strategy works well for throughput-limited record-based sinks (e.g. Kinesis, Kafka).
+ */
+public class CongestionControlRateLimitingStrategy implements RateLimitingStrategy {
+
+    private final int maxInFlightRequests;
+    private final AIMDScalingStrategy aimdScalingStrategy;
+    private int maxInFlightMessages;
+
+    private int currentInFlightRequests;
+    private int currentInFlightMessages;
+
+    private CongestionControlRateLimitingStrategy(
+            int maxInFlightRequests,
+            int initialMaxInFlightMessages,
+            AIMDScalingStrategy aimdScalingStrategy) {
+        Preconditions.checkArgument(
+                maxInFlightRequests > 0, "maxInFlightRequests must be a positive integer.");
+        Preconditions.checkArgument(
+                initialMaxInFlightMessages > 0,
+                "initialMaxInFlightMessages must be a positive integer.");
+        Preconditions.checkNotNull(aimdScalingStrategy, "aimdScalingStrategy must be provided.");
+
+        this.maxInFlightRequests = maxInFlightRequests;
+        this.maxInFlightMessages = initialMaxInFlightMessages;
+        this.aimdScalingStrategy = aimdScalingStrategy;
+    }
+
+    @Override
+    public void registerInFlightRequest(RequestInfo requestInfo) {
+        currentInFlightRequests++;
+        currentInFlightMessages += requestInfo.batchSize;
+    }
+
+    @Override
+    public void registerCompletedRequest(RequestInfo requestInfo) {
+        currentInFlightRequests--;

Review Comment:
   Yes, good catch; otherwise we might end up in a situation where `currentInFlightRequests > maxInFlightRequests`.
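A simplified, single-threaded sketch of the invariant discussed here, assuming `shouldBlock` accounts for the request that is about to be sent, so that registering it can never push either counter past its limit. `InFlightTracker` and its int-based methods are hypothetical; the strategy in the diff also adapts the message limit via AIMD, which is left out.

```java
/** Hypothetical tracker enforcing both an in-flight request limit and an in-flight message limit. */
final class InFlightTracker {
    private final int maxInFlightRequests;
    private final int maxInFlightMessages;
    private int currentInFlightRequests;
    private int currentInFlightMessages;

    InFlightTracker(int maxInFlightRequests, int maxInFlightMessages) {
        if (maxInFlightRequests <= 0 || maxInFlightMessages <= 0) {
            throw new IllegalArgumentException("limits must be positive");
        }
        this.maxInFlightRequests = maxInFlightRequests;
        this.maxInFlightMessages = maxInFlightMessages;
    }

    /** True if sending a request of {@code batchSize} now would exceed either limit. */
    boolean shouldBlock(int batchSize) {
        return currentInFlightRequests >= maxInFlightRequests
                || currentInFlightMessages + batchSize > maxInFlightMessages;
    }

    /** Callers are expected to check shouldBlock first; this guard makes a violation loud. */
    void registerInFlightRequest(int batchSize) {
        if (shouldBlock(batchSize)) {
            throw new IllegalStateException("registering this request would exceed a limit");
        }
        currentInFlightRequests++;
        currentInFlightMessages += batchSize;
    }

    void registerCompletedRequest(int batchSize) {
        currentInFlightRequests--;
        currentInFlightMessages -= batchSize;
    }

    int getCurrentInFlightRequests() {
        return currentInFlightRequests;
    }
}
```

One caveat of this shape: if a single batch is larger than the message limit, `shouldBlock` stays true forever, so a writer using it would presumably also need to cap the batch size at the current message limit.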





[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r918857861


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -344,44 +342,45 @@ public void write(InputT element, Context context) throws IOException, Interrupt
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        while (!isInFlightRequestOrMessageLimitExceeded()
+        while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
                         || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
             flush();
         }
     }
 
-    /**
-     * Determines if the sink should block and complete existing in flight requests before it may
-     * prudently create any new ones. This is exactly determined by if the number of requests
-     * currently in flight exceeds the maximum supported by the sink OR if the number of in flight
-     * messages exceeds the maximum determined to be appropriate by the rate limiting strategy.
-     */
-    private boolean isInFlightRequestOrMessageLimitExceeded() {
-        return inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    private RequestInfo createRequestInfo() {
+        int batchSize = getNextBatchSize();
+        long requestStartTime = System.currentTimeMillis();
+        return RequestInfo.builder()
+                .setBatchSize(batchSize)
+                .setRequestStartTime(requestStartTime)
+                .build();
     }
 
     /**
      * Persists buffered RequestsEntries into the destination by invoking {@code
      * submitRequestEntries} with batches according to the user specified buffering hints.
      *
-     * <p>The method blocks if too many async requests are in flight.
+     * <p>The method checks with the {@code rateLimitingStrategy} to see if it should block the
+     * request.
      */
     private void flush() throws InterruptedException {
-        while (isInFlightRequestOrMessageLimitExceeded()) {
+        RequestInfo requestInfo = createRequestInfo();
+        while (rateLimitingStrategy.shouldBlock(requestInfo)) {
             mailboxExecutor.yield();
+            requestInfo = createRequestInfo();
         }
 
-        List<RequestEntryT> batch = createNextAvailableBatch();
-        int batchSize = batch.size();
-
-        if (batch.size() == 0) {
+        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
+        int batchSize = requestInfo.batchSize;

Review Comment:
   Sure, changed





[GitHub] [flink] dannycranmer commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r919721608


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -436,9 +435,12 @@ private void completeRequest(
         ackTime = System.currentTimeMillis();
 
         inFlightRequestsCount--;
-        inFlightMessages -= batchSize;
-
-        updateInFlightMessagesLimit(failedRequestEntries.size() == 0);
+        rateLimitingStrategy.registerCompletedRequest(
+                RequestInfo.builder()

Review Comment:
   Yet another variation of `RequestInfo`, this time with a different subset of fields set.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -344,44 +343,42 @@ public void write(InputT element, Context context) throws IOException, Interrupt
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        while (!isInFlightRequestOrMessageLimitExceeded()
+        while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
-                        || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
+                || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
             flush();
         }
     }
 
-    /**
-     * Determines if the sink should block and complete existing in flight requests before it may
-     * prudently create any new ones. This is exactly determined by if the number of requests
-     * currently in flight exceeds the maximum supported by the sink OR if the number of in flight
-     * messages exceeds the maximum determined to be appropriate by the rate limiting strategy.
-     */
-    private boolean isInFlightRequestOrMessageLimitExceeded() {
-        return inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    private RequestInfo createRequestInfo() {
+        int batchSize = getNextBatchSize();
+        return RequestInfo.builder().setBatchSize(batchSize).build();
     }
 
     /**
      * Persists buffered RequestsEntries into the destination by invoking {@code
      * submitRequestEntries} with batches according to the user specified buffering hints.
      *
-     * <p>The method blocks if too many async requests are in flight.
+     * <p>The method checks with the {@code rateLimitingStrategy} to see if it should block the
+     * request.
      */
     private void flush() throws InterruptedException {
-        while (isInFlightRequestOrMessageLimitExceeded()) {
+        RequestInfo requestInfo = createRequestInfo();
+        while (rateLimitingStrategy.shouldBlock(requestInfo)) {
             mailboxExecutor.yield();
+            requestInfo = createRequestInfo();
         }
 
-        List<RequestEntryT> batch = createNextAvailableBatch();
+        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
         int batchSize = batch.size();
-
-        if (batch.size() == 0) {
+        if (batchSize == 0) {
             return;
         }
+        requestInfo.setBatchSize(batchSize);
 
         long timestampOfRequest = System.currentTimeMillis();
-        Consumer<List<RequestEntryT>> requestResult =
+        requestInfo.setRequestStartTime(timestampOfRequest);

Review Comment:
   Having some fields of an object `null` in some cases and set in others is a code smell. If `createNextAvailableBatch()` only requires the batch size, I suggest passing in the `int`, since the method is private anyway. Then try to make `requestInfo` immutable.
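A sketch of that refactor on a heavily simplified writer that buffers plain `String` entries (an assumption for illustration). The private batch helper takes the requested size as an `int`, and an immutable description of the request is created only once the actual batch is known, which may be smaller than requested when the buffer runs short. `BatchingSketch`, `nextBatch`, and `StartedRequest` are hypothetical names, not the PR's code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

/** Hypothetical, heavily simplified writer used only to illustrate the suggested refactor. */
final class BatchingSketch {

    /** Immutable: created once the real batch is known, never modified afterwards. */
    static final class StartedRequest {
        private final int batchSize;
        private final long requestStartTimeMillis;

        StartedRequest(int batchSize, long requestStartTimeMillis) {
            this.batchSize = batchSize;
            this.requestStartTimeMillis = requestStartTimeMillis;
        }

        int getBatchSize() { return batchSize; }

        long getRequestStartTimeMillis() { return requestStartTimeMillis; }
    }

    private final Deque<String> buffer = new ArrayDeque<>();

    void add(String entry) {
        buffer.addLast(entry);
    }

    /** Takes the requested size as a plain int; may return fewer entries if the buffer runs short. */
    private List<String> nextBatch(int requestedSize) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < requestedSize && !buffer.isEmpty()) {
            batch.add(buffer.pollFirst());
        }
        return batch;
    }

    /** Builds the request description from the actual batch, so no field is ever "not set". */
    Optional<StartedRequest> flush(int requestedSize) {
        List<String> batch = nextBatch(requestedSize);
        if (batch.isEmpty()) {
            return Optional.empty();
        }
        // A real writer would submit `batch` here; this sketch only records what was sent.
        return Optional.of(new StartedRequest(batch.size(), System.currentTimeMillis()));
    }
}
```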



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -344,44 +343,42 @@ public void write(InputT element, Context context) throws IOException, Interrupt
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        while (!isInFlightRequestOrMessageLimitExceeded()
+        while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
-                        || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
+                || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
             flush();
         }
     }
 
-    /**
-     * Determines if the sink should block and complete existing in flight requests before it may
-     * prudently create any new ones. This is exactly determined by if the number of requests
-     * currently in flight exceeds the maximum supported by the sink OR if the number of in flight
-     * messages exceeds the maximum determined to be appropriate by the rate limiting strategy.
-     */
-    private boolean isInFlightRequestOrMessageLimitExceeded() {
-        return inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    private RequestInfo createRequestInfo() {
+        int batchSize = getNextBatchSize();
+        return RequestInfo.builder().setBatchSize(batchSize).build();
     }
 
     /**
      * Persists buffered RequestsEntries into the destination by invoking {@code
      * submitRequestEntries} with batches according to the user specified buffering hints.
      *
-     * <p>The method blocks if too many async requests are in flight.
+     * <p>The method checks with the {@code rateLimitingStrategy} to see if it should block the
+     * request.
      */
     private void flush() throws InterruptedException {
-        while (isInFlightRequestOrMessageLimitExceeded()) {
+        RequestInfo requestInfo = createRequestInfo();
+        while (rateLimitingStrategy.shouldBlock(requestInfo)) {
             mailboxExecutor.yield();
+            requestInfo = createRequestInfo();
         }
 
-        List<RequestEntryT> batch = createNextAvailableBatch();
+        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
         int batchSize = batch.size();
-
-        if (batch.size() == 0) {
+        if (batchSize == 0) {
             return;
         }
+        requestInfo.setBatchSize(batchSize);

Review Comment:
   Why do we need to update the `batchSize` here? Is it in case the actual batch size is LESS than the size originally computed by `createRequestInfo()`?
   
   Ideally we should make `RequestInfo` immutable, following general best practice.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/RequestInfo.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Dataclass to encapsulate information about starting / completing requests. */
+@PublicEvolving
+public class RequestInfo {
+    public final int failedMessages;
+    public int batchSize;
+    public long requestStartTime;

Review Comment:
   Use private fields with getters



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/CongestionControlRateLimitingStrategy.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@code RateLimitingStrategy} implementation that does the following:
+ *
+ * <ul>
+ *   <li>Scales up when any request is successful.
+ *   <li>Scales down when any message in a request is unsuccessful.
+ *   <li>Uses Additive Increase / Multiplicative Decrease (AIMD) strategy to scale up/down.
+ * </ul>
+ *
+ * <p>This strategy works well for throughput-limited record-based sinks (e.g. Kinesis, Kafka).
+ */
+public class CongestionControlRateLimitingStrategy implements RateLimitingStrategy {

Review Comment:
   This needs `@PublicEvolving`



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/RequestInfo.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Dataclass to encapsulate information about starting / completing requests. */
+@PublicEvolving
+public class RequestInfo {
+    public final int failedMessages;
+    public int batchSize;
+    public long requestStartTime;
+
+    private RequestInfo(int failedMessages, int batchSize, long requestStartTime) {
+        this.failedMessages = failedMessages;
+        this.batchSize = batchSize;
+        this.requestStartTime = requestStartTime;
+    }
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public void setRequestStartTime(long requestStartTime) {
+        this.requestStartTime = requestStartTime;
+    }

Review Comment:
   As mentioned before, can we make this immutable? A dataclass that has both a builder with a private constructor and setters seems like a smell.



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/strategy/AIMDScalingStrategyTest.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.junit.Test;

Review Comment:
   We should be using JUnit 5 now; can you migrate? See this commit for an example:
   - https://github.com/apache/flink/commit/a10fd2314269e0686d9fbbd3eacb470a03432ef3



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -221,10 +196,12 @@ protected abstract void submitRequestEntries(
      * be the same).
      *
      * @param requestEntry the requestEntry for which we want to know the size
+     *
      * @return the size of the requestEntry, as defined previously
      */
     protected abstract long getSizeInBytes(RequestEntryT requestEntry);
 
+    @Deprecated

Review Comment:
   nit: When deprecating, it is nice to add Javadoc that tells the caller what to use instead.
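For example, a deprecated constructor can delegate to its replacement and point to it with an `@deprecated` Javadoc tag. The classes below are simplified, hypothetical stand-ins (`Writer`, `WriterConfig`), not the actual `AsyncSinkWriter` signatures.

```java
/** Hypothetical configuration object accepted by the preferred constructor. */
final class WriterConfig {
    final int maxBatchSize;
    final int maxInFlightRequests;

    WriterConfig(int maxBatchSize, int maxInFlightRequests) {
        this.maxBatchSize = maxBatchSize;
        this.maxInFlightRequests = maxInFlightRequests;
    }
}

class Writer {
    private final WriterConfig config;

    /**
     * Creates a writer from individual parameters.
     *
     * @deprecated Use {@link #Writer(WriterConfig)} instead, which groups these parameters into a
     *     single configuration object.
     */
    @Deprecated
    Writer(int maxBatchSize, int maxInFlightRequests) {
        // Delegate so that both constructors share one code path.
        this(new WriterConfig(maxBatchSize, maxInFlightRequests));
    }

    /** Preferred constructor. */
    Writer(WriterConfig config) {
        this.config = config;
    }

    int maxBatchSize() {
        return config.maxBatchSize;
    }
}
```

The `@Deprecated` annotation makes the compiler warn callers, while the `@deprecated` tag tells them where to migrate.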



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/CongestionControlRateLimitingStrategy.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@code RateLimitingStrategy} implementation that does the following:
+ *
+ * <ul>
+ *   <li>Scales up when any request is successful.
+ *   <li>Scales down when any message in a request is unsuccessful.
+ *   <li>Uses Additive Increase / Multiplicative Decrease (AIMD) strategy to scale up/down.
+ * </ul>
+ *
+ * <p>This strategy works well for throughput-limited record-based sinks (e.g. Kinesis, Kafka).
+ */
+public class CongestionControlRateLimitingStrategy implements RateLimitingStrategy {
+
+    private final int maxInFlightRequests;
+    private final AIMDScalingStrategy aimdScalingStrategy;

Review Comment:
   Why can this not be an interface, so that we can pass in arbitrary scaling strategies? This was part of the original design, which I know was "simplified", but the result is a code smell IMO. We have a coupling here; are we sure this is the right thing to do?
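One way to remove the coupling, sketched under the assumption that a scaling strategy only needs to map a current limit to a new one: the rate limiter depends on a small generic interface, and AIMD becomes one implementation among many. `ScalingStrategy`, `FixedStepScaling`, and `MessageLimit` are hypothetical names for illustration.

```java
import java.util.Objects;

/** Hypothetical pluggable policy for adjusting a limit after success or failure. */
interface ScalingStrategy<T> {
    T scaleUp(T currentValue);

    T scaleDown(T currentValue);
}

/** A trivial alternative policy: step up and down by a fixed amount, never below 1. */
final class FixedStepScaling implements ScalingStrategy<Integer> {
    private final int step;

    FixedStepScaling(int step) {
        if (step <= 0) {
            throw new IllegalArgumentException("step must be positive");
        }
        this.step = step;
    }

    @Override
    public Integer scaleUp(Integer current) {
        return current + step;
    }

    @Override
    public Integer scaleDown(Integer current) {
        return Math.max(1, current - step);
    }
}

/** The limiter only sees the interface, so any policy can be injected. */
final class MessageLimit {
    private final ScalingStrategy<Integer> scaling;
    private int maxInFlightMessages;

    MessageLimit(int initialLimit, ScalingStrategy<Integer> scaling) {
        if (initialLimit <= 0) {
            throw new IllegalArgumentException("initialLimit must be positive");
        }
        this.maxInFlightMessages = initialLimit;
        this.scaling = Objects.requireNonNull(scaling, "scaling");
    }

    /** Scale up on a fully successful request, down if any message failed. */
    void onRequestCompleted(int failedMessages) {
        maxInFlightMessages =
                failedMessages == 0
                        ? scaling.scaleUp(maxInFlightMessages)
                        : scaling.scaleDown(maxInFlightMessages);
    }

    int get() {
        return maxInFlightMessages;
    }
}
```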



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/AIMDScalingStrategy.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * AIMDScalingStrategy scales up linearly and scales down multiplicatively. See
+ * https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease for more details
+ */
+public class AIMDScalingStrategy {

Review Comment:
   This needs `@PublicEvolving`
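As a rough illustration of what "scales up linearly and scales down multiplicatively" means (not the PR's `AIMDScalingStrategy` itself): on success the limit grows by a constant `increaseRate`, on failure it is multiplied by a `decreaseFactor` in (0, 1), and the result stays within [1, `rateThreshold`]. The class name `AimdScaler` and the three parameter names are assumptions of this sketch.

```java
/** Hypothetical AIMD scaler: additive increase, multiplicative decrease, clamped to [1, rateThreshold]. */
final class AimdScaler {
    private final int increaseRate;
    private final double decreaseFactor;
    private final int rateThreshold;

    AimdScaler(int increaseRate, double decreaseFactor, int rateThreshold) {
        if (increaseRate <= 0) {
            throw new IllegalArgumentException("increaseRate must be positive");
        }
        if (decreaseFactor <= 0.0 || decreaseFactor >= 1.0) {
            throw new IllegalArgumentException("decreaseFactor must be in (0, 1)");
        }
        if (rateThreshold <= 0) {
            throw new IllegalArgumentException("rateThreshold must be positive");
        }
        this.increaseRate = increaseRate;
        this.decreaseFactor = decreaseFactor;
        this.rateThreshold = rateThreshold;
    }

    /** Additive increase: add a constant, but never exceed the threshold. */
    int scaleUp(int currentRate) {
        return Math.min(currentRate + increaseRate, rateThreshold);
    }

    /** Multiplicative decrease: scale by decreaseFactor, but never drop below 1. */
    int scaleDown(int currentRate) {
        return Math.max(1, (int) Math.round(currentRate * decreaseFactor));
    }
}
```

With `increaseRate = 10` and `decreaseFactor = 0.5`, repeated successes grow the limit 20, 30, 40, ... while a single failure halves it. This sawtooth is the "linear increase and multiplicative decrease" behaviour that the PR's AIMD test is described as verifying.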



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/RequestInfo.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Dataclass to encapsulate information about starting / completing requests. */
+@PublicEvolving
+public class RequestInfo {
+    public final int failedMessages;
+    public int batchSize;
+    public long requestStartTime;
+
+    private RequestInfo(int failedMessages, int batchSize, long requestStartTime) {
+        this.failedMessages = failedMessages;
+        this.batchSize = batchSize;
+        this.requestStartTime = requestStartTime;
+    }
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public void setRequestStartTime(long requestStartTime) {
+        this.requestStartTime = requestStartTime;
+    }
+
+    @PublicEvolving
+    public static RequestInfoBuilder builder() {
+        return new RequestInfoBuilder();
+    }
+
+    /** Builder for {@link RequestInfo} dataclass. */
+    public static class RequestInfoBuilder {
+        private int failedMessages;
+        private int batchSize;
+        private long requestStartTime;
+

Review Comment:
   The fact that these fields might be `0` under certain conditions, meaning "not set", reinforces the concerns I mentioned previously. If we genuinely want to reuse this object and rely on setters, we would need to distinguish between `failedMessages` being 0 and being unset. That could be achieved with `-1`, but I would prefer a refactor.





[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r939908736


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -344,44 +343,42 @@ public void write(InputT element, Context context) throws IOException, Interrupt
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        while (!isInFlightRequestOrMessageLimitExceeded()
+        while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
-                        || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
+                || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
             flush();
         }
     }
 
-    /**
-     * Determines if the sink should block and complete existing in flight requests before it may
-     * prudently create any new ones. This is exactly determined by if the number of requests
-     * currently in flight exceeds the maximum supported by the sink OR if the number of in flight
-     * messages exceeds the maximum determined to be appropriate by the rate limiting strategy.
-     */
-    private boolean isInFlightRequestOrMessageLimitExceeded() {
-        return inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    private RequestInfo createRequestInfo() {
+        int batchSize = getNextBatchSize();
+        return RequestInfo.builder().setBatchSize(batchSize).build();
     }
 
     /**
      * Persists buffered RequestsEntries into the destination by invoking {@code
      * submitRequestEntries} with batches according to the user specified buffering hints.
      *
-     * <p>The method blocks if too many async requests are in flight.
+     * <p>The method checks with the {@code rateLimitingStrategy} to see if it should block the
+     * request.
      */
     private void flush() throws InterruptedException {
-        while (isInFlightRequestOrMessageLimitExceeded()) {
+        RequestInfo requestInfo = createRequestInfo();
+        while (rateLimitingStrategy.shouldBlock(requestInfo)) {
             mailboxExecutor.yield();
+            requestInfo = createRequestInfo();
         }
 
-        List<RequestEntryT> batch = createNextAvailableBatch();
+        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
         int batchSize = batch.size();
-
-        if (batch.size() == 0) {
+        if (batchSize == 0) {
             return;
         }
+        requestInfo.setBatchSize(batchSize);
 
         long timestampOfRequest = System.currentTimeMillis();
-        Consumer<List<RequestEntryT>> requestResult =
+        requestInfo.setRequestStartTime(timestampOfRequest);

Review Comment:
   Refactored to remove `requestStartTime` from the dataclass
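The diff above moves the in-flight bookkeeping that used to live in `isInFlightRequestOrMessageLimitExceeded()` behind `rateLimitingStrategy.shouldBlock(...)`. A self-contained sketch of that split follows, assuming simplified stand-in interfaces (`SketchRequestInfo`, `SketchRateLimitingStrategy`) rather than the PR's actual types: the strategy owns the counters, while the writer keeps the request start time as a local value, consistent with removing `requestStartTime` from the dataclass. The `Runnable` is a hypothetical stand-in for `mailboxExecutor.yield()`.

```java
/** Simplified stand-in for the request information passed to the strategy (assumed shape). */
interface SketchRequestInfo {
    int getBatchSize();
}

/** Simplified stand-in for the strategy interface discussed in this PR (assumed shape). */
interface SketchRateLimitingStrategy {
    boolean shouldBlock(SketchRequestInfo requestInfo);

    void registerInFlightRequest(SketchRequestInfo requestInfo);

    void registerCompletedRequest(SketchRequestInfo requestInfo);
}

/**
 * Hypothetical strategy reproducing the removed isInFlightRequestOrMessageLimitExceeded() check:
 * block while too many requests, or too many messages, are in flight.
 */
final class InFlightLimitStrategy implements SketchRateLimitingStrategy {
    private final int maxInFlightRequests;
    private final int maxInFlightMessages;
    private int inFlightRequests;
    private int inFlightMessages;

    InFlightLimitStrategy(int maxInFlightRequests, int maxInFlightMessages) {
        this.maxInFlightRequests = maxInFlightRequests;
        this.maxInFlightMessages = maxInFlightMessages;
    }

    @Override
    public boolean shouldBlock(SketchRequestInfo requestInfo) {
        // Same condition as the removed writer method; requestInfo is available if a
        // strategy also wants to take the next batch size into account.
        return inFlightRequests >= maxInFlightRequests
                || inFlightMessages >= maxInFlightMessages;
    }

    @Override
    public void registerInFlightRequest(SketchRequestInfo requestInfo) {
        inFlightRequests++;
        inFlightMessages += requestInfo.getBatchSize();
    }

    @Override
    public void registerCompletedRequest(SketchRequestInfo requestInfo) {
        inFlightRequests--;
        inFlightMessages -= requestInfo.getBatchSize();
    }
}

/** Hypothetical writer-side loop mirroring the shape of flush() in the diff. */
final class FlushLoopSketch {
    private final SketchRateLimitingStrategy strategy;
    private final Runnable yieldToMailbox;

    FlushLoopSketch(SketchRateLimitingStrategy strategy, Runnable yieldToMailbox) {
        this.strategy = strategy;
        this.yieldToMailbox = yieldToMailbox;
    }

    /** Waits until the strategy allows a request, registers it, and returns its start time. */
    long flush(int nextBatchSize) {
        SketchRequestInfo requestInfo = () -> nextBatchSize;
        while (strategy.shouldBlock(requestInfo)) {
            yieldToMailbox.run(); // let in-flight requests complete
        }
        strategy.registerInFlightRequest(requestInfo);
        // The start time is owned by the writer, not stored in the request info.
        return System.currentTimeMillis();
    }
}
```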





[GitHub] [flink] dannycranmer commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r940191990


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -256,43 +242,54 @@ public AsyncSinkWriter(
             long maxTimeInBufferMS,
             long maxRecordSizeInBytes,
             Collection<BufferedRequestState<RequestEntryT>> states) {
+        this(
+                elementConverter,
+                context,
+                BasicAsyncSinkWriterConfiguration.builder()
+                        .setMaxBatchSize(maxBatchSize)
+                        .setMaxInFlightRequests(maxInFlightRequests)
+                        .setMaxBufferedRequests(maxBufferedRequests)
+                        .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
+                        .setMaxTimeInBufferMS(maxTimeInBufferMS)
+                        .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+                        .build(),
+                states);
+    }
+
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            BasicAsyncSinkWriterConfiguration configuration,

Review Comment:
   I think you meant `AsyncSinkWriterConfiguration` here?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/config/BasicAsyncSinkWriterConfiguration.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.config;
+
+import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
+
+/**
+ * BasicAsyncSinkWriterConfiguration with defaults that work well for streaming sources/sinks like
+ * Kinesis Data Streams and Kinesis Firehose.
+ */
+public class BasicAsyncSinkWriterConfiguration implements AsyncSinkWriterConfiguration {
+    private final int maxBatchSize;
+    private final int maxInFlightRequests;
+    private final int maxBufferedRequests;
+    private final long maxBatchSizeInBytes;
+    private final long maxTimeInBufferMS;
+    private final long maxRecordSizeInBytes;
+    private final RateLimitingStrategy rateLimitingStrategy;
+
+    private BasicAsyncSinkWriterConfiguration(
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            RateLimitingStrategy rateLimitingStrategy) {
+        this.maxBatchSize = maxBatchSize;
+        this.maxInFlightRequests = maxInFlightRequests;
+        this.maxBufferedRequests = maxBufferedRequests;
+        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
+        this.maxTimeInBufferMS = maxTimeInBufferMS;
+        this.maxRecordSizeInBytes = maxRecordSizeInBytes;
+        this.rateLimitingStrategy = rateLimitingStrategy;
+    }
+
+    public int getMaxBatchSize() {
+        return maxBatchSize;
+    }
+
+    public int getMaxInFlightRequests() {
+        return maxInFlightRequests;
+    }
+
+    public int getMaxBufferedRequests() {
+        return maxBufferedRequests;
+    }
+
+    public long getMaxBatchSizeInBytes() {
+        return maxBatchSizeInBytes;
+    }
+
+    public long getMaxTimeInBufferMS() {
+        return maxTimeInBufferMS;
+    }
+
+    public long getMaxRecordSizeInBytes() {
+        return maxRecordSizeInBytes;
+    }
+
+    public RateLimitingStrategy getRateLimitingStrategy() {
+        return rateLimitingStrategy;
+    }
+
+    public static BasicAsyncSinkWriterConfigurationBuilder builder() {
+        return new BasicAsyncSinkWriterConfigurationBuilder();
+    }
+
+    /** Builder for {@link BasicAsyncSinkWriterConfiguration}. */
+    public static class BasicAsyncSinkWriterConfigurationBuilder {
+
+        private int maxBatchSize = 500;
+        private int maxInFlightRequests = 50;
+        private int maxBufferedRequests = 10_000;
+        private long maxBatchSizeInBytes = 5 * 1024 * 1024;
+        private long maxTimeInBufferMS = 5_000;
+        private long maxRecordSizeInBytes = 1024 * 1024;

Review Comment:
   I do not think it is a good idea to have defaults in the base implementation. They also make the validation less useful: users could develop sinks without considering these values, resulting in a non-performant sink.
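A sketch of what a default-free builder could look like, assuming the concrete sink is responsible for every limit: fields are boxed so that "unset" is detectable, and `build()` fails fast instead of silently falling back to a value. `ExplicitAsyncSinkConfig` is a hypothetical name, and only three of the PR's fields are shown to keep it short.

```java
/**
 * Hypothetical configuration whose builder has no defaults: the concrete sink must choose every
 * limit, and build() rejects anything left unset or non-positive.
 */
final class ExplicitAsyncSinkConfig {
    private final int maxBatchSize;
    private final int maxInFlightRequests;
    private final long maxBatchSizeInBytes;

    private ExplicitAsyncSinkConfig(
            int maxBatchSize, int maxInFlightRequests, long maxBatchSizeInBytes) {
        this.maxBatchSize = maxBatchSize;
        this.maxInFlightRequests = maxInFlightRequests;
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
    }

    int getMaxBatchSize() {
        return maxBatchSize;
    }

    int getMaxInFlightRequests() {
        return maxInFlightRequests;
    }

    long getMaxBatchSizeInBytes() {
        return maxBatchSizeInBytes;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        // Boxed fields: null means the sink implementer has not chosen a value yet.
        private Integer maxBatchSize;
        private Integer maxInFlightRequests;
        private Long maxBatchSizeInBytes;

        Builder setMaxBatchSize(int maxBatchSize) {
            this.maxBatchSize = maxBatchSize;
            return this;
        }

        Builder setMaxInFlightRequests(int maxInFlightRequests) {
            this.maxInFlightRequests = maxInFlightRequests;
            return this;
        }

        Builder setMaxBatchSizeInBytes(long maxBatchSizeInBytes) {
            this.maxBatchSizeInBytes = maxBatchSizeInBytes;
            return this;
        }

        ExplicitAsyncSinkConfig build() {
            return new ExplicitAsyncSinkConfig(
                    (int) requirePositive(maxBatchSize, "maxBatchSize"),
                    (int) requirePositive(maxInFlightRequests, "maxInFlightRequests"),
                    requirePositive(maxBatchSizeInBytes, "maxBatchSizeInBytes"));
        }

        private static long requirePositive(Number value, String name) {
            if (value == null) {
                throw new IllegalStateException(name + " must be set explicitly");
            }
            if (value.longValue() <= 0) {
                throw new IllegalArgumentException(name + " must be positive, was " + value);
            }
            return value.longValue();
        }
    }
}
```

Sink-specific values (such as the 500 / 50 / 5 MiB figures in the diff) would then live in the concrete connector that calls this builder, not in the base module.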



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/config/BasicAsyncSinkWriterConfiguration.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.config;
+
+import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
+
+/**
+ * BasicAsyncSinkWriterConfiguration with defaults that work well for streaming sources/sinks like
+ * Kinesis Data Streams and Kinesis Firehose.
+ */

Review Comment:
   The base framework should not know about concrete implementations such as Kinesis Data Streams and Kinesis Firehose.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/config/AsyncSinkWriterConfiguration.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
+
+/**
+ * AsyncSinkWriterConfiguration configures the {@link
+ * org.apache.flink.connector.base.sink.writer.AsyncSinkWriter}.
+ */
+@PublicEvolving
+public interface AsyncSinkWriterConfiguration {

Review Comment:
   Why does this need an interface? Can you think of any use case where it would add value? I would prefer a concrete `AsyncSinkWriterConfiguration` class here.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/config/BasicAsyncSinkWriterConfiguration.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.config;
+
+import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
+
+/**
+ * BasicAsyncSinkWriterConfiguration with defaults that work well for streaming sources/sinks like
+ * Kinesis Data Streams and Kinesis Firehose.
+ */
+public class BasicAsyncSinkWriterConfiguration implements AsyncSinkWriterConfiguration {
+    private final int maxBatchSize;
+    private final int maxInFlightRequests;
+    private final int maxBufferedRequests;
+    private final long maxBatchSizeInBytes;
+    private final long maxTimeInBufferMS;
+    private final long maxRecordSizeInBytes;
+    private final RateLimitingStrategy rateLimitingStrategy;
+
+    private BasicAsyncSinkWriterConfiguration(
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            RateLimitingStrategy rateLimitingStrategy) {
+        this.maxBatchSize = maxBatchSize;
+        this.maxInFlightRequests = maxInFlightRequests;
+        this.maxBufferedRequests = maxBufferedRequests;
+        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
+        this.maxTimeInBufferMS = maxTimeInBufferMS;
+        this.maxRecordSizeInBytes = maxRecordSizeInBytes;
+        this.rateLimitingStrategy = rateLimitingStrategy;
+    }
+
+    public int getMaxBatchSize() {
+        return maxBatchSize;
+    }
+
+    public int getMaxInFlightRequests() {
+        return maxInFlightRequests;
+    }
+
+    public int getMaxBufferedRequests() {
+        return maxBufferedRequests;
+    }
+
+    public long getMaxBatchSizeInBytes() {
+        return maxBatchSizeInBytes;
+    }
+
+    public long getMaxTimeInBufferMS() {
+        return maxTimeInBufferMS;
+    }
+
+    public long getMaxRecordSizeInBytes() {
+        return maxRecordSizeInBytes;
+    }
+
+    public RateLimitingStrategy getRateLimitingStrategy() {
+        return rateLimitingStrategy;
+    }
+
+    public static BasicAsyncSinkWriterConfigurationBuilder builder() {
+        return new BasicAsyncSinkWriterConfigurationBuilder();
+    }
+
+    /** Builder for {@link BasicAsyncSinkWriterConfiguration}. */
+    public static class BasicAsyncSinkWriterConfigurationBuilder {
+
+        private int maxBatchSize = 500;
+        private int maxInFlightRequests = 50;
+        private int maxBufferedRequests = 10_000;
+        private long maxBatchSizeInBytes = 5 * 1024 * 1024;
+        private long maxTimeInBufferMS = 5_000;
+        private long maxRecordSizeInBytes = 1024 * 1024;
+        private RateLimitingStrategy rateLimitingStrategy;
+
+        public BasicAsyncSinkWriterConfigurationBuilder setMaxBatchSize(int maxBatchSize) {
+            this.maxBatchSize = maxBatchSize;
+            return this;
+        }
+
+        public BasicAsyncSinkWriterConfigurationBuilder setMaxInFlightRequests(
+                int maxInFlightRequests) {
+            this.maxInFlightRequests = maxInFlightRequests;
+            return this;
+        }
+
+        public BasicAsyncSinkWriterConfigurationBuilder setMaxBufferedRequests(
+                int maxBufferedRequests) {
+            this.maxBufferedRequests = maxBufferedRequests;
+            return this;
+        }
+
+        public BasicAsyncSinkWriterConfigurationBuilder setMaxBatchSizeInBytes(
+                long maxBatchSizeInBytes) {
+            this.maxBatchSizeInBytes = maxBatchSizeInBytes;
+            return this;
+        }
+
+        public BasicAsyncSinkWriterConfigurationBuilder setMaxTimeInBufferMS(
+                long maxTimeInBufferMS) {
+            this.maxTimeInBufferMS = maxTimeInBufferMS;
+            return this;
+        }
+
+        public BasicAsyncSinkWriterConfigurationBuilder setMaxRecordSizeInBytes(
+                long maxRecordSizeInBytes) {
+            this.maxRecordSizeInBytes = maxRecordSizeInBytes;
+            return this;
+        }
+
+        public BasicAsyncSinkWriterConfigurationBuilder setRateLimitingStrategy(
+                RateLimitingStrategy rateLimitingStrategy) {
+            this.rateLimitingStrategy = rateLimitingStrategy;
+            return this;
+        }
+
+        public BasicAsyncSinkWriterConfiguration build() {
+            if (rateLimitingStrategy == null) {

Review Comment:
   What if we do not want a rate limiting strategy? Should the sink use a no-op strategy rather than null?
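A sketch of the no-op option using the null-object pattern, under assumed, simplified types (`RateLimiter`, `NoOpRateLimiter` and `RateLimiterConfig` are hypothetical, not Flink's interfaces): a sink that wants no limiting passes an explicit no-op instance, and the configuration rejects `null`, so the writer never needs a null check on its hot path.

```java
import java.util.Objects;

/** Simplified stand-in for a rate limiting strategy (assumed shape). */
interface RateLimiter {
    boolean shouldBlock(int nextBatchSize);

    void registerInFlightRequest(int batchSize);

    void registerCompletedRequest(int batchSize, int failedMessages);
}

/** Null-object strategy: never blocks and ignores all bookkeeping. */
enum NoOpRateLimiter implements RateLimiter {
    INSTANCE;

    @Override
    public boolean shouldBlock(int nextBatchSize) {
        return false;
    }

    @Override
    public void registerInFlightRequest(int batchSize) {
        // Nothing to track.
    }

    @Override
    public void registerCompletedRequest(int batchSize, int failedMessages) {
        // Nothing to track.
    }
}

/** Hypothetical config holder: the strategy is mandatory, so the writer never null-checks it. */
final class RateLimiterConfig {
    private final RateLimiter rateLimiter;

    private RateLimiterConfig(RateLimiter rateLimiter) {
        this.rateLimiter =
                Objects.requireNonNull(
                        rateLimiter, "use NoOpRateLimiter.INSTANCE to disable rate limiting");
    }

    static RateLimiterConfig of(RateLimiter rateLimiter) {
        return new RateLimiterConfig(rateLimiter);
    }

    RateLimiter getRateLimiter() {
        return rateLimiter;
    }
}
```

Rejecting `null` rather than silently substituting a default is a design choice here; it makes "no rate limiting" an explicit decision by the sink implementer.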





[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

Posted by GitBox <gi...@apache.org>.
hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r939908131


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/RequestInfo.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.time.Instant;
+
+/** Dataclass to encapsulate information about starting requests. */
+@PublicEvolving
+public class RequestInfo {
+    private final int batchSize;
+    private final Instant requestStartTime;
+
+    private RequestInfo(int batchSize, Instant requestStartTime) {
+        this.batchSize = batchSize;
+        this.requestStartTime = requestStartTime;
+    }
+
+    @PublicEvolving
+    public static RequestInfoBuilder builder() {
+        return new RequestInfoBuilder();
+    }

Review Comment:
   Refactored `RequestInfo` to be an interface.


