Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/24 12:06:11 UTC

[GitHub] [flink-web] rmetzger commented on a diff in pull request #584: [FLINK-30156] Add blogpost for the Async sink custom RateLimitingStra…

rmetzger commented on code in PR #584:
URL: https://github.com/apache/flink-web/pull/584#discussion_r1031435470


##########
_posts/2022-11-25-async-sink-rate-limiting-strategy.md:
##########
@@ -0,0 +1,195 @@
+---
+layout: post
+title: "Optimising the throughput of async sinks using a custom RateLimitingStrategy"
+date: 2022-11-22T12:00:00.000Z
+authors:
+- liangtl:
+  name: "Hong Liang Teoh"
+excerpt: An overview of how to optimise the throughput of async sinks using a custom RateLimitingStrategy
+---
+
+## Introduction
+
+When designing a Flink data processing job, one of the key concerns is maximising job throughput. Sink throughput is a crucial factor because it can determine the entire job’s throughput. We generally want the highest possible write rate in the sink without overloading the destination. However, since the factors that impact destination performance vary over the job’s lifetime, the sink needs to adjust this write rate dynamically. Depending on the destination being written to, we might want to tune the write rate differently using a different RateLimitingStrategy.
+
+**This post explains how you can configure a custom RateLimitingStrategy on a connector that builds on the [AsyncSinkBase (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink) to optimise sink throughput.** In the sections below, we first cover the design logic behind the AsyncSinkBase and the RateLimitingStrategy, then we take you through two example implementations of rate limiting strategies, specifically the CongestionControlRateLimitingStrategy and TokenBucketRateLimitingStrategy.
+
+### Background of the AsyncSinkBase
+
+When implementing the AsyncSinkBase, our main goal was to simplify building new async sinks to custom destinations by providing the common functionality needed to build async sinks with at-least-once processing. This has allowed users to more easily write sinks to custom destinations, such as Amazon Kinesis Data Streams and Amazon Kinesis Data Firehose. An additional async sink to Amazon DynamoDB ([FLIP-252](https://cwiki.apache.org/confluence/display/FLINK/FLIP-252%3A+Amazon+DynamoDB+Sink+Connector)) is also being developed at the time of writing.
+
+The AsyncSinkBase offers the core implementation which handles the mechanics of async requests and responses. This includes retrying of failed messages, deciding when to flush records to the destination, and persisting un-flushed records to state during checkpointing. In order to aid throughput, the async sink also dynamically adjusts the request rate depending on the responses returned from the destination. Read more about this in our [previous 1.15 release blog post](https://flink.apache.org/2022/05/06/async-sink-base.html) or watch our [FlinkForward talk recording explaining the design of the Async Sink](https://www.youtube.com/watch?v=z-hYuLgbHuo).
+
+### Configuring the AsyncSinkBase
+
+When designing the AsyncSinkBase, we wanted users to be able to tune their custom connector implementations based on their specific use case and needs, without having to understand the low-level workings of the base sink itself.
+
+So, as part of our initial implementation in Flink 1.15, we exposed configurations such as `maxBatchSize`, `maxInFlightRequests`, `maxBufferedRequests`, `maxBatchSizeInBytes`, `maxTimeInBufferMS` and `maxRecordSizeInBytes` so that users can change the flushing and writing behaviour of the sink.
+
+In Flink 1.16, we have further extended this configurability to the RateLimitingStrategy used by the AsyncSinkBase ([FLIP-242](https://cwiki.apache.org/confluence/display/FLINK/FLIP-242%3A+Introduce+configurable+RateLimitingStrategy+for+Async+Sink)). With this change, users can now customise how the AsyncSinkBase dynamically adjusts the request rate in real time to optimise throughput whilst mitigating backpressure. Example customisations include changing the mathematical function used to scale the request rate, implementing a cool-off period between rate adjustments, or implementing a token bucket RateLimitingStrategy.
+
+## Rationale behind the RateLimitingStrategy interface
+
+```java
+public interface RateLimitingStrategy {
+
+    // Information provided to the RateLimitingStrategy
+    void registerInFlightRequest(RequestInfo requestInfo);
+    void registerCompletedRequest(ResultInfo resultInfo);
+    
+    // Controls offered to the RateLimitingStrategy
+    boolean shouldBlock(RequestInfo requestInfo);
+    int getMaxBatchSize();
+}
+```
+
+There are 2 core ideas behind the RateLimitingStrategy interface:
+
+* **Information methods:** We need methods to provide the RateLimitingStrategy with sufficient information to track the rate of requests being made or the rate of messages being sent (each request can have more than one message).
+* **Control methods:** We also need methods to allow the RateLimitingStrategy to control the request rate of the sink.
+
+These are the types of methods that we see in the RateLimitingStrategy interface. With `registerInFlightRequest()` and `registerCompletedRequest()`, the RateLimitingStrategy has sufficient information to track the number of in-flight requests, the number of in-flight messages, and the rate of these requests.
+
+With `shouldBlock()`, the RateLimitingStrategy can decide to postpone any new requests until a specified condition is met (e.g. current in-flight requests must not exceed a given number). This allows the RateLimitingStrategy to control the rate of requests being made to the destination, in order to increase throughput, or increase backpressure in the Flink job graph.
+
+With `getMaxBatchSize()`, the RateLimitingStrategy can dynamically adjust the number of messages packaged into a single request. This might be useful to optimise sink throughput if the request size affects the performance of the destination.
+
+## Implementing a custom RateLimitingStrategy
+
+### [Example 1] CongestionControlRateLimitingStrategy
+
+The AsyncSinkBase comes pre-packaged with the CongestionControlRateLimitingStrategy. In this section, we explore its implementation.
+
+This strategy is modelled after [TCP congestion control](https://en.wikipedia.org/wiki/TCP_congestion_control), and aims to discover the highest possible request rate for the given destination. It achieves this by increasing the request rate until the sink gets throttled by the destination, at which point it will reduce the request rate.
+
+In this RateLimitingStrategy, we want to dynamically adjust the request rate by doing the following:
+
+* Set a maximum number of in-flight requests at any time
+* Set a maximum number of in-flight messages at any time (each request can have multiple messages)
+* When there is a successful request, we increase the maximum number of in-flight messages to maximise the request rate
+* When there is an unsuccessful request, we decrease the maximum number of in-flight messages to prevent overloading the destination
+* If there are multiple sink subtasks, each can keep track of their maximum number of in-flight messages independently
+
+This strategy means that we will start with a low request rate (slow start), but aggressively increase our request rate until the destination throttles us, which allows us to discover the highest possible request rate. It will also adjust the request rate if the conditions of the destination change (e.g. another client starts writing to the same destination). This strategy works well if the destination implements traffic shaping and throttles once the bandwidth limit is reached (e.g. Amazon Kinesis Data Streams, Amazon Kinesis Data Firehose).
+
+First, we implement the information methods to keep track of the number of in-flight requests and in-flight messages.
+
+```java
+public class CongestionControlRateLimitingStrategy implements RateLimitingStrategy {
+    // ...
+    @Override
+    public void registerInFlightRequest(RequestInfo requestInfo) {
+        currentInFlightRequests++;
+        currentInFlightMessages += requestInfo.getBatchSize();
+    }
+    
+    @Override
+    public void registerCompletedRequest(ResultInfo resultInfo) {
+        currentInFlightRequests = Math.max(0, currentInFlightRequests - 1);
+        currentInFlightMessages = Math.max(0, currentInFlightMessages - resultInfo.getBatchSize());
+        
+        if (resultInfo.getFailedMessages() > 0) {
+            maxInFlightMessages = scalingStrategy.scaleDown(maxInFlightMessages);
+        } else {
+            maxInFlightMessages = scalingStrategy.scaleUp(maxInFlightMessages);
+        }
+    }
+    // ...
+}
+```
+
+Then we implement the control methods to dynamically adjust the request rate.
+
+We keep a current value for maxInFlightMessages and maxInFlightRequests, and postpone all new requests if maxInFlightRequests or maxInFlightMessages have been reached.
+
+Every time a request completes, the CongestionControlRateLimitingStrategy will check if there are any failed messages in the response. If there are, it will decrease maxInFlightMessages. If there are no failed messages, it will increase maxInFlightMessages. This gives us indirect control over the rate of messages being written to the destination.
+
+Side note: The default CongestionControlRateLimitingStrategy uses an Additive Increase / Multiplicative Decrease (AIMD) scaling strategy. This is also used in TCP congestion control to avoid overloading the destination by increasing write rate slowly but backing off quickly if throttled.
+
+```java
+public class CongestionControlRateLimitingStrategy implements RateLimitingStrategy {
+    // ...
+    @Override
+    public void registerCompletedRequest(ResultInfo resultInfo) {
+        // ...
+        if (resultInfo.getFailedMessages() > 0) {
+            maxInFlightMessages = scalingStrategy.scaleDown(maxInFlightMessages);
+        } else {
+            maxInFlightMessages = scalingStrategy.scaleUp(maxInFlightMessages);
+        }
+    }
+    
+    @Override
+    public boolean shouldBlock(RequestInfo requestInfo) {
+        return currentInFlightRequests >= maxInFlightRequests
+                || (currentInFlightMessages + requestInfo.getBatchSize() > maxInFlightMessages);
+    }
+    // ...
+}
+```
+
+### [Example 2] TokenBucketRateLimitingStrategy
+
+The CongestionControlRateLimitingStrategy is rather aggressive, and relies on a robust server-side rate limiting strategy. In the event we don’t have a robust server-side rate limiting strategy, we can implement a client-side rate limiting strategy.
+
+As an example, we can look at the [token bucket rate limiting strategy](https://en.wikipedia.org/wiki/Token_bucket). If the limits are set correctly, we will avoid overloading the destination altogether.
+
+In this strategy, we want to do the following:
+
+* Implement a TokenBucket that has a given initial number of tokens (e.g. 10). These tokens refill at a given rate (e.g. 1 token per second).
+* When preparing an async request, we check if the token bucket has sufficient tokens. If not, we postpone the request.
+
+Let’s look at an example implementation:
+
+```java
+public class TokenBucketRateLimitingStrategy implements RateLimitingStrategy {
+    
+    private final Bucket bucket;
+
+    public TokenBucketRateLimitingStrategy() {
+        Refill refill = Refill.intervally(1, Duration.ofSeconds(1));
+        Bandwidth limit = Bandwidth.classic(10, refill);
+        this.bucket = Bucket4j.builder()
+            .addLimit(limit)
+            .build();
+    }
+    
+    // ... (information methods not needed)
+    
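+    @Override
+    public boolean shouldBlock(RequestInfo requestInfo) {
+        // tryConsume() returns true when the tokens were available and consumed,
+        // so we block only when it returns false.
+        return !bucket.tryConsume(requestInfo.getBatchSize());
+    }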
+}
+```
+
+In the above example, we use the Token Bucket implementation in the [Bucket4j](https://github.com/bucket4j/bucket4j) library. We also map 1 message to 1 token. Since our token bucket has a size of 10 tokens and a refill rate of 1 token per second, we can be sure that we will not exceed a burst of 10 messages, and will also not exceed a constant rate of 1 message per second.
+

Review Comment:
   I wonder if it makes sense to quickly summarize the characteristics of the strategy? It sounds like this strategy really restricts the throughput to a certain rate.
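
The characteristic raised in this comment can be illustrated with a minimal, self-contained sketch. It does not use Bucket4j or any Flink class: `SimpleTokenBucket` and `countAccepted` are hypothetical names introduced here, and the bucket assumes the same shape as the example in the post (capacity of 10 tokens, interval refill of 1 token per second, 1 message per token). The load of 100 single-message attempts per second is also an assumption, chosen only so that demand exceeds the limit.

```java
/** Hypothetical, simplified token bucket for illustration; not Bucket4j and not part of Flink. */
class TokenBucketSketch {

    /** Interval refill: adds refillTokens once for every full refill period that has elapsed. */
    static final class SimpleTokenBucket {
        private final long capacity;
        private final long refillTokens;
        private final long refillPeriodNanos;
        private long availableTokens;
        private long lastRefillNanos;

        SimpleTokenBucket(long capacity, long refillTokens, long refillPeriodNanos, long nowNanos) {
            this.capacity = capacity;
            this.refillTokens = refillTokens;
            this.refillPeriodNanos = refillPeriodNanos;
            this.availableTokens = capacity; // the bucket starts full
            this.lastRefillNanos = nowNanos;
        }

        /** Removes the tokens and returns true if enough are available; otherwise returns false. */
        boolean tryConsume(long tokens, long nowNanos) {
            long periods = (nowNanos - lastRefillNanos) / refillPeriodNanos;
            if (periods > 0) {
                availableTokens = Math.min(capacity, availableTokens + periods * refillTokens);
                lastRefillNanos += periods * refillPeriodNanos;
            }
            if (tokens > availableTokens) {
                return false;
            }
            availableTokens -= tokens;
            return true;
        }
    }

    /** Attempts one single-message request every 10 ms for the given duration; returns how many pass. */
    static int countAccepted(long seconds) {
        final long second = 1_000_000_000L;
        final long step = second / 100; // 100 attempts per simulated second
        SimpleTokenBucket bucket = new SimpleTokenBucket(10, 1, second, 0L);
        int accepted = 0;
        for (long now = 0; now < seconds * second; now += step) {
            if (bucket.tryConsume(1, now)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(countAccepted(1));  // prints 10: the initial burst drains the bucket
        System.out.println(countAccepted(60)); // prints 69: the burst of 10 plus 1 per later second
    }
}
```

Under these assumptions, throughput is capped by the bucket regardless of how fast the destination could actually accept writes: after the initial burst, the sustained rate is fixed at the refill rate.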



##########
_posts/2022-11-25-async-sink-rate-limiting-strategy.md:
##########
@@ -0,0 +1,195 @@
+### [Example 2] TokenBucketRateLimitingStrategy
+
+The CongestionControlRateLimitingStrategy is rather aggressive, and relies on a robust server-side rate limiting strategy. In the event we don’t have a robust server-side rate limiting strategy, we can implement a client-side rate limiting strategy.

Review Comment:
   Is the TokenBucketRateLimitingStrategy also part of Flink?


