Posted to dev@flink.apache.org by "Hausmann, Steffen" <sh...@amazon.de.INVALID> on 2021/08/20 11:34:36 UTC

Re: [DISCUSS] FLIP-171: Async Sink

Hi Till,

I've updated the wiki page as per the discussion on FLIP-177. I hope it makes more sense now.

Cheers, Steffen

On 16.07.21, 18:28, "Till Rohrmann" <tr...@apache.org> wrote:

    CAUTION: This email originated from outside of the organization. Do not click links or open attachments unless you can confirm the sender and know the content is safe.



    Sure, thanks for the pointers.

    Cheers,
    Till

    On Fri, Jul 16, 2021 at 6:19 PM Hausmann, Steffen <sh...@amazon.de.invalid>
    wrote:

    > Hi Till,
    >
    > You are right, I’ve left out some implementation details, which have
    > actually changed a couple of times as part of the ongoing discussion. You
    > can find our current prototype here [1] and a sample implementation of the
    > KPL-free Kinesis sink here [2].
    >
    > I plan to update the FLIP, but I think it would make sense to wait
    > until the implementation has stabilized before we update the FLIP to
    > its final state.
    >
    > Does that make sense?
    >
    > Cheers, Steffen
    >
    > [1]
    > https://github.com/sthm/flink/tree/flip-171-177/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink
    > [2]
    > https://github.com/sthm/flink/blob/flip-171-177/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
    >
    > From: Till Rohrmann <tr...@apache.org>
    > Date: Friday, 16. July 2021 at 18:10
    > To: Piotr Nowojski <pn...@apache.org>
    > Cc: Steffen Hausmann <sh...@amazon.de>, "dev@flink.apache.org" <
    > dev@flink.apache.org>, Arvid Heise <ar...@apache.org>
    > Subject: RE: [EXTERNAL] [DISCUSS] FLIP-171: Async Sink
    >
    >
    >
    > Hi Steffen,
    >
    > I've taken another look at the FLIP and stumbled across a couple of
    > inconsistencies, mainly because the code is missing. For
    > example, it is not fully clear to me from the current FLIP how we
    > ensure that there are no in-flight requests when
    > AsyncSinkWriter.snapshotState is called. The concrete implementation
    > of the AsyncSinkCommitter would also help in understanding how the
    > AsyncSinkWriter works in the end. Do you plan to update the FLIP
    > accordingly?
    >
    > Cheers,
    > Till
    >
    > On Wed, Jun 30, 2021 at 8:36 AM Piotr Nowojski <pnowojski@apache.org> wrote:
    > Thanks for addressing this issue :)
    >
    > Best, Piotrek
    >
    > On Tue, 29 Jun 2021 at 17:58, Hausmann, Steffen <shausma@amazon.de> wrote:
    > Hey Piotr,
    >
    > I've just adapted the FLIP and changed the signature for the
    > `submitRequestEntries` method:
    >
    > protected abstract void submitRequestEntries(List<RequestEntryT>
    > requestEntries, ResultFuture<?> requestResult);
    >
    > In addition, we are likely to use an AtomicLong to track the number of
    > outstanding requests, as you have proposed in 2b). I've already indicated
    > this in the FLIP, but it's not fully fleshed out. But as you have said,
    > that seems to be an implementation detail and the important part is the
    > change of the `submitRequestEntries` signature.
    >
    > Thanks for your feedback!
    >
    > Cheers, Steffen
    >
    >
    > On 25.06.21, 17:05, "Hausmann, Steffen" <sh...@amazon.de.INVALID> wrote:
    >
    >
    >     Hi Piotr,
    >
    >     I’m happy to take your guidance on this. I need to think through your
    > proposals and I’ll follow up on Monday with some more context so that we
    > can close the discussion on these details. But for now, I’ll close the vote.
    >
    >     Thanks, Steffen
    >
    >     From: Piotr Nowojski <pnowojski@apache.org>
    >     Date: Friday, 25. June 2021 at 14:48
    >     To: Till Rohrmann <tr...@apache.org>
    >     Cc: Steffen Hausmann <sh...@amazon.de>, "dev@flink.apache.org" <dev@flink.apache.org>, Arvid Heise <arvid@apache.org>
    >     Subject: RE: [EXTERNAL] [DISCUSS] FLIP-171: Async Sink
    >
    >
    >
    >     Hey,
    >
    >     I've just synced with Arvid about a couple more remarks from my
    > side, and he shares my concerns.
    >
    >     1. I would very strongly recommend ditching `CompletableFuture<?>`
    > from `protected abstract CompletableFuture<?>
    > submitRequestEntries(List<RequestEntryT> requestEntries);` in favor of
    > something like the
    > `org.apache.flink.streaming.api.functions.async.ResultFuture` interface.
    > `CompletableFuture<?>` would partially make the threading model of the
    > `AsyncSinkWriter` part of the public API, and it would tie our hands.
    > Regardless of how `CompletableFuture<?>` is used, it imposes a performance
    > overhead because of the synchronisation/volatile accesses inside it. On the
    > other hand, something like:
    >
    >     protected abstract void submitRequestEntries(List<RequestEntryT>
    > requestEntries, ResultFuture<?> requestResult);
    >
    >     would allow us to implement the threading model as we wish.
    > `ResultFuture` could be backed by a `CompletableFuture<?>` underneath, but
    > it could also be something more efficient. I will explain what I have in
    > mind in a second.
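
A minimal sketch of what the proposed signature change buys, under stated assumptions: `RequestResult` is a hypothetical, simplified stand-in for `ResultFuture` (a single no-argument `complete()`; the real Flink interface looks different), and `SketchAsyncSinkWriter` is an illustrative base class, not the one from the FLIP. Because the base class hands a callback to the implementation instead of receiving a `CompletableFuture<?>` back, the way completions are tracked stays private to the base class:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for ResultFuture; not the Flink interface. */
interface RequestResult {
    void complete();
}

/** Illustrative base class; only the submitRequestEntries shape follows the thread. */
abstract class SketchAsyncSinkWriter<RequestEntryT> {
    private final List<RequestEntryT> bufferedRequestEntries = new ArrayList<>();
    // How completions are tracked is an internal choice; a counter is one option.
    private final AtomicInteger inFlightRequests = new AtomicInteger();

    /** Implementations send the batch and call requestResult.complete() when done. */
    protected abstract void submitRequestEntries(
            List<RequestEntryT> requestEntries, RequestResult requestResult);

    /** Buffers one entry; assumed to be called from the task thread only. */
    void write(RequestEntryT entry) {
        bufferedRequestEntries.add(entry);
    }

    /** Hands all buffered entries to the implementation as one request. */
    void flush() {
        if (bufferedRequestEntries.isEmpty()) {
            return;
        }
        List<RequestEntryT> batch = new ArrayList<>(bufferedRequestEntries);
        bufferedRequestEntries.clear();
        inFlightRequests.incrementAndGet();
        // The callback may be invoked from any thread the implementation chooses.
        submitRequestEntries(batch, inFlightRequests::decrementAndGet);
    }

    int inFlightCount() {
        return inFlightRequests.get();
    }
}
```

Swapping the counter for a lock or a mailbox action would not change the abstract method, which is the point of the proposal.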
    >
    >     2. It looks to me like the proposed `AsyncSinkWriter` internals are not
    > very efficient, and maybe the threading model hasn't been thought through.
    > Especially the private fields:
    >
    >     private final BlockingDeque<RequestEntryT> bufferedRequestEntries;
    >     private BlockingDeque<CompletableFuture<?>> inFlightRequests;
    >
    >     are a bit strange to me. Why do we need two separate thread safe
    > collections? Why do we need a `BlockingDeque` of `CompletableFuture<?>`s?
    > If we are already using a fully synchronised collection, there should be no
    > need for another layer of thread safe `CompletableFuture<?>`.
    >
    >     As I understand, the threading model of the `AsyncSinkWriter` is very
    > similar to that of the `AsyncWaitOperator`, with very similar requirements
    > for inducing backpressure. How I would see it implemented is for example:
    >
    >     a) Having a single lock, that would encompass the whole
    > `AsyncSinkWriter#flush()` method. `flush()` would be called from the task
    > thread (mailbox). To induce backpressure, `#flush()` would just call
    > `lock.wait()`. `ResultFuture#complete(...)` called from an async thread,
    > would also synchronize on the same lock, and mark some of the inflight
    > requests as completed and call `lock.notify()`.
    >
    >     b) A more efficient solution. On the hot path we would have, for example,
    > only `AtomicLong numberOfInFlightRequests`. The task thread would be bumping
    > it, `ResultFuture#complete()` would be decreasing it. If the task thread,
    > when bumping `numberOfInFlightRequests`, exceeds a threshold, it goes to
    > sleep/waits on a lock or some `CompletableFuture`. If
    > `ResultFuture#complete()`, when decreasing the count, goes below the
    > threshold, it would wake up the task thread. Compared to option a),
    > on the hot path, option b) would have only the `AtomicLong` increment overhead.
    >
    >     c) We could use the mailbox, the same way as the AsyncWaitOperator does.
    > In this case `ResultFuture#complete()` would be enqueuing a mailbox action,
    > which is thread safe on its own.
    >
    >     Any of those options would be more efficient and simpler (from the
    > threading model perspective) than having two `BlockingDeque`s and
    > `CompletableFuture<?>`. Also, as you can see, none of those solutions
    > requires the overhead of `CompletableFuture<?>
    > submitRequestEntries(List<RequestEntryT> requestEntries)`. Each one of
    > them could use a more efficient, custom implementation of
    > `ResultFuture.complete(...)`.
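
Option b) could look roughly like the sketch below. The class name `InFlightLimiter` and the methods `acquire()`, `release()` and `inFlight()` are hypothetical, and the sketch assumes that exactly one task thread calls `acquire()` while any number of async threads call `release()`. Under the limit, the hot path is a single atomic increment; the lock is only touched once the limit is exceeded:

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper; assumes exactly one task thread calls acquire(). */
final class InFlightLimiter {
    private final long maxInFlightRequests;
    private final AtomicLong numberOfInFlightRequests = new AtomicLong();
    private final Object lock = new Object();

    InFlightLimiter(long maxInFlightRequests) {
        this.maxInFlightRequests = maxInFlightRequests;
    }

    /** Task thread: counts a new request, waiting if that exceeds the limit. */
    void acquire() throws InterruptedException {
        // Hot path: a single atomic increment while under the limit.
        if (numberOfInFlightRequests.incrementAndGet() <= maxInFlightRequests) {
            return;
        }
        // Slow path: over the limit, so wait until a completion makes room.
        synchronized (lock) {
            try {
                while (numberOfInFlightRequests.get() > maxInFlightRequests) {
                    lock.wait();
                }
            } catch (InterruptedException e) {
                // Give the slot back so the count stays accurate after cancellation.
                numberOfInFlightRequests.decrementAndGet();
                throw e;
            }
        }
    }

    /** Async thread: called when a request has completed. */
    void release() {
        // Only a drop from (limit + 1) to limit can have a waiting task thread.
        if (numberOfInFlightRequests.decrementAndGet() == maxInFlightRequests) {
            synchronized (lock) {
                lock.notifyAll();
            }
        }
    }

    long inFlight() {
        return numberOfInFlightRequests.get();
    }
}
```

The waiter re-checks the counter while holding the lock, so a completion that arrives between the increment and the `wait()` is not lost.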
    >
    >
    >     Whether we use a), b) or c) should, I think, be an implementation
    > detail. But to allow this to truly be an implementation detail, we would
    > need to agree on 1. Nevertheless, the change I proposed in 1.
    > is small enough that I think there is no need to cancel the current vote on
    > the FLIP.
    >
    >     WDYT?
    >
    >     Piotrek
    >
    >
    >     On Tue, 22 Jun 2021 at 11:42, Till Rohrmann <trohrmann@apache.org> wrote:
    >     Adding the InterruptedException to the write method would make it
    > explicit that the write call can block but must react to interruptions
    > (e.g. when Flink wants to cancel the operation). I think this makes the
    > contract a bit clearer.
    >
    >     I think starting simple and then extending the API as we see the need
    > is a good idea.
    >
    >     Cheers,
    >     Till
    >
    >     On Tue, Jun 22, 2021 at 11:20 AM Hausmann, Steffen <shausma@amazon.de> wrote:
    >     Hey,
    >
    >     Agreed on starting with a blocking `write`. I've adapted the FLIP
    > accordingly.
    >
    >     For now I've chosen to add the `InterruptedException` to the `write`
    > method signature, as I don't fully understand the implications of
    > swallowing the exception. Depending on the details of the code that
    > calls the write method, it may cause event loss. But this seems more of
    > an implementation detail that we can revisit once we are actually
    > implementing the sink.
    >
    >     Unless there are additional comments, does it make sense to start the
    > voting process in the next day or two?
    >
    >     Cheers, Steffen
    >
    >
    >     On 21.06.21, 14:51, "Piotr Nowojski" <pnowojski@apache.org> wrote:
    >
    >
    >         Hi,
    >
    >         Thanks Steffen for the explanations. I think it makes sense to me.
    >
    >         Re Arvid/Steffen:
    >
    >         - Keep in mind that even if we choose to provide a non-blocking API
    >         using the `isAvailable()`/`getAvailableFuture()` method, we would
    >         still need to support blocking inside the sinks. At the very least,
    >         emitting many records at once (`flatMap`) or firing timers are
    >         scenarios where output availability would currently be ignored by
    >         the runtime. I would also imagine that writing very large (like 1GB)
    >         records would be blocking on something as well.
    >         - Secondly, exposing availability at the API level might not be that
    >         easy/trivial. The availability pattern as defined in the
    >         `AvailabilityProvider` class is quite complicated and not that easy
    >         for a user to implement.
    >
    >         Given both of those, combined with the lack of a clear motivation for
    >         adding `AvailabilityProvider` to the sinks/operators/functions, I
    >         would vote for just starting with blocking `write` calls. This can
    >         always be extended in the future with availability if
    >         needed/motivated properly.
    >
    >         That would be aligned with either Arvid's option 1 or 2. I don't know
    >         what the best practices are with `InterruptedException`, but I'm
    >         always afraid of it, so I would personally feel safer with option 2.
    >
    >         I'm not sure what problem option 3 is helping to solve. Adding
    >         `wakeUp()` would sound strange to me.
    >
    >         Best,
    >         Piotrek
    >
    >         On Mon, 21 Jun 2021 at 12:15, Arvid Heise <arvid@apache.org> wrote:
    >
    >         > Hi Piotr,
    >         >
    >         > to pick up this discussion thread again:
    >         > - This FLIP is about providing some base implementation for FLIP-143
    >         > sinks that makes adding new implementations easier, similar to the
    >         > SourceReaderBase.
    >         > - The whole availability topic will most likely be a separate FLIP.
    >         > The basic issue just popped up here because we currently have no way
    >         > to signal backpressure in sinks except by blocking `write`. This
    >         > feels quite natural in sinks with sync communication but quite
    >         > unnatural in async sinks.
    >         >
    >         > Now we have a couple of options. In all cases, we would have some WIP
    >         > limit on the number of records/requests that can be processed in
    >         > parallel asynchronously (similar to asyncIO).
    >         > 1. We use some blocking queue in `write`; then we need to handle
    >         > interruptions. In the easiest case, we extend `write` to throw the
    >         > `InterruptedException`, which is a small API change.
    >         > 2. We use a blocking queue, but handle interrupts and
    >         > swallow/translate them. No API change.
    >         > Both solutions block the task thread, so any RPC message / unaligned
    >         > checkpoint would be processed only after the backpressure is
    >         > temporarily lifted. That's similar to the discussions that you
    >         > linked. Cancellation may also be a tad harder with 2.
    >         > 3. We could also add some `wakeUp` to the `SinkWriter`, similar to
    >         > `SplitFetcher` [1]. Basically, you use a normal queue with a
    >         > completable future on which you block. Wakeup would be a clean way to
    >         > complete it, next to the natural completion through finished requests.
    >         > 4. We add availability to the sink. However, this API change also
    >         > requires that we allow operators to be available, so it may be a
    >         > bigger change with undesired side effects. On the other hand, we
    >         > could also use the same mechanism for asyncIO.
    >         >
    >         > For users of FLIP-171, none of the options are exposed. So we could
    >         > also start with a simple solution (add `InterruptedException`) and
    >         > later try to add availability. Options 1+2 would also not require an
    >         > additional FLIP; we could add it as part of this FLIP.
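
The difference between options 1 and 2 can be sketched as follows. This is only an illustration: `BlockingWriteSketch` and its method names are hypothetical, the queue capacity stands in for the WIP limit, and translating the interruption into an `IllegalStateException` is just one possible reading of "swallow/translate":

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical writer front end; the queue capacity plays the role of the WIP limit. */
final class BlockingWriteSketch<T> {
    private final BlockingQueue<T> queue;

    BlockingWriteSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Option 1: block when full and let the interruption propagate to the caller. */
    void write(T element) throws InterruptedException {
        queue.put(element);
    }

    /** Option 2: same blocking behaviour, but the interruption is translated. */
    void writeTranslatingInterrupts(T element) {
        try {
            queue.put(element);
        } catch (InterruptedException e) {
            // Restore the flag so callers can still observe the cancellation.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for buffer space", e);
        }
    }

    /** Called by whoever drains the buffer, e.g. once a request has finished. */
    T poll() {
        return queue.poll();
    }
}
```

With option 1 the blocking behaviour is visible in the method signature; with option 2 the signature stays as it is, and the caller has to check the interrupt flag or handle the unchecked exception.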
    >         >
    >         > Best,
    >         >
    >         > Arvid
    >         >
    >         > [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java#L258-L258
    >         >
    >         > On Thu, Jun 10, 2021 at 10:09 AM Hausmann, Steffen
    >         > <sh...@amazon.de.invalid> wrote:
    >         >
    >         >> Hey Piotrek,
    >         >>
    >         >> Thanks for your comments on the FLIP. I'll address your second
    >         >> question first, as I think it's more central to this FLIP. Just
    >         >> looking at the AWS ecosystem, there are several sinks with
    >         >> overlapping functionality. I've chosen AWS sinks here because I'm
    >         >> most familiar with those, but a similar argument applies more
    >         >> generally to destinations that support async ingest.
    >         >>
    >         >> There is, for instance, a sink for Amazon Kinesis Data Streams that
    >         >> is part of Apache Flink [1], a sink for Amazon Kinesis Data Firehose
    >         >> [2], a sink for Amazon DynamoDB [3], and a sink for Amazon Timestream
    >         >> [4]. All these sinks have implemented their own mechanisms for
    >         >> batching, persisting, and retrying events. And I'm not sure if all of
    >         >> them properly participate in checkpointing. [3] even seems to closely
    >         >> mirror [1], as it contains references to the Kinesis Producer
    >         >> Library, which is unrelated to Amazon DynamoDB.
    >         >>
    >         >> These sinks predate FLIP-143. But as batching, persisting, and
    >         >> retrying capabilities do not seem to be part of FLIP-143, I'd argue
    >         >> that we would end up with similar duplication even if these sinks
    >         >> were rewritten today based on FLIP-143. And that's the idea of
    >         >> FLIP-171: abstract away these commonly required capabilities so that
    >         >> it becomes easy to create support for a wide range of destinations
    >         >> without having to think about batching, retries, checkpointing, etc.
    >         >> I've included an example in the FLIP [5] that shows that it only
    >         >> takes a couple of lines of code to implement a sink with exactly-once
    >         >> semantics. To be fair, the example is lacking robust failure handling
    >         >> and some more advanced capabilities of [1], but I think it still
    >         >> supports this point.
    >         >>
    >         >> Regarding your point on the isAvailable pattern: we need some way for
    >         >> the sink to propagate backpressure, and we would also like to support
    >         >> time-based buffering hints. There are two options I currently see,
    >         >> and I would need additional input on which one is the better or more
    >         >> desirable one. The first option is to use the non-blocking
    >         >> isAvailable pattern. Internally, the sink persists buffered events in
    >         >> the snapshot state, which avoids having to flush buffered records on
    >         >> a checkpoint. This seems to align well with the non-blocking
    >         >> isAvailable pattern. The second option is to make calls to `write`
    >         >> blocking and leverage an internal thread to trigger flushes based on
    >         >> time-based buffering hints. We've discussed these options with Arvid,
    >         >> and the suggestion was to assume that the `isAvailable` pattern will
    >         >> become available for sinks through an additional FLIP.
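
To make the buffering hints more concrete, here is a rough sketch that combines two pieces mentioned above: a flush triggered by a size or time hint, and buffered entries that are copied into the snapshot instead of being flushed on a checkpoint. Everything in it is an assumption made for illustration: the class `TimedBatchBuffer`, its method names, and the choice to pass timestamps in explicitly (a real timer thread would supply the current time) so that the behaviour is deterministic:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical buffer with size- and time-based flush hints. */
final class TimedBatchBuffer<T> {
    private final int maxBatchSize;
    private final long maxTimeInBufferMs;
    private final List<T> buffer = new ArrayList<>();
    private long oldestEntryMs;

    TimedBatchBuffer(int maxBatchSize, long maxTimeInBufferMs) {
        this.maxBatchSize = maxBatchSize;
        this.maxTimeInBufferMs = maxTimeInBufferMs;
    }

    /** Task thread: buffers an entry; returns a batch once the size hint is reached. */
    synchronized List<T> add(T entry, long nowMs) {
        if (buffer.isEmpty()) {
            oldestEntryMs = nowMs;
        }
        buffer.add(entry);
        return buffer.size() >= maxBatchSize ? drain() : Collections.<T>emptyList();
    }

    /** Timer thread: returns a batch if the oldest entry has waited long enough. */
    synchronized List<T> onTimer(long nowMs) {
        boolean due = !buffer.isEmpty() && nowMs - oldestEntryMs >= maxTimeInBufferMs;
        return due ? drain() : Collections.<T>emptyList();
    }

    /** Checkpoint: a copy of the entries that are buffered but not yet sent. */
    synchronized List<T> snapshotState() {
        return new ArrayList<>(buffer);
    }

    private List<T> drain() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }
}
```

Whether the caller of `add` blocks or consults an availability future when too many batches are in flight is exactly the open question discussed here; the buffer itself is the same in both cases.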
    >         >>
    >         >> I think it is an important discussion to have. My understanding of
    >         >> the implications for Flink in general is very naive, so I'd be happy
    >         >> to get further guidance. However, I don't want to make this
    >         >> discussion part of FLIP-171. For FLIP-171 we'll use whatever is
    >         >> available.
    >         >>
    >         >> Does that make sense?
    >         >>
    >         >> Cheers, Steffen
    >         >>
    >         >>
    >         >> [1] https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kinesis
    >         >> [2] https://github.com/aws/aws-kinesisanalytics-flink-connectors
    >         >> [3] https://github.com/klarna-incubator/flink-connector-dynamodb
    >         >> [4] https://github.com/awslabs/amazon-timestream-tools/
    >         >> [5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink#FLIP171:AsyncSink-SimplifiedAsyncSinkWriterforKinesisDataStreams
    >         >>
    >         >>
    >         >> On 09.06.21, 19:44, "Piotr Nowojski" <pnowojski@apache.org> wrote:
    >         >>
    >         >>
    >         >>     Hi Steffen,
    >         >>
    >         >>     Thanks for writing down the proposal. Back when the new Sink API
    >         >>     was being discussed, I proposed adding our usual
    >         >>     `CompletableFuture<Void> isAvailable()` pattern to make sinks
    >         >>     non-blocking. You can see the discussion starting here [1] and
    >         >>     continuing for a couple more posts until here [2]. Back then, the
    >         >>     outcome was that it would give very little benefit, at the
    >         >>     expense of making the API more complicated. Could you maybe
    >         >>     relate your proposal to that discussion from last year?
    >         >>
    >         >>     I see that your proposal goes much further than just adding the
    >         >>     availability method. Could you also motivate this a bit further?
    >         >>     Could you maybe reference/show some sinks that:
    >         >>     1. are already implemented using FLIP-143
    >         >>     2. have some code duplication...
    >         >>     3. ...that would be solved by FLIP-171
    >         >>
    >         >>     Best,
    >         >>     Piotrek
    >         >>
    >         >>     [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44872.html
    >         >>     [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44930.html
    >         >>
    >         >>     On Wed, 9 Jun 2021 at 09:49, Hausmann, Steffen <sh...@amazon.de.invalid> wrote:
    >         >>
    >         >>     > Hi there,
    >         >>     >
    >         >>     > We would like to start a discussion thread on "FLIP-171: Async
    >         >>     > Sink" [1], where we propose to create a common abstraction for
    >         >>     > destinations that support async requests. This abstraction will
    >         >>     > make it easier to add destinations to Flink by implementing a
    >         >>     > lightweight shim, while it avoids maintaining dozens of
    >         >>     > independent sinks.
    >         >>     >
    >         >>     > Looking forward to your feedback.
    >         >>     >
    >         >>     > Cheers, Steffen
    >         >>     >
    >         >>     > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
    >         >>     >
    >         >>     >
    >         >>     >
    >         >>     > Amazon Web Services EMEA SARL
    >         >>     > 38 avenue John F. Kennedy, L-1855 Luxembourg
    >         >>     > Sitz der Gesellschaft: L-1855 Luxemburg
    >         >>     > eingetragen im Luxemburgischen Handelsregister unter
    > R.C.S. B186284
    >         >>     >
    >         >>     > Amazon Web Services EMEA SARL, Niederlassung Deutschland
    >         >>     > Marcel-Breuer-Str. 12, D-80807 Muenchen
    >         >>     > Sitz der Zweigniederlassung: Muenchen
    >         >>     > eingetragen im Handelsregister des Amtsgerichts Muenchen
    > unter HRB
    >         >> 242240,
    >         >>     > USt-ID DE317013094

Re: [DISCUSS] FLIP-171: Async Sink

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for the update Steffen. I'll try to take a look at it asap.

Cheers,
Till

On Fri, Aug 20, 2021 at 1:34 PM Hausmann, Steffen <sh...@amazon.de> wrote:

> Hi Till,
>
> I've updated the wiki page as per the discussion on flip-177. I hope it
> makes more sense now.
>
> Cheers, Steffen
>
> On 16.07.21, 18:28, "Till Rohrmann" <tr...@apache.org> wrote:
>
>     CAUTION: This email originated from outside of the organization. Do
> not click links or open attachments unless you can confirm the sender and
> know the content is safe.
>
>
>
>     Sure, thanks for the pointers.
>
>     Cheers,
>     Till
>
>     On Fri, Jul 16, 2021 at 6:19 PM Hausmann, Steffen
> <sh...@amazon.de.invalid>
>     wrote:
>
>     > Hi Till,
>     >
>     > You are right, I’ve left out some implementation details, which have
>     > actually changed a couple of time as part of the ongoing discussion.
> You
>     > can find our current prototype here [1] and a sample implementation
> of the
>     > KPL free Kinesis sink here [2].
>     >
>     > I plan to update the FLIP. But I think would it be make sense to wait
>     > until the implementation has stabilized enough before we update the
> FLIP to
>     > the final state.
>     >
>     > Does that make sense?
>     >
>     > Cheers, Steffen
>     >
>     > [1]
>     >
> https://github.com/sthm/flink/tree/flip-171-177/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink
>     > [2]
>     >
> https://github.com/sthm/flink/blob/flip-171-177/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
>     >
>     > From: Till Rohrmann <tr...@apache.org>
>     > Date: Friday, 16. July 2021 at 18:10
>     > To: Piotr Nowojski <pn...@apache.org>
>     > Cc: Steffen Hausmann <sh...@amazon.de>, "dev@flink.apache.org" <
>     > dev@flink.apache.org>, Arvid Heise <ar...@apache.org>
>     > Subject: RE: [EXTERNAL] [DISCUSS] FLIP-171: Async Sink
>     >
>     >
>     > CAUTION: This email originated from outside of the organization. Do
> not
>     > click links or open attachments unless you can confirm the sender
> and know
>     > the content is safe.
>     >
>     >
>     > Hi Steffen,
>     >
>     > I've taken another look at the FLIP and I stumbled across a couple of
>     > inconsistencies. I think it is mainly because of the lacking code.
> For
>     > example, it is not fully clear to me based on the current FLIP how we
>     > ensure that there are no in-flight requests when
>     > AsyncSinkWriter.snapshotState is called. Also the concrete
> implementation
>     > of the AsyncSinkCommitter could be helpful for understanding how the
>     > AsyncSinkWriter works in the end. Do you plan to update the FLIP
>     > accordingly?
>     >
>     > Cheers,
>     > Till
>     >
>     > On Wed, Jun 30, 2021 at 8:36 AM Piotr Nowojski <pnowojski@apache.org
>     > <ma...@apache.org>> wrote:
>     > Thanks for addressing this issue :)
>     >
>     > Best, Piotrek
>     >
>     > wt., 29 cze 2021 o 17:58 Hausmann, Steffen <shausma@amazon.de
> <mailto:
>     > shausma@amazon.de>> napisał(a):
>     > Hey Poitr,
>     >
>     > I've just adapted the FLIP and changed the signature for the
>     > `submitRequestEntries` method:
>     >
>     > protected abstract void submitRequestEntries(List<RequestEntryT>
>     > requestEntries, ResultFuture<?> requestResult);
>     >
>     > In addition, we are likely to use an AtomicLong to track the number
> of
>     > outstanding requests, as you have proposed in 2b). I've already
> indicated
>     > this in the FLIP, but it's not fully fleshed out. But as you have
> said,
>     > that seems to be an implementation detail and the important part is
> the
>     > change of the `submitRequestEntries` signature.
>     >
>     > Thanks for your feedback!
>     >
>     > Cheers, Steffen
>     >
>     >
>     > On 25.06.21, 17:05, "Hausmann, Steffen" <sh...@amazon.de.INVALID>
> wrote:
>     >
>     >
>     >
>     >
>     >     Hi Piotr,
>     >
>     >     I’m happy to take your guidance on this. I need to think through
> your
>     > proposals and I’ll follow-up on Monday with some more context so
> that we
>     > can close the discussion on these details. But for now, I’ll close
> the vote.
>     >
>     >     Thanks, Steffen
>     >
>     >     From: Piotr Nowojski <pnowojski@apache.org>
>     >     Date: Friday, 25. June 2021 at 14:48
>     >     To: Till Rohrmann <trohrmann@apache.org>
>     >     Cc: Steffen Hausmann <sh...@amazon.de>, "dev@flink.apache.org" <dev@flink.apache.org>, Arvid Heise <arvid@apache.org>
>     >     Subject: RE: [EXTERNAL] [DISCUSS] FLIP-171: Async Sink
>     >
>     >
>     >
>     >
>     >     Hey,
>     >
>     >     I've just synced with Arvid about a couple more remarks from
> my
>     > side and he shares my concerns.
>     >
>     >     1. I would very strongly recommend ditching
> `CompletableFuture<?> `
>     > from the  `protected abstract CompletableFuture<?>
>     > submitRequestEntries(List<RequestEntryT> requestEntries);`  in favor
> of
>     > something like
>     > `org.apache.flink.streaming.api.functions.async.ResultFuture`
> interface.
>     > `CompletableFuture<?>` would partially make the threading model of
> the
>     > `AsyncSinkWriter` part of the public API and it would tie our hands.
>     > Regardless of how `CompletableFuture<?>` is used, it imposes performance
>     > overhead because of the synchronisation/volatile accesses inside it. On the
> other
>     > hand, something like:
>     >
>     >     protected abstract void submitRequestEntries(List<RequestEntryT>
>     > requestEntries, ResultFuture<?> requestResult);
>     >
>     >     Would allow us to implement the threading model as we wish.
>     > `ResultFuture` could be backed via `CompletableFuture<?>`
> underneath, but
>     > it could also be something more efficient.  I will explain what I
> have in
>     > mind in a second.
>     >
>     >     2. It looks to me that proposed `AsyncSinkWriter` Internals are
> not
>     > very efficient and maybe the threading model hasn't been thought
> through?
>     > Especially private fields:
>     >
>     >     private final BlockingDeque<RequestEntryT>
> bufferedRequestEntries;
>     >     private BlockingDeque<CompletableFuture<?>> inFlightRequests;
>     >
>     >     are a bit strange to me. Why do we need two separate thread safe
>     > collections? Why do we need a `BlockingDeque` of
> `CompletableFuture<?>`s?
>     > If we are already using a fully synchronised collection, there
> should be no
>     > need for another layer of thread safe `CompletableFuture<?>`.
>     >
>     >     As I understand, the threading model of the `AsyncSinkWriter` is
> very
>     > similar to that of the `AsyncWaitOperator`, with very similar
> requirements
>     > for inducing backpressure. How I would see it implemented is for
> example:
>     >
>     >     a) Having a single lock, that would encompass the whole
>     > `AsyncSinkWriter#flush()` method. `flush()` would be called from the
> task
>     > thread (mailbox). To induce backpressure, `#flush()` would just call
>     > `lock.wait()`. `ResultFuture#complete(...)` called from an async
> thread,
>     > would also synchronize on the same lock, and mark some of the
> inflight
>     > requests as completed and call `lock.notify()`.
>     >
>     >     b) More efficient solution. On the hot path we would have for
> example
>     > only `AtomicLong numberOfInFlightRequests`. Task thread would be
> bumping
>     > it, `ResultFuture#complete()` would be decreasing it. If the task
> thread
>     > when bumping `numberOfInFlightRequests` exceeds a threshold, it goes
> to
>     > sleep/waits on a lock or some `CompletableFuture`. If
>     > `ResultFuture#complete()` when decreasing the count goes below the
>     > threshold, it would wake up the task thread. Compared to option
> a),
>     > on the hot path, option b) would have only the overhead of an
>     > `AtomicLong` increment.
>     >
>     >     c) We could use the mailbox, the same way as `AsyncWaitOperator`
> does.
>     > In this case `ResultFuture#complete()` would be enqueuing a mailbox
> action,
>     > which is thread safe on its own.
>     >
>     >     Any of those options would be more efficient and simpler
> (from the
>     > threading model perspective) than having two `BlockingDeque`s and
>     > `CompletableFuture<?>`. Also, as you can see, none of those
> solutions
>     > requires the overhead of `CompletableFuture<?>
>     > submitRequestEntries(List<RequestEntryT> requestEntries)`. Each one
> of
>     > those could use a more efficient and custom implementation of
>     > `ResultFuture.complete(...)`.
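
A minimal sketch of option b) under stated assumptions: a single task thread calls `beforeSubmit()` and any number of async threads call `onComplete()`. The class name `InFlightLimiter` and its method names are hypothetical and do not come from the FLIP. The lock is only touched when the threshold is crossed, so the common path costs one atomic increment or decrement.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: counts in-flight requests and blocks above a threshold. */
final class InFlightLimiter {
    private final long maxInFlight;
    private final AtomicLong inFlight = new AtomicLong();
    private final Object lock = new Object();

    InFlightLimiter(long maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    /** Task thread: register one request, waiting while the threshold is exceeded. */
    void beforeSubmit() throws InterruptedException {
        if (inFlight.incrementAndGet() <= maxInFlight) {
            return; // hot path: no lock taken
        }
        synchronized (lock) {
            // Loop guards against spurious wakeups.
            while (inFlight.get() > maxInFlight) {
                lock.wait();
            }
        }
    }

    /** Async thread: mark one request as finished and wake the task thread if needed. */
    void onComplete() {
        // With a single task thread the count never exceeds maxInFlight + 1,
        // so a waiter can only exist when the count drops back to maxInFlight.
        if (inFlight.decrementAndGet() == maxInFlight) {
            synchronized (lock) {
                lock.notifyAll();
            }
        }
    }

    long inFlight() {
        return inFlight.get();
    }
}
```

The counter is bumped before waiting, so the blocked request is already counted. Once the task thread resumes, at most `maxInFlight` requests are actually outstanding.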
>     >
>     >
>     >     Whether we use a), b) or c) I think should be an implementation
>     > detail. But to allow this to truly be an implementation detail, we
> would
>     > need to agree on 1. Nevertheless I think that the change I proposed
> in 1.
>     > is small enough that I think there is no need to cancel the current
> vote on
>     > the FLIP.
>     >
>     >     WDYT?
>     >
>     >     Piotrek
>     >
>     >
>     >     On Tue, 22 Jun 2021 at 11:42, Till Rohrmann <trohrmann@apache.org> wrote:
>     >     Adding the `InterruptedException` to the write method would make it
>     > explicit that the write call can block but must react to
> interruptions
>     > (e.g. when Flink wants to cancel the operation). I think this makes
> the
>     > contract a bit clearer.
>     >
>     >     I think starting simple and then extending the API as we see the
> need
>     > is a good idea.
>     >
>     >     Cheers,
>     >     Till
>     >
>     >     On Tue, Jun 22, 2021 at 11:20 AM Hausmann, Steffen <shausma@amazon.de> wrote:
>     >     Hey,
>     >
>     >     Agreed on starting with a blocking `write`. I've adapted the FLIP
>     > accordingly.
>     >
>     >     For now I've chosen to add the `InterruptedException` to the
> `write`
>     > method signature as I don't fully understand the implications of
>     > swallowing the exception. Depending on the details of the code that
> is
>     > calling the write method, it may cause event loss. But this seems
> more of
>     > an implementation detail, that we can revisit once we are actually
>     > implementing the sink.
>     >
>     >     Unless there are additional comments, does it make sense to
> start the
>     > voting process in the next day or two?
>     >
>     >     Cheers, Steffen
>     >
>     >
>     >     On 21.06.21, 14:51, "Piotr Nowojski" <pnowojski@apache.org> wrote:
>     >
>     >
>     >
>     >
>     >         Hi,
>     >
>     >         Thanks Steffen for the explanations. I think it makes sense
> to me.
>     >
>     >         Re Arvid/Steffen:
>     >
>     >         - Keep in mind that even if we choose to provide a non
> blocking
>     > API using
>     >         the `isAvailable()`/`getAvailableFuture()` method, we would
> still
>     > need to
>     >         support blocking inside the sinks. For example at the very
> least,
>     > emitting
>     >         many records at once (`flatMap`) or firing timers are
> scenarios
>     > when output
>     >         availability would be ignored at the moment by the runtime.
> Also I
>     > would
>     >         imagine writing very large (like 1GB) records would be
> blocking on
>     >         something as well.
>     >         - Secondly, exposing availability to the API level might not
> be
>     > that
>     >         easy/trivial. The availability pattern as defined in
>     > `AvailabilityProvider`
>     >         class is quite complicated and not that easy to implement by
> a
>     > user.
>     >
>     >         Both of those combined with lack of a clear motivation for
> adding
>     >         `AvailabilityProvider` to the sinks/operators/functions,  I
> would
>     > vote on
>     >         just starting with blocking `write` calls. This can always be
>     > extended in
>     >         the future with availability if needed/motivated properly.
>     >
>     >         That would be aligned with either Arvid's option 1 or 2. I
> don't
>     > know what
>     >         are the best practices with `InterruptedException`, but I'm
> always
>     > afraid
>     >         of it, so I would feel personally safer with option 2.
>     >
>     >         I'm not sure what problem option 3 is helping to solve?
> Adding
>     > `wakeUp()`
>     >         would sound strange to me.
>     >
>     >         Best,
>     >         Piotrek
>     >
>     >         On Mon, 21 Jun 2021 at 12:15, Arvid Heise <arvid@apache.org> wrote:
>     >
>     >         > Hi Piotr,
>     >         >
>     >         > to pick up this discussion thread again:
>     >         > - This FLIP is about providing some base implementation for
>     > FLIP-143 sinks
>     >         > that make adding new implementations easier, similar to the
>     >         > SourceReaderBase.
>     >         > - The whole availability topic will most likely be a
> separate
>     > FLIP. The
>     >         > basic issue just popped up here because we currently have
> no way
>     > to signal
>     >         > backpressure in sinks except by blocking `write`. This
> feels
>     > quite natural
>     >         > in sinks with sync communication but quite unnatural in
> async
>     > sinks.
>     >         >
>     >         > Now we have a couple of options. In all cases, we would
> have
>     > some WIP
>     >         > limit on the number of records/requests being able to be
>     > processed in
>     >         > parallel asynchronously (similar to asyncIO).
>     >         > 1. We use some blocking queue in `write`, then we need to
> handle
>     >         > interruptions. In the easiest case, we extend `write` to
> throw
>     > the
>     >         > `InterruptedException`, which is a small API change.
>     >         > 2. We use a blocking queue, but handle interrupts and
>     > swallow/translate
>     >         > them. No API change.
>     >         > Both solutions block the task thread, so any RPC message /
>     > unaligned
>     >         > checkpoint would be processed only after the backpressure
> is
>     > temporarily
>     >         > lifted. That's similar to the discussions that you linked.
>     > Cancellation may
>     >         > also be a tad harder on 2.
>     >         > 3. We could also add some `wakeUp` to the `SinkWriter`
> similar to
>     >         > `SplitFetcher` [1]. Basically, you use a normal queue with
> a
>     > completable
>     >         > future on which you block. Wakeup would be a clean way to
>     > complete it next
>     >         > to the natural completion through finished requests.
>     >         > 4. We add availability to the sink. However, this API
> change
>     > also requires
>     >         > that we allow operators to be available so it may be a
> bigger
>     > change with
>     >         > undesired side-effects. On the other hand, we could also
> use the
>     > same
>     >         > mechanism for asyncIO.
>     >         >
>     >         > For users of FLIP-171, none of the options are exposed. So
> we
>     > could also
>     >         > start with a simple solution (add `InterruptedException`)
> and
>     > later try to
>     >         > add availability. Option 1+2 would also not require an
>     > additional FLIP; we
>     >         > could add it as part of this FLIP.
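
A minimal sketch contrasting options 1 and 2, assuming a bounded `ArrayBlockingQueue` as the WIP limit. `BoundedSinkBuffer` and both method names are hypothetical. Translating to `java.io.InterruptedIOException` is only one possible choice for option 2, not something the thread settles on.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical bounded buffer whose write blocks once the capacity is reached. */
final class BoundedSinkBuffer<T> {
    private final BlockingQueue<T> pending;

    BoundedSinkBuffer(int capacity) {
        this.pending = new ArrayBlockingQueue<>(capacity);
    }

    /** Option 1: the interruption is part of the method signature. */
    void writeDeclaringInterrupt(T element) throws InterruptedException {
        pending.put(element); // blocks while the buffer is full
    }

    /** Option 2: no signature change; the interrupt is translated. */
    void writeTranslatingInterrupt(T element) throws IOException {
        try {
            pending.put(element);
        } catch (InterruptedException e) {
            // Restore the flag so callers can still observe the cancellation.
            Thread.currentThread().interrupt();
            InterruptedIOException translated =
                    new InterruptedIOException("interrupted while waiting for buffer space");
            translated.initCause(e);
            throw translated;
        }
    }

    /** Removes and returns the oldest buffered element, or null if empty. */
    T poll() {
        return pending.poll();
    }

    int size() {
        return pending.size();
    }
}
```

In both variants the element that could not be enqueued is not in the buffer, so the caller has to treat the write as failed rather than silently continue.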
>     >         >
>     >         > Best,
>     >         >
>     >         > Arvid
>     >         >
>     >         > [1]
>     >         >
>     >
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java#L258-L258
>     >         > On Thu, Jun 10, 2021 at 10:09 AM Hausmann, Steffen
>     >         > <sh...@amazon.de.invalid> wrote:
>     >         >
>     >         >> Hey Piotrek,
>     >         >>
>     >         >> Thanks for your comments on the FLIP. I'll address your
> second
>     > question
>     >         >> first, as I think it's more central to this FLIP. Just
> looking
>     > at the AWS
>     >         >> ecosystem, there are several sinks with overlapping
>     > functionality. I've
>     >         >> chosen AWS sinks here because I'm most familiar with
> those, but
>     > a similar
>     >         >> argument applies more generically for destinations that
> support
>     > async ingest.
>     >         >>
>     >         >> There is, for instance, a sink for Amazon Kinesis Data
> Streams
>     > that is
>     >         >> part of Apache Flink [1], a sink for Amazon Kinesis Data
>     > Firehose [2], a
>     >         >> sink for Amazon DynamoDB [3], and a sink for Amazon
> Timestream
>     > [4]. All
>     >         >> these sinks have implemented their own mechanisms for
> batching,
>     > persisting,
>     >         >> and retrying events. And I'm not sure if all of them
> properly
>     > participate
>     >         >> in checkpointing. [3] even seems to closely mirror [1] as
> it
>     > contains
>     >         >> references to the Kinesis Producer Library, which is
> unrelated
>     > to Amazon
>     >         >> DynamoDB.
>     >         >>
>     >         >> These sinks predate FLIP-143. But as batching,
> persisting, and
>     > retrying
>     >         >> capabilities do not seem to be part of FLIP-143, I'd
> argue that
>     > we would
>     >         >> end up with similar duplication, even if these sinks were
>     > rewritten today
>     >         >> based on FLIP-143. And that's the idea of FLIP-171:
> abstract
>     > away these
>     >         >> commonly required capabilities so that it becomes easy to
>     > create support
>     >         >> for a wide range of destinations without having to think
> about
>     > batching,
>     >         >> retries, checkpointing, etc. I've included an example in
> the
>     > FLIP [5] that
>     >         >> shows that it only takes a couple of lines of code to
> implement
>     > a sink with
>     >         >> exactly-once semantics. To be fair, the example is lacking
>     > robust failure
>     >         >> handling and some more advanced capabilities of [1], but I
>     > think it still
>     >         >> supports this point.
>     >         >>
>     >         >> Regarding your point on the isAvailable pattern. We need
> some
>     > way for the
>     >         >> sink to propagate backpressure and we would also like to
>     > support time based
>     >         >> buffering hints. There are two options I currently see and
>     > would need
>     >         >> additional input on which one is the better or more
> desirable
>     > one. The
>     >         >> first option is to use the non-blocking isAvailable
> pattern.
>     > Internally,
>     >         >> the sink persists buffered events in the snapshot state
> which
>     > avoids having
>     >         >> to flush buffered records on a checkpoint. This seems to
> align
>     > well with the
>     >         >> non-blocking isAvailable pattern. The second option is to
> make
>     > calls to
>     >         >> `write` blocking and leverage an internal thread to
> trigger
>     > flushes based
>     >         >> on time based buffering hints. We've discussed these
> options
>     > with Arvid and
>     >         >> the suggestion was to assume that the `isAvailable` pattern will
> become
>     > available
>     >         >> for sinks through an additional FLIP.
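
A minimal sketch of the second option (blocking `write` plus an internal thread that flushes on a time-based hint). `TimedBufferingWriter`, `maxBatchSize`, `maxBufferMillis` and the `sender` callback are hypothetical and not part of the FLIP. For simplicity the sketch sends synchronously while holding the lock, so `write` blocks for as long as a flush is in progress.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical writer: flushes when a batch is full or a time limit elapses. */
final class TimedBufferingWriter<T> implements AutoCloseable {
    private final int maxBatchSize;
    private final Consumer<List<T>> sender; // stands in for the destination client
    private final List<T> buffer = new ArrayList<>();
    private final ScheduledExecutorService timer;

    TimedBufferingWriter(int maxBatchSize, long maxBufferMillis, Consumer<List<T>> sender) {
        this.maxBatchSize = maxBatchSize;
        this.sender = sender;
        this.timer = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "flush-timer");
            t.setDaemon(true); // do not keep the JVM alive for the timer
            return t;
        });
        // Note: if sender throws, the executor cancels later runs of this task.
        this.timer.scheduleWithFixedDelay(
                this::flush, maxBufferMillis, maxBufferMillis, TimeUnit.MILLISECONDS);
    }

    /** Buffers one element; blocks while a flush holds the lock. */
    synchronized void write(T element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush(); // size-based trigger
        }
    }

    /** Sends whatever is buffered; called by write, the timer thread, and close. */
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        sender.accept(batch);
    }

    @Override
    public void close() {
        timer.shutdownNow();
        flush(); // send the remainder
    }
}
```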
>     >         >>
>     >         >> I think it is an important discussion to have. My
> understanding
>     > of the
>     >         >> implications for Flink in general is very naïve, so I'd
> be
>     > happy to get
>     >         >> further guidance. However, I don't want to make this
> discussion
>     > part of
>     >         >> FLIP-171. For FLIP-171 we'll use whatever is available.
>     >         >>
>     >         >> Does that make sense?
>     >         >>
>     >         >> Cheers, Steffen
>     >         >>
>     >         >>
>     >         >> [1]
>     >         >>
>     >
> https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kinesis
>     >         >> [2]
>     > https://github.com/aws/aws-kinesisanalytics-flink-connectors
>     >         >> [3]
>     > https://github.com/klarna-incubator/flink-connector-dynamodb
>     >         >> [4] https://github.com/awslabs/amazon-timestream-tools/
>     >         >> [5]
>     >         >>
>     >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink#FLIP171:AsyncSink-SimplifiedAsyncSinkWriterforKinesisDataStreams
>     >         >>
>     >         >>
>     >         >> On 09.06.21, 19:44, "Piotr Nowojski" <pnowojski@apache.org> wrote:
>     >         >>
>     >         >>
>     >         >>
>     >         >>
>     >         >>     Hi Steffen,
>     >         >>
>     >         >>     Thanks for writing down the proposal. Back when the
> new
>     > Sink API was
>     >         >> being
>     >         >>     discussed, I was proposing to add our usual
>     > `CompletableFuture<Void>
>     >         >>     isAvailable()` pattern to make sinks non-blocking.
> You can
>     > see the
>     >         >>     discussion starting here [1], and continuing for a
> couple
>     > of more
>     >         >> posts
>     >         >>     until here [2]. Back then, the outcome was that it
> would
>     > give very
>     >         >> little
>     >         >>     benefit, at the expense of making the API more
> complicated.
>     > Could you
>     >         >> maybe
>     >         >>     relate your proposal to that discussion from last
> year?
>     >         >>
>     >         >>     I see that your proposal is going much further than
> just
>     > adding the
>     >         >>     availability method, could you also motivate this a
> bit
>     > further?
>     >         >> Could you
>     >         >>     maybe reference/show some sinks that:
>     >         >>     1. are already implemented using FLIP-143
>     >         >>     2. that have some code duplication...
>     >         >>     3. ...this duplication would be solved by FLIP-171
>     >         >>
>     >         >>     Best,
>     >         >>     Piotrek
>     >         >>
>     >         >>     [1]
>     >         >>
>     >         >>
>     >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44872.html
>     >         >>     [2]
>     >         >>
>     >         >>
>     >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44930.html
>     >         >>
>     >         >>     On Wed, 9 Jun 2021 at 09:49, Hausmann, Steffen <sh...@amazon.de.invalid> wrote:
>     >         >>
>     >         >>     > Hi there,
>     >         >>     >
>     >         >>     > We would like to start a discussion thread on
> "FLIP-171:
>     > Async
>     >         >> Sink" [1],
>     >         >>     > where we propose to create a common abstraction for
>     > destinations
>     >         >> that
>     >         >>     > support async requests. This abstraction will make
> it
>     > easier to add
>     >         >>     > destinations to Flink by implementing a lightweight
> shim,
>     > while it
>     >         >> avoids
>     >         >>     > maintaining dozens of independent sinks.
>     >         >>     >
>     >         >>     > Looking forward to your feedback.
>     >         >>     >
>     >         >>     > Cheers, Steffen
>     >         >>     >
>     >         >>     > [1]
>     >         >>     >
>     >         >>
>     >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
>     >         >>     >
>     >         >>     >
>     >         >>     >
>     >         >>     > Amazon Web Services EMEA SARL
>     >         >>     > 38 avenue John F. Kennedy, L-1855 Luxembourg
>     >         >>     > Registered office: L-1855 Luxembourg
>     >         >>     > Registered in the Luxembourg Trade Register under R.C.S. B186284
>     >         >>     >
>     >         >>     > Amazon Web Services EMEA SARL, German branch
>     >         >>     > Marcel-Breuer-Str. 12, D-80807 Muenchen
>     >         >>     > Registered office of the branch: Muenchen
>     >         >>     > Registered in the commercial register of the Munich District Court under HRB 242240,
>     >         >>     > VAT ID DE317013094
>     >         >>     >
>     >         >>     >
>     >         >>     >
>     >         >>     >