Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/12 11:11:53 UTC

[GitHub] [flink] foxus commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

foxus commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r918661244


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -344,44 +342,45 @@ public void write(InputT element, Context context) throws IOException, Interrupt
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        while (!isInFlightRequestOrMessageLimitExceeded()
+        while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
                         || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
             flush();
         }
     }
 
-    /**
-     * Determines if the sink should block and complete existing in flight requests before it may
-     * prudently create any new ones. This is exactly determined by if the number of requests
-     * currently in flight exceeds the maximum supported by the sink OR if the number of in flight
-     * messages exceeds the maximum determined to be appropriate by the rate limiting strategy.
-     */
-    private boolean isInFlightRequestOrMessageLimitExceeded() {
-        return inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    private RequestInfo createRequestInfo() {
+        int batchSize = getNextBatchSize();
+        long requestStartTime = System.currentTimeMillis();

Review Comment:
   Should `requestStartTime` be set when we're about to submit the request entries on line 382 rather than when we create the `RequestInfo`?
   
   If we're using this timestamp to collect statistics for request durations, it would be more accurate to set the timestamp as close to sending the request as possible.
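One way to act on this suggestion is sketched below. `TimedSubmitter`, `waitUntilAllowed` and `send` are hypothetical names and are not part of the PR. The clock is injected as a `LongSupplier` (for example `System::currentTimeMillis` in production) so the sketch can be tested deterministically. The timestamp is taken after any rate-limiting wait and immediately before the entries are handed off, so time spent blocked is not counted as request duration.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical helper: records the request start time as late as possible. */
final class TimedSubmitter {
    private final LongSupplier clock; // e.g. System::currentTimeMillis in production

    TimedSubmitter(LongSupplier clock) {
        this.clock = clock;
    }

    /**
     * Waits until the (hypothetical) rate-limiting hook allows sending, then records the start
     * time right before handing the batch to {@code send}. Returns that start time.
     */
    <T> long submit(List<T> batch, Runnable waitUntilAllowed, Consumer<List<T>> send) {
        waitUntilAllowed.run(); // time spent blocked here is not part of request latency
        long requestStartTime = clock.getAsLong(); // taken just before the request goes out
        send.accept(batch);
        return requestStartTime;
    }
}
```

Suppose a fake clock advances by 100 ms during the wait. The sketch then records a start time of 100. Stamping the time when the request info is built would have recorded 0, and the measured duration would be inflated by the wait.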



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -344,44 +342,45 @@ public void write(InputT element, Context context) throws IOException, Interrupt
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        while (!isInFlightRequestOrMessageLimitExceeded()
+        while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
                         || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
             flush();
         }
     }
 
-    /**
-     * Determines if the sink should block and complete existing in flight requests before it may
-     * prudently create any new ones. This is exactly determined by if the number of requests
-     * currently in flight exceeds the maximum supported by the sink OR if the number of in flight
-     * messages exceeds the maximum determined to be appropriate by the rate limiting strategy.
-     */
-    private boolean isInFlightRequestOrMessageLimitExceeded() {
-        return inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    private RequestInfo createRequestInfo() {
+        int batchSize = getNextBatchSize();
+        long requestStartTime = System.currentTimeMillis();
+        return RequestInfo.builder()
+                .setBatchSize(batchSize)
+                .setRequestStartTime(requestStartTime)
+                .build();
     }
 
     /**
      * Persists buffered RequestsEntries into the destination by invoking {@code
      * submitRequestEntries} with batches according to the user specified buffering hints.
      *
-     * <p>The method blocks if too many async requests are in flight.
+     * <p>The method checks with the {@code rateLimitingStrategy} to see if it should block the
+     * request.
      */
     private void flush() throws InterruptedException {
-        while (isInFlightRequestOrMessageLimitExceeded()) {
+        RequestInfo requestInfo = createRequestInfo();
+        while (rateLimitingStrategy.shouldBlock(requestInfo)) {
             mailboxExecutor.yield();
+            requestInfo = createRequestInfo();
         }
 
-        List<RequestEntryT> batch = createNextAvailableBatch();
-        int batchSize = batch.size();
-
-        if (batch.size() == 0) {
+        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
+        int batchSize = requestInfo.batchSize;
+        if (batchSize == 0) {
             return;
         }
+        requestInfo.setBatchSize(batchSize);

Review Comment:
   Given that we set `batchSize` on line 376, is this line a no-op?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/RateLimitingStrategy.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Controls the rate of requests being made in the {@code AsyncSinkWriter}.
+ *
+ * <p>The RateLimitingStrategy is consulted in the {@code AsyncSinkWriter} before sending a request.

Review Comment:
   ```suggestion
    * <p>The RateLimitingStrategy is consulted by the {@code AsyncSinkWriter} before sending a request.
   ```



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -344,44 +342,45 @@ public void write(InputT element, Context context) throws IOException, Interrupt
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        while (!isInFlightRequestOrMessageLimitExceeded()
+        while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
                         || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
             flush();
         }
     }
 
-    /**
-     * Determines if the sink should block and complete existing in flight requests before it may
-     * prudently create any new ones. This is exactly determined by if the number of requests
-     * currently in flight exceeds the maximum supported by the sink OR if the number of in flight
-     * messages exceeds the maximum determined to be appropriate by the rate limiting strategy.
-     */
-    private boolean isInFlightRequestOrMessageLimitExceeded() {
-        return inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    private RequestInfo createRequestInfo() {
+        int batchSize = getNextBatchSize();
+        long requestStartTime = System.currentTimeMillis();
+        return RequestInfo.builder()
+                .setBatchSize(batchSize)
+                .setRequestStartTime(requestStartTime)
+                .build();
     }
 
     /**
      * Persists buffered RequestsEntries into the destination by invoking {@code
      * submitRequestEntries} with batches according to the user specified buffering hints.
      *
-     * <p>The method blocks if too many async requests are in flight.
+     * <p>The method checks with the {@code rateLimitingStrategy} to see if it should block the
+     * request.
      */
     private void flush() throws InterruptedException {
-        while (isInFlightRequestOrMessageLimitExceeded()) {
+        RequestInfo requestInfo = createRequestInfo();
+        while (rateLimitingStrategy.shouldBlock(requestInfo)) {
             mailboxExecutor.yield();
+            requestInfo = createRequestInfo();
         }
 
-        List<RequestEntryT> batch = createNextAvailableBatch();
-        int batchSize = batch.size();
-
-        if (batch.size() == 0) {
+        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
+        int batchSize = requestInfo.batchSize;

Review Comment:
   Should we rely on `batch.size()` rather than `requestInfo.batchSize`, in case `createNextAvailableBatch` has mutated it?
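The sketch below shows why `batch.size()` is the safer source of truth. It assumes that batch creation can stop early once the byte limit is reached, which follows from the Javadoc saying the batch respects both `maxBatchSize` and `maxBatchSizeInBytes`. `BatchBuilder.nextBatch` is a hypothetical stand-in for `createNextAvailableBatch`, not the PR's code.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.ToLongFunction;

final class BatchBuilder {
    /**
     * Hypothetical stand-in for createNextAvailableBatch: drains up to requestedSize entries,
     * but stops early if adding the next entry would exceed maxBatchSizeInBytes.
     */
    static <T> List<T> nextBatch(
            Deque<T> buffer, int requestedSize, long maxBatchSizeInBytes, ToLongFunction<T> sizeOf) {
        List<T> batch = new ArrayList<>(requestedSize);
        long batchSizeBytes = 0;
        while (batch.size() < requestedSize && !buffer.isEmpty()) {
            long entrySize = sizeOf.applyAsLong(buffer.peekFirst());
            if (batchSizeBytes + entrySize > maxBatchSizeInBytes) {
                break; // byte limit reached before the requested count
            }
            batch.add(buffer.pollFirst());
            batchSizeBytes += entrySize;
        }
        return batch;
    }
}
```

Consider five buffered entries of 40 bytes each, requested against a 100-byte limit. Only two entries are drained. The requested size (5) and `batch.size()` (2) therefore differ, and only the latter reflects what is actually sent.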



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/strategy/AIMDScalingStrategyTest.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/** Test class for {@link AIMDScalingStrategy}. */
+public class AIMDScalingStrategyTest {
+
+    @Test
+    public void testScaleUpAdditively() {
+        final int increaseRate = 10;
+        final int currentRate = 4;
+
+        AIMDScalingStrategy scalingStrategy =
+                AIMDScalingStrategy.builder()
+                        .setIncreaseRate(increaseRate)
+                        .setDecreaseFactor(0.5)
+                        .setRateThreshold(1000)
+                        .build();
+
+        assertThat(scalingStrategy.scaleUp(currentRate)).isEqualTo(increaseRate + currentRate);
+    }
+
+    @Test
+    public void testScaleUpRespectsRateThreshold() {
+        final int increaseRate = 10;
+        final int currentRate = 14;
+        final int rateThreshold = 20;
+
+        AIMDScalingStrategy scalingStrategy =
+                AIMDScalingStrategy.builder()
+                        .setIncreaseRate(increaseRate)
+                        .setDecreaseFactor(0.5)
+                        .setRateThreshold(rateThreshold)
+                        .build();
+
+        assertThat(scalingStrategy.scaleUp(increaseRate + currentRate)).isEqualTo(rateThreshold);
+    }
+
+    @Test
+    public void testScaleDownByFactor() {
+        final double decreaseFactor = 0.4;
+
+        AIMDScalingStrategy scalingStrategy =
+                AIMDScalingStrategy.builder()
+                        .setIncreaseRate(10)
+                        .setDecreaseFactor(decreaseFactor)
+                        .setRateThreshold(1000)
+                        .build();
+
+        // roundDown(10 * 0.4 = 4) = 4
+        assertThat(scalingStrategy.scaleDown(10)).isEqualTo(4);
+
+        // roundDown(314 * 0.4 = 125.6) = 125

Review Comment:
   This comment does not match the assertion below.
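The arithmetic in the test comments can be checked against a minimal AIMD sketch. `AimdSketch` is hypothetical. It assumes additive increase capped at `rateThreshold`, and a multiplicative decrease that rounds down, as the `roundDown(...)` comments imply. The PR's `AIMDScalingStrategy` may differ in detail.

```java
/** Hypothetical Additive Increase / Multiplicative Decrease sketch, not the PR's class. */
final class AimdSketch {
    private final int increaseRate;
    private final double decreaseFactor;
    private final int rateThreshold;

    AimdSketch(int increaseRate, double decreaseFactor, int rateThreshold) {
        this.increaseRate = increaseRate;
        this.decreaseFactor = decreaseFactor;
        this.rateThreshold = rateThreshold;
    }

    /** Additive increase, never exceeding rateThreshold. */
    int scaleUp(int currentRate) {
        return Math.min(currentRate + increaseRate, rateThreshold);
    }

    /** Multiplicative decrease; the (int) cast truncates, i.e. rounds down for positive rates. */
    int scaleDown(int currentRate) {
        return (int) (currentRate * decreaseFactor);
    }
}
```

Under these assumptions, with a factor of 0.4, `scaleDown(10)` yields 4 and `scaleDown(314)` yields 125, which matches the two comments. The second comment and the assertion that follows it should therefore agree on both the input (314) and the expected result (125).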



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -391,22 +390,24 @@ private void flush() throws InterruptedException {
                                                 timestampOfRequest),
                                 "Mark in-flight request as completed and requeue %d request entries",
                                 failedRequestEntries.size());
-
+        rateLimitingStrategy.registerInFlightRequest(requestInfo);
         inFlightRequestsCount++;
-        inFlightMessages += batchSize;
-        submitRequestEntries(batch, requestResult);
+        submitRequestEntries(batch, requestResultCallback);
+    }
+
+    private int getNextBatchSize() {
+        return Math.min(rateLimitingStrategy.getMaxBatchSize(), bufferedRequestEntries.size());
     }
 
     /**
      * Creates the next batch of request entries while respecting the {@code maxBatchSize} and
      * {@code maxBatchSizeInBytes}. Also adds these to the metrics counters.
      */
-    private List<RequestEntryT> createNextAvailableBatch() {
-        int batchSize = Math.min(getNextBatchSizeLimit(), bufferedRequestEntries.size());
-        List<RequestEntryT> batch = new ArrayList<>(batchSize);
+    private List<RequestEntryT> createNextAvailableBatch(RequestInfo requestInfo) {
+        List<RequestEntryT> batch = new ArrayList<>(requestInfo.batchSize);
 
         int batchSizeBytes = 0;

Review Comment:
   Should this be a long to avoid potential overflow when adding the `requestEntrySize` long to it later?
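The overflow concern applies in Java because a compound assignment such as `int += long` compiles without an explicit cast. Per the Java Language Specification (§15.26.2), `E1 op= E2` is equivalent to `E1 = (T) ((E1) op (E2))`, so the sum is silently narrowed to `int`. The sketch below uses illustrative names and assumes `requestEntrySize` is a `long`, as the comment states. It contrasts the two accumulator types.

```java
final class ByteCounterDemo {
    /** Accumulates into an int: each += silently narrows the long sum back to int. */
    static int sumAsInt(long[] entrySizes) {
        int batchSizeBytes = 0;
        for (long size : entrySizes) {
            batchSizeBytes += size; // compiles; means batchSizeBytes = (int) (batchSizeBytes + size)
        }
        return batchSizeBytes;
    }

    /** Accumulates into a long: no narrowing, so no overflow for realistic batch sizes. */
    static long sumAsLong(long[] entrySizes) {
        long batchSizeBytes = 0;
        for (long size : entrySizes) {
            batchSizeBytes += size;
        }
        return batchSizeBytes;
    }
}
```

Take two entries of 1,500,000,000 bytes each. The `int` accumulator wraps to -1294967296, while the `long` accumulator correctly reports 3000000000.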



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/CongestionControlRateLimitingStrategy.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@code RateLimitingStrategy} implementation that does the following:
+ *
+ * <ul>
+ *   <li>Scales up when any request is successful.
+ *   <li>Scales down when any message in a request is unsuccessful.
+ *   <li>Uses Additive Increase / Multiplicative Decrease (AIMD) strategy to scale up/down.
+ * </ul>
+ *
+ * <p>This strategy works well for throughput-limited record-based sinks (e.g. Kinesis, Kafka).
+ */
+public class CongestionControlRateLimitingStrategy implements RateLimitingStrategy {
+
+    private final int maxInFlightRequests;
+    private final AIMDScalingStrategy aimdScalingStrategy;
+    private int maxInFlightMessages;
+
+    private int currentInFlightRequests;
+    private int currentInFlightMessages;
+
+    private CongestionControlRateLimitingStrategy(
+            int maxInFlightRequests,
+            int initialMaxInFlightMessages,
+            AIMDScalingStrategy aimdScalingStrategy) {
+        Preconditions.checkArgument(
+                maxInFlightRequests > 0, "maxInFlightRequests must be a positive integer.");
+        Preconditions.checkArgument(
+                initialMaxInFlightMessages > 0,
+                "initialMaxInFlightMessages must be a positive integer.");
+        Preconditions.checkNotNull(aimdScalingStrategy, "aimdScalingStrategy must be provided.");
+
+        this.maxInFlightRequests = maxInFlightRequests;
+        this.maxInFlightMessages = initialMaxInFlightMessages;
+        this.aimdScalingStrategy = aimdScalingStrategy;
+    }
+
+    @Override
+    public void registerInFlightRequest(RequestInfo requestInfo) {
+        currentInFlightRequests++;
+        currentInFlightMessages += requestInfo.batchSize;
+    }
+
+    @Override
+    public void registerCompletedRequest(RequestInfo requestInfo) {
+        currentInFlightRequests--;

Review Comment:
   Should we protect against this value going below zero?
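One possible guard is sketched below with a hypothetical `InFlightTracker`, which takes plain `int` batch sizes instead of `RequestInfo`. It rejects a completion that was never registered rather than letting either counter go negative. The blocking check mirrors the removed `isInFlightRequestOrMessageLimitExceeded` predicate. Failing fast, rather than clamping with `Math.max(0, ...)`, is a deliberate design choice here: a negative count signals a bookkeeping bug that clamping would hide.

```java
/** Hypothetical in-flight bookkeeping that refuses to let its counters drop below zero. */
final class InFlightTracker {
    private final int maxInFlightRequests;
    private final int maxInFlightMessages;
    private int currentInFlightRequests;
    private int currentInFlightMessages;

    InFlightTracker(int maxInFlightRequests, int maxInFlightMessages) {
        this.maxInFlightRequests = maxInFlightRequests;
        this.maxInFlightMessages = maxInFlightMessages;
    }

    void registerInFlightRequest(int batchSize) {
        currentInFlightRequests++;
        currentInFlightMessages += batchSize;
    }

    void registerCompletedRequest(int batchSize) {
        // A completion without a matching registration indicates a bookkeeping bug.
        if (currentInFlightRequests <= 0 || currentInFlightMessages < batchSize) {
            throw new IllegalStateException("Completed request was never registered as in flight.");
        }
        currentInFlightRequests--;
        currentInFlightMessages -= batchSize;
    }

    /** Same condition as the removed isInFlightRequestOrMessageLimitExceeded(). */
    boolean shouldBlock() {
        return currentInFlightRequests >= maxInFlightRequests
                || currentInFlightMessages >= maxInFlightMessages;
    }
}
```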


