You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/04 17:12:22 UTC

[GitHub] [flink-web] zentol commented on a diff in pull request #517: [FLINK-24370] Addition of blog post describing features and usage of the Async Sink…

zentol commented on code in PR #517:
URL: https://github.com/apache/flink-web/pull/517#discussion_r865039865


##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)

Review Comment:
   Link to the release-1.15.0 tag instead



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);

Review Comment:
   This goes beyond the blogpost but the naming of the requestResult parameter is questionable.
   
   It should be named like "requestToRetry" or something along those lines.



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.

Review Comment:
   ```suggestion
   In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously.
   ```
   This reads quite weird imo.



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing, will result in that element being requeued forever, therefore a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
+If at any point, it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.

Review Comment:
   I'm curious why this was implemented as a Consumer and not just as a "reportFatalException(Exception)" method.



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing, will result in that element being requeued forever, therefore a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
+If at any point, it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract long getSizeInBytes(RequestEntryT requestEntry);
+    // ...
+}
+```
+The async sink has a concept of size of elements in the buffer. This allows users to specify a byte size threshold beyond which elements will be flushed. However the sink implementer is best positioned to determine what is the most sensible measure of size for each `RequestEntryT` is.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes) { /* ... */ }
+    // ...
+}
+```
+
+By default, the method `snapshotState` returns all the elements in the buffer to be saved down for snapshots. Any elements that have been removed from the buffer for writing and are in flight have already been completed due to the sink operator calling `flush(true)` on the sink writer.
+You may want to save down additional state from the concrete sink and you can achieve this by overriding `snapshotState`, and restoring in the constructor.
+
+```java
+class MySinkWriter<InputT> extends AsyncSinkWriter<InputT, RequestEntryT> {
+
+    MySinkWriter(
+          // ... 
+          Collection<BufferedRequestState<Record>> initialStates) {
+        super(
+            // ...
+            initialStates);
+        // restore concrete sink state from initialStates
+    }
+    
+    @Override
+    public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId) {
+        super.snapshotState(checkpointId);
+        // ...
+    }
+
+}
+```
+
+## Sink Interface
+
+[AsyncSinkBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java)
+
+```java
+class MySink<InputT> extends AsyncSinkBase<InputT, RequestEntryT> {
+    // ...
+    @Override
+    public StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> createWriter(InitContext context) {
+        return new MySinkWriter(context);
+    }
+    // ...
+}
+```
+Sink implementers extending this will need to return their own extension of the `AsyncSinkWriter` from `createWriter()` inside their own implementation of `AsyncSinkBase`.
+
+At the time of writing, the [Kinesis Data Streams sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-streams) and [Kinesis Data Firehose sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-firehose) are using this base sink. 
+
+# Metrics
+
+There are three metrics that automatically exist when you implement sinks (and, thus, should not be implemented by yourself).
+
+* CurrentSendTime Gauge - returns the amount of time in milliseconds it took for the most recent request to write records to return, whether successful or not.  
+* NumBytesOut Counter - counts the total number of bytes the sink has tried to write to the destination, using the method `getSizeInBytes` to determine the size of each record. This will double count failures that may need to be retried. 
+* NumRecordsOut Counter - similar to above, this counts the total number of records the sink has tried to write to the destination. This will double count failures that may need to be retried.
+
+# Sink Behavior
+
+There are six sink configuration settings that control the buffering, flushing, and retry behavior of the sink.
+
+* `int maxBatchSize` - maximum number of elements that may be passed in the   list to submitRequestEntries to be written downstream.
+* `int maxInFlightRequests` - maximum number of uncompleted calls to submitRequestEntries that the SinkWriter will allow at any given point. Once this point has reached, writes and callbacks to add elements to the buffer may block until one or more requests to submitRequestEntries completes.
+* `int maxBufferedRequests` - maximum buffer length. Callbacks to add elements to the buffer and calls to write will block if this length has been reached and will only unblock if elements from the buffer have been removed for flushing.
+* `long maxBatchSizeInBytes` - a flush will be attempted if the most recent call to write introduces an element to the buffer such that the total size of the buffer is greater than or equal to this threshold value.
+* `long maxTimeInBufferMS` - maximum amount of time an element may remain in the buffer. In most cases elements are flushed as a result of the batch size (in bytes or number) being reached or during a snapshot. However, there are scenarios where an element may remain in the buffer forever or a long period of time. To mitigate this, a timer is constantly active in the buffer such that: while the buffer is not empty, it will flush every maxTimeInBufferMS milliseconds.
+* `long maxRecordSizeInBytes` - maximum size in bytes allowed for a single record, as determined by `getSizeInBytes()`.

Review Comment:
   What happens if this is exceeded`?



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing, will result in that element being requeued forever, therefore a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
+If at any point, it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract long getSizeInBytes(RequestEntryT requestEntry);
+    // ...
+}
+```
+The async sink has a concept of size of elements in the buffer. This allows users to specify a byte size threshold beyond which elements will be flushed. However the sink implementer is best positioned to determine what is the most sensible measure of size for each `RequestEntryT` is.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes) { /* ... */ }
+    // ...
+}
+```
+
+By default, the method `snapshotState` returns all the elements in the buffer to be saved down for snapshots. Any elements that have been removed from the buffer for writing and are in flight have already been completed due to the sink operator calling `flush(true)` on the sink writer.
+You may want to save down additional state from the concrete sink and you can achieve this by overriding `snapshotState`, and restoring in the constructor.
+
+```java
+class MySinkWriter<InputT> extends AsyncSinkWriter<InputT, RequestEntryT> {
+
+    MySinkWriter(
+          // ... 
+          Collection<BufferedRequestState<Record>> initialStates) {
+        super(
+            // ...
+            initialStates);
+        // restore concrete sink state from initialStates
+    }
+    
+    @Override
+    public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId) {
+        super.snapshotState(checkpointId);
+        // ...
+    }
+
+}
+```
+
+## Sink Interface
+
+[AsyncSinkBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java)
+
+```java
+class MySink<InputT> extends AsyncSinkBase<InputT, RequestEntryT> {
+    // ...
+    @Override
+    public StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> createWriter(InitContext context) {
+        return new MySinkWriter(context);
+    }
+    // ...
+}
+```
+Sink implementers extending this will need to return their own extension of the `AsyncSinkWriter` from `createWriter()` inside their own implementation of `AsyncSinkBase`.
+
+At the time of writing, the [Kinesis Data Streams sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-streams) and [Kinesis Data Firehose sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-firehose) are using this base sink. 
+
+# Metrics
+
+There are three metrics that automatically exist when you implement sinks (and, thus, should not be implemented by yourself).
+
+* CurrentSendTime Gauge - returns the amount of time in milliseconds it took for the most recent request to write records to return, whether successful or not.  
+* NumBytesOut Counter - counts the total number of bytes the sink has tried to write to the destination, using the method `getSizeInBytes` to determine the size of each record. This will double count failures that may need to be retried. 
+* NumRecordsOut Counter - similar to above, this counts the total number of records the sink has tried to write to the destination. This will double count failures that may need to be retried.

Review Comment:
   the metric name start with a lower-case letter. The metric type doesn't seem relevant to the implementation.



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing, will result in that element being requeued forever, therefore a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
+If at any point, it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract long getSizeInBytes(RequestEntryT requestEntry);
+    // ...
+}
+```
+The async sink has a concept of size of elements in the buffer. This allows users to specify a byte size threshold beyond which elements will be flushed. However the sink implementer is best positioned to determine what is the most sensible measure of size for each `RequestEntryT` is.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes) { /* ... */ }
+    // ...
+}
+```
+
+By default, the method `snapshotState` returns all the elements in the buffer to be saved down for snapshots. Any elements that have been removed from the buffer for writing and are in flight have already been completed due to the sink operator calling `flush(true)` on the sink writer.
+You may want to save down additional state from the concrete sink and you can achieve this by overriding `snapshotState`, and restoring in the constructor.
+
+```java
+class MySinkWriter<InputT> extends AsyncSinkWriter<InputT, RequestEntryT> {
+
+    MySinkWriter(
+          // ... 
+          Collection<BufferedRequestState<Record>> initialStates) {
+        super(
+            // ...
+            initialStates);
+        // restore concrete sink state from initialStates
+    }
+    
+    @Override
+    public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId) {
+        super.snapshotState(checkpointId);
+        // ...
+    }
+
+}
+```
+
+## Sink Interface
+
+[AsyncSinkBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java)
+
+```java
+class MySink<InputT> extends AsyncSinkBase<InputT, RequestEntryT> {
+    // ...
+    @Override
+    public StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> createWriter(InitContext context) {
+        return new MySinkWriter(context);
+    }
+    // ...
+}
+```
+Sink implementers extending this will need to return their own extension of the `AsyncSinkWriter` from `createWriter()` inside their own implementation of `AsyncSinkBase`.
+
+At the time of writing, the [Kinesis Data Streams sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-streams) and [Kinesis Data Firehose sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-firehose) are using this base sink. 
+
+# Metrics
+
+There are three metrics that automatically exist when you implement sinks (and, thus, should not be implemented by yourself).
+
+* CurrentSendTime Gauge - returns the amount of time in milliseconds it took for the most recent request to write records to return, whether successful or not.  
+* NumBytesOut Counter - counts the total number of bytes the sink has tried to write to the destination, using the method `getSizeInBytes` to determine the size of each record. This will double count failures that may need to be retried. 
+* NumRecordsOut Counter - similar to above, this counts the total number of records the sink has tried to write to the destination. This will double count failures that may need to be retried.

Review Comment:
   Moreover it is extra strange that there are no metrics that capture failures etc.



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing, will result in that element being requeued forever, therefore a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
+If at any point, it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract long getSizeInBytes(RequestEntryT requestEntry);
+    // ...
+}
+```
+The async sink has a concept of size of elements in the buffer. This allows users to specify a byte size threshold beyond which elements will be flushed. However the sink implementer is best positioned to determine what is the most sensible measure of size for each `RequestEntryT` is.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes) { /* ... */ }
+    // ...
+}
+```
+
+By default, the method `snapshotState` returns all the elements in the buffer to be saved down for snapshots. Any elements that have been removed from the buffer for writing and are in flight have already been completed due to the sink operator calling `flush(true)` on the sink writer.
+You may want to save down additional state from the concrete sink and you can achieve this by overriding `snapshotState`, and restoring in the constructor.
+
+```java
+class MySinkWriter<InputT> extends AsyncSinkWriter<InputT, RequestEntryT> {
+
+    MySinkWriter(
+          // ... 
+          Collection<BufferedRequestState<Record>> initialStates) {
+        super(
+            // ...
+            initialStates);
+        // restore concrete sink state from initialStates
+    }
+    
+    @Override
+    public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId) {
+        super.snapshotState(checkpointId);
+        // ...
+    }
+
+}
+```
+
+## Sink Interface
+
+[AsyncSinkBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java)
+
+```java
+class MySink<InputT> extends AsyncSinkBase<InputT, RequestEntryT> {
+    // ...
+    @Override
+    public StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> createWriter(InitContext context) {
+        return new MySinkWriter(context);
+    }
+    // ...
+}
+```
+Sink implementers extending this will need to return their own extension of the `AsyncSinkWriter` from `createWriter()` inside their own implementation of `AsyncSinkBase`.
+
+At the time of writing, the [Kinesis Data Streams sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-streams) and [Kinesis Data Firehose sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-firehose) are using this base sink. 
+
+# Metrics
+
+There are three metrics that automatically exist when you implement sinks (and, thus, should not be implemented by yourself).
+
+* CurrentSendTime Gauge - returns the amount of time in milliseconds it took for the most recent request to write records to return, whether successful or not.  
+* NumBytesOut Counter - counts the total number of bytes the sink has tried to write to the destination, using the method `getSizeInBytes` to determine the size of each record. This will double count failures that may need to be retried. 
+* NumRecordsOut Counter - similar to above, this counts the total number of records the sink has tried to write to the destination. This will double count failures that may need to be retried.

Review Comment:
   It's also strange that we don't differentiate between successful and failed writes. for numRecords-/BytesOut.
   That _will_ throw users off.



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing, will result in that element being requeued forever, therefore a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
+If at any point, it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>

Review Comment:
   side-note: Why do we require RequestEntryT to be serializable?



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing, will result in that element being requeued forever, therefore a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.

Review Comment:
   How would such a strategy look like? Do we expect users to embed some ID into RequestEntryT? Does the base provide any utilities to implement such a strategy? Are there examples anywhere?



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing, will result in that element being requeued forever, therefore a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
+If at any point, it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract long getSizeInBytes(RequestEntryT requestEntry);
+    // ...
+}
+```
+The async sink has a concept of size of elements in the buffer. This allows users to specify a byte size threshold beyond which elements will be flushed. However the sink implementer is best positioned to determine what is the most sensible measure of size for each `RequestEntryT` is.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes) { /* ... */ }
+    // ...
+}
+```
+
+By default, the method `snapshotState` returns all the elements in the buffer to be saved down for snapshots. Any elements that have been removed from the buffer for writing and are in flight have already been completed due to the sink operator calling `flush(true)` on the sink writer.

Review Comment:
   ```suggestion
   By default, the method `snapshotState` returns all the elements in the buffer to be saved for snapshots. Any elements that have been removed from the buffer for writing and are in flight have already been completed due to the sink operator calling `flush(true)` on the sink writer.
   ```



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing, will result in that element being requeued forever, therefore a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
+If at any point, it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract long getSizeInBytes(RequestEntryT requestEntry);
+    // ...
+}
+```
+The async sink has a concept of size of elements in the buffer. This allows users to specify a byte size threshold beyond which elements will be flushed. However the sink implementer is best positioned to determine what is the most sensible measure of size for each `RequestEntryT` is.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes) { /* ... */ }
+    // ...
+}
+```
+
+By default, the method `snapshotState` returns all the elements in the buffer to be saved down for snapshots. Any elements that have been removed from the buffer for writing and are in flight have already been completed due to the sink operator calling `flush(true)` on the sink writer.
+You may want to save down additional state from the concrete sink and you can achieve this by overriding `snapshotState`, and restoring in the constructor.
+
+```java
+class MySinkWriter<InputT> extends AsyncSinkWriter<InputT, RequestEntryT> {
+
+    MySinkWriter(
+          // ... 
+          Collection<BufferedRequestState<Record>> initialStates) {
+        super(
+            // ...
+            initialStates);
+        // restore concrete sink state from initialStates
+    }
+    
+    @Override
+    public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId) {
+        super.snapshotState(checkpointId);
+        // ...
+    }
+
+}
+```
+
+## Sink Interface
+
+[AsyncSinkBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java)
+
+```java
+class MySink<InputT> extends AsyncSinkBase<InputT, RequestEntryT> {
+    // ...
+    @Override
+    public StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> createWriter(InitContext context) {
+        return new MySinkWriter(context);
+    }
+    // ...
+}
+```
+Sink implementers extending this will need to return their own extension of the `AsyncSinkWriter` from `createWriter()` inside their own implementation of `AsyncSinkBase`.
+
+At the time of writing, the [Kinesis Data Streams sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-streams) and [Kinesis Data Firehose sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-firehose) are using this base sink. 
+
+# Metrics
+
+There are three metrics that automatically exist when you implement sinks (and, thus, should not be implemented by yourself).
+
+* CurrentSendTime Gauge - returns the amount of time in milliseconds it took for the most recent request to write records to return, whether successful or not.  
+* NumBytesOut Counter - counts the total number of bytes the sink has tried to write to the destination, using the method `getSizeInBytes` to determine the size of each record. This will double count failures that may need to be retried. 
+* NumRecordsOut Counter - similar to above, this counts the total number of records the sink has tried to write to the destination. This will double count failures that may need to be retried.
+
+# Sink Behavior
+
+There are six sink configuration settings that control the buffering, flushing, and retry behavior of the sink.
+
+* `int maxBatchSize` - maximum number of elements that may be passed in the   list to submitRequestEntries to be written downstream.

Review Comment:
   ```suggestion
   * `int maxBatchSize` - maximum number of elements that may be passed in the list to submitRequestEntries to be written downstream.
   ```



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing, will result in that element being requeued forever, therefore a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
+If at any point, it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract long getSizeInBytes(RequestEntryT requestEntry);
+    // ...
+}
+```
+The async sink has a concept of size of elements in the buffer. This allows users to specify a byte size threshold beyond which elements will be flushed. However the sink implementer is best positioned to determine what is the most sensible measure of size for each `RequestEntryT` is.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes) { /* ... */ }
+    // ...
+}
+```
+
+By default, the method `snapshotState` returns all the elements in the buffer to be saved down for snapshots. Any elements that have been removed from the buffer for writing and are in flight have already been completed due to the sink operator calling `flush(true)` on the sink writer.
+You may want to save down additional state from the concrete sink and you can achieve this by overriding `snapshotState`, and restoring in the constructor.

Review Comment:
   ```suggestion
   To store additional state from the concrete sink you can override `snapshotState`, and restore the state it in the constructor.
   ```



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing, will result in that element being requeued forever, therefore a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
+If at any point, it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract long getSizeInBytes(RequestEntryT requestEntry);
+    // ...
+}
+```
+The async sink has a concept of size of elements in the buffer. This allows users to specify a byte size threshold beyond which elements will be flushed. However the sink implementer is best positioned to determine what is the most sensible measure of size for each `RequestEntryT` is.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes) { /* ... */ }
+    // ...
+}
+```
+
+By default, the method `snapshotState` returns all the elements in the buffer to be saved down for snapshots. Any elements that have been removed from the buffer for writing and are in flight have already been completed due to the sink operator calling `flush(true)` on the sink writer.
+You may want to save down additional state from the concrete sink and you can achieve this by overriding `snapshotState`, and restoring in the constructor.
+
+```java
+class MySinkWriter<InputT> extends AsyncSinkWriter<InputT, RequestEntryT> {
+
+    MySinkWriter(
+          // ... 
+          Collection<BufferedRequestState<Record>> initialStates) {
+        super(
+            // ...
+            initialStates);
+        // restore concrete sink state from initialStates
+    }
+    
+    @Override
+    public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId) {
+        super.snapshotState(checkpointId);
+        // ...
+    }
+
+}
+```
+
+## Sink Interface
+
+[AsyncSinkBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java)
+
+```java
+class MySink<InputT> extends AsyncSinkBase<InputT, RequestEntryT> {
+    // ...
+    @Override
+    public StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> createWriter(InitContext context) {
+        return new MySinkWriter(context);
+    }
+    // ...
+}
+```
+Sink implementers extending this will need to return their own extension of the `AsyncSinkWriter` from `createWriter()` inside their own implementation of `AsyncSinkBase`.
+
+At the time of writing, the [Kinesis Data Streams sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-streams) and [Kinesis Data Firehose sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-firehose) are using this base sink. 
+
+# Metrics
+
+There are three metrics that automatically exist when you implement sinks (and, thus, should not be implemented by yourself).
+
+* CurrentSendTime Gauge - returns the amount of time in milliseconds it took for the most recent request to write records to return, whether successful or not.  
+* NumBytesOut Counter - counts the total number of bytes the sink has tried to write to the destination, using the method `getSizeInBytes` to determine the size of each record. This will double count failures that may need to be retried. 
+* NumRecordsOut Counter - similar to above, this counts the total number of records the sink has tried to write to the destination. This will double count failures that may need to be retried.
+
+# Sink Behavior
+
+There are six sink configuration settings that control the buffering, flushing, and retry behavior of the sink.
+
+* `int maxBatchSize` - maximum number of elements that may be passed in the   list to submitRequestEntries to be written downstream.
+* `int maxInFlightRequests` - maximum number of uncompleted calls to submitRequestEntries that the SinkWriter will allow at any given point. Once this point has reached, writes and callbacks to add elements to the buffer may block until one or more requests to submitRequestEntries completes.
+* `int maxBufferedRequests` - maximum buffer length. Callbacks to add elements to the buffer and calls to write will block if this length has been reached and will only unblock if elements from the buffer have been removed for flushing.
+* `long maxBatchSizeInBytes` - a flush will be attempted if the most recent call to write introduces an element to the buffer such that the total size of the buffer is greater than or equal to this threshold value.
+* `long maxTimeInBufferMS` - maximum amount of time an element may remain in the buffer. In most cases elements are flushed as a result of the batch size (in bytes or number) being reached or during a snapshot. However, there are scenarios where an element may remain in the buffer forever or a long period of time. To mitigate this, a timer is constantly active in the buffer such that: while the buffer is not empty, it will flush every maxTimeInBufferMS milliseconds.
+* `long maxRecordSizeInBytes` - maximum size in bytes allowed for a single record, as determined by `getSizeInBytes()`.
+
+Destinations typically have a defined throughput limit and will begin throttling or rejecting requests once near. With multiple subtasks, we employ [Additive Increase Multiplicative Decrease (AIMD)](https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease) as a strategy for selecting the optimal batch size.
+
+# Summary
+
+The AsyncSinkBase is a new abstraction that makes maintaining and creating async sinks easier.  This will be available in Flink 1.15 and we hope that you will try it out and give us feedback on it.

Review Comment:
   ```suggestion
   The AsyncSinkBase is a new abstraction that makes maintaining and creating async sinks easier. This will be available in Flink 1.15 and we hope that you will try it out and give us feedback on it.
   ```



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing, will result in that element being requeued forever, therefore a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
+If at any point, it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract long getSizeInBytes(RequestEntryT requestEntry);
+    // ...
+}
+```
+The async sink has a concept of size of elements in the buffer. This allows users to specify a byte size threshold beyond which elements will be flushed. However the sink implementer is best positioned to determine what is the most sensible measure of size for each `RequestEntryT` is.

Review Comment:
   What if the sink can't gauge the actual size?



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing, will result in that element being requeued forever, therefore a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
+If at any point, it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract long getSizeInBytes(RequestEntryT requestEntry);
+    // ...
+}
+```
+The async sink has a concept of size of elements in the buffer. This allows users to specify a byte size threshold beyond which elements will be flushed. However the sink implementer is best positioned to determine what is the most sensible measure of size for each `RequestEntryT` is.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes) { /* ... */ }
+    // ...
+}
+```
+
+By default, the method `snapshotState` returns all the elements in the buffer to be saved down for snapshots. Any elements that have been removed from the buffer for writing and are in flight have already been completed due to the sink operator calling `flush(true)` on the sink writer.
+You may want to save down additional state from the concrete sink and you can achieve this by overriding `snapshotState`, and restoring in the constructor.
+
+```java
+class MySinkWriter<InputT> extends AsyncSinkWriter<InputT, RequestEntryT> {
+
+    MySinkWriter(
+          // ... 
+          Collection<BufferedRequestState<Record>> initialStates) {
+        super(
+            // ...
+            initialStates);
+        // restore concrete sink state from initialStates
+    }
+    
+    @Override
+    public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId) {
+        super.snapshotState(checkpointId);
+        // ...
+    }
+
+}
+```
+
+## Sink Interface
+
+[AsyncSinkBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java)
+
+```java
+class MySink<InputT> extends AsyncSinkBase<InputT, RequestEntryT> {
+    // ...
+    @Override
+    public StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> createWriter(InitContext context) {
+        return new MySinkWriter(context);
+    }
+    // ...
+}
+```
+Sink implementers extending this will need to return their own extension of the `AsyncSinkWriter` from `createWriter()` inside their own implementation of `AsyncSinkBase`.

Review Comment:
   This is worded in a very roundabout way.
   ```suggestion
   AsyncSinkBase implementations return their own extension of the `AsyncSinkWriter` from `createWriter()`.
   ```



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing, will result in that element being requeued forever, therefore a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
+If at any point, it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract long getSizeInBytes(RequestEntryT requestEntry);
+    // ...
+}
+```
+The async sink has a concept of size of elements in the buffer. This allows users to specify a byte size threshold beyond which elements will be flushed. However the sink implementer is best positioned to determine what is the most sensible measure of size for each `RequestEntryT` is.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes) { /* ... */ }
+    // ...
+}
+```
+
+By default, the method `snapshotState` returns all the elements in the buffer to be saved down for snapshots. Any elements that have been removed from the buffer for writing and are in flight have already been completed due to the sink operator calling `flush(true)` on the sink writer.
+You may want to save down additional state from the concrete sink and you can achieve this by overriding `snapshotState`, and restoring in the constructor.
+
+```java
+class MySinkWriter<InputT> extends AsyncSinkWriter<InputT, RequestEntryT> {
+
+    MySinkWriter(
+          // ... 
+          Collection<BufferedRequestState<Record>> initialStates) {
+        super(
+            // ...
+            initialStates);
+        // restore concrete sink state from initialStates
+    }
+    
+    @Override
+    public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId) {
+        super.snapshotState(checkpointId);
+        // ...
+    }
+
+}
+```
+
+## Sink Interface
+
+[AsyncSinkBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java)
+
+```java
+class MySink<InputT> extends AsyncSinkBase<InputT, RequestEntryT> {
+    // ...
+    @Override
+    public StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> createWriter(InitContext context) {
+        return new MySinkWriter(context);
+    }
+    // ...
+}
+```
+Sink implementers extending this will need to return their own extension of the `AsyncSinkWriter` from `createWriter()` inside their own implementation of `AsyncSinkBase`.
+
+At the time of writing, the [Kinesis Data Streams sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-streams) and [Kinesis Data Firehose sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-firehose) are using this base sink. 
+
+# Metrics
+
+There are three metrics that automatically exist when you implement sinks (and, thus, should not be implemented by yourself).
+
+* CurrentSendTime Gauge - returns the amount of time in milliseconds it took for the most recent request to write records to return, whether successful or not.  
+* NumBytesOut Counter - counts the total number of bytes the sink has tried to write to the destination, using the method `getSizeInBytes` to determine the size of each record. This will double count failures that may need to be retried. 
+* NumRecordsOut Counter - similar to above, this counts the total number of records the sink has tried to write to the destination. This will double count failures that may need to be retried.
+
+# Sink Behavior
+
+There are six sink configuration settings that control the buffering, flushing, and retry behavior of the sink.

Review Comment:
   Why is this not documented in the javadocs?



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing, will result in that element being requeued forever, therefore a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
+If at any point, it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract long getSizeInBytes(RequestEntryT requestEntry);
+    // ...
+}
+```
+The async sink has a concept of size of elements in the buffer. This allows users to specify a byte size threshold beyond which elements will be flushed. However the sink implementer is best positioned to determine what is the most sensible measure of size for each `RequestEntryT` is.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes) { /* ... */ }
+    // ...
+}
+```
+
+By default, the method `snapshotState` returns all the elements in the buffer to be saved down for snapshots. Any elements that have been removed from the buffer for writing and are in flight have already been completed due to the sink operator calling `flush(true)` on the sink writer.
+You may want to save down additional state from the concrete sink and you can achieve this by overriding `snapshotState`, and restoring in the constructor.
+
+```java
+class MySinkWriter<InputT> extends AsyncSinkWriter<InputT, RequestEntryT> {
+
+    MySinkWriter(
+          // ... 
+          Collection<BufferedRequestState<Record>> initialStates) {
+        super(
+            // ...
+            initialStates);
+        // restore concrete sink state from initialStates
+    }
+    
+    @Override
+    public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId) {
+        super.snapshotState(checkpointId);
+        // ...
+    }
+
+}
+```
+
+## Sink Interface
+
+[AsyncSinkBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java)
+
+```java
+class MySink<InputT> extends AsyncSinkBase<InputT, RequestEntryT> {
+    // ...
+    @Override
+    public StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> createWriter(InitContext context) {
+        return new MySinkWriter(context);
+    }
+    // ...
+}
+```
+Sink implementers extending this will need to return their own extension of the `AsyncSinkWriter` from `createWriter()` inside their own implementation of `AsyncSinkBase`.
+
+At the time of writing, the [Kinesis Data Streams sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-streams) and [Kinesis Data Firehose sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-firehose) are using this base sink. 
+
+# Metrics
+
+There are three metrics that automatically exist when you implement sinks (and, thus, should not be implemented by yourself).
+
+* CurrentSendTime Gauge - returns the amount of time in milliseconds it took for the most recent request to write records to return, whether successful or not.  

Review Comment:
   "return" is throwing me off a bit. Is it actually the time that it took for submitRequestEntries() to _return_, or for `requestResult#accept` to be called?



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing, will result in that element being requeued forever, therefore a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
+If at any point, it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract long getSizeInBytes(RequestEntryT requestEntry);
+    // ...
+}
+```
+The async sink has a concept of size of elements in the buffer. This allows users to specify a byte size threshold beyond which elements will be flushed. However the sink implementer is best positioned to determine what is the most sensible measure of size for each `RequestEntryT` is.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes) { /* ... */ }
+    // ...
+}
+```
+
+By default, the method `snapshotState` returns all the elements in the buffer to be saved down for snapshots. Any elements that have been removed from the buffer for writing and are in flight have already been completed due to the sink operator calling `flush(true)` on the sink writer.

Review Comment:
   How can an element be "in flight" and "complete" at the same time?



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing, will result in that element being requeued forever, therefore a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
+If at any point, it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract long getSizeInBytes(RequestEntryT requestEntry);
+    // ...
+}
+```
+The async sink has a concept of size of elements in the buffer. This allows users to specify a byte size threshold beyond which elements will be flushed. However the sink implementer is best positioned to determine what is the most sensible measure of size for each `RequestEntryT` is.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes) { /* ... */ }
+    // ...
+}
+```
+
+By default, the method `snapshotState` returns all the elements in the buffer to be saved down for snapshots. Any elements that have been removed from the buffer for writing and are in flight have already been completed due to the sink operator calling `flush(true)` on the sink writer.
+You may want to save down additional state from the concrete sink and you can achieve this by overriding `snapshotState`, and restoring in the constructor.

Review Comment:
   It's not clear how the restoration via the constructor works. Where does the sink (which calls the constructor) get access to the state that should be restored?



##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. 
+
+This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests.
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
+
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing, will result in that element being requeued forever, therefore a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
+If at any point, it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract long getSizeInBytes(RequestEntryT requestEntry);
+    // ...
+}
+```
+The async sink has a concept of size of elements in the buffer. This allows users to specify a byte size threshold beyond which elements will be flushed. However the sink implementer is best positioned to determine what is the most sensible measure of size for each `RequestEntryT` is.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes) { /* ... */ }
+    // ...
+}
+```
+
+By default, the method `snapshotState` returns all the elements in the buffer to be saved down for snapshots. Any elements that have been removed from the buffer for writing and are in flight have already been completed due to the sink operator calling `flush(true)` on the sink writer.
+You may want to save down additional state from the concrete sink and you can achieve this by overriding `snapshotState`, and restoring in the constructor.
+
+```java
+class MySinkWriter<InputT> extends AsyncSinkWriter<InputT, RequestEntryT> {
+
+    MySinkWriter(
+          // ... 
+          Collection<BufferedRequestState<Record>> initialStates) {
+        super(
+            // ...
+            initialStates);
+        // restore concrete sink state from initialStates
+    }
+    
+    @Override
+    public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId) {
+        super.snapshotState(checkpointId);
+        // ...
+    }
+
+}
+```
+
+## Sink Interface
+
+[AsyncSinkBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java)
+
+```java
+class MySink<InputT> extends AsyncSinkBase<InputT, RequestEntryT> {
+    // ...
+    @Override
+    public StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> createWriter(InitContext context) {
+        return new MySinkWriter(context);
+    }
+    // ...
+}
+```
+Sink implementers extending this will need to return their own extension of the `AsyncSinkWriter` from `createWriter()` inside their own implementation of `AsyncSinkBase`.
+
+At the time of writing, the [Kinesis Data Streams sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-streams) and [Kinesis Data Firehose sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-firehose) are using this base sink. 
+
+# Metrics
+
+There are three metrics that automatically exist when you implement sinks (and, thus, should not be implemented by yourself).
+
+* CurrentSendTime Gauge - returns the amount of time in milliseconds it took for the most recent request to write records to return, whether successful or not.  
+* NumBytesOut Counter - counts the total number of bytes the sink has tried to write to the destination, using the method `getSizeInBytes` to determine the size of each record. This will double count failures that may need to be retried. 
+* NumRecordsOut Counter - similar to above, this counts the total number of records the sink has tried to write to the destination. This will double count failures that may need to be retried.
+
+# Sink Behavior
+
+There are six sink configuration settings that control the buffering, flushing, and retry behavior of the sink.
+
+* `int maxBatchSize` - maximum number of elements that may be passed in the   list to submitRequestEntries to be written downstream.
+* `int maxInFlightRequests` - maximum number of uncompleted calls to submitRequestEntries that the SinkWriter will allow at any given point. Once this point has reached, writes and callbacks to add elements to the buffer may block until one or more requests to submitRequestEntries completes.
+* `int maxBufferedRequests` - maximum buffer length. Callbacks to add elements to the buffer and calls to write will block if this length has been reached and will only unblock if elements from the buffer have been removed for flushing.
+* `long maxBatchSizeInBytes` - a flush will be attempted if the most recent call to write introduces an element to the buffer such that the total size of the buffer is greater than or equal to this threshold value.
+* `long maxTimeInBufferMS` - maximum amount of time an element may remain in the buffer. In most cases elements are flushed as a result of the batch size (in bytes or number) being reached or during a snapshot. However, there are scenarios where an element may remain in the buffer forever or a long period of time. To mitigate this, a timer is constantly active in the buffer such that: while the buffer is not empty, it will flush every maxTimeInBufferMS milliseconds.

Review Comment:
   I'm curious why this was modeled as along instead of a Duration.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org