Posted to issues@beam.apache.org by "Alexey Romanenko (Jira)" <ji...@apache.org> on 2021/12/24 12:37:00 UTC

[jira] [Commented] (BEAM-13203) Potential data loss when using SnsIO.writeAsync

    [ https://issues.apache.org/jira/browse/BEAM-13203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17465007#comment-17465007 ] 

Alexey Romanenko commented on BEAM-13203:
-----------------------------------------

I'm sorry for "ignoring" this Jira; I was away for a while and it got lost somewhere in my inbox.

[~mosche] Could you elaborate on what you mean by "_SnsIO.writeAsync is conceptually so broken_", beyond the potential data loss if the pipeline fails or stops unexpectedly, and on what should be done to fix or improve it?

> Potential data loss when using SnsIO.writeAsync
> -----------------------------------------------
>
>                 Key: BEAM-13203
>                 URL: https://issues.apache.org/jira/browse/BEAM-13203
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-aws
>            Reporter: Moritz Mack
>            Priority: P1
>
> This needs to be investigated. Reading the code suggests we might lose data under certain conditions, e.g. when the pipeline is terminated. The async processing model here is far too simplistic.
> The bundle never knows about pending writes and never blocks to wait for them. Likewise, exceptions are thrown into nowhere. The test cases don't capture this because they operate only on already-completed futures (so exceptions in the callbacks are thrown on the processElement thread).
> {code:java}
> client.publish(publishRequest).whenComplete((response, ex) -> {
>   if (ex == null) {
>     SnsResponse<T> snsResponse = SnsResponse.of(context.element(), response);
>     context.output(snsResponse);
>   } else {
>     LOG.error("Error while publishing request to SNS", ex);
>     throw new SnsWriteException("Error while publishing request to SNS", ex);
>   }
> }); {code}
> This also removes backpressure from the stream entirely. With a much faster source, memory usage keeps growing, because the number of concurrent pending async operations is not limited.
> Spotify's Scio contains a [JavaAsyncDoFn|https://github.com/spotify/scio/blob/main/scio-core/src/main/java/com/spotify/scio/transforms/JavaAsyncDoFn.java] that illustrates how this can be done.

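The point in the issue description that exceptions are "thrown into nowhere" can be reproduced with the JDK alone. This is a minimal sketch, not SnsIO code: a plain CompletableFuture stands in for the SNS client's publish call (an assumption; no AWS SDK or Beam classes are involved), and SwallowedCallbackException and publishLikeSnsIO are hypothetical names. When the request fails after processElement has already returned, the exception thrown inside the whenComplete callback reaches no caller. CompletableFuture only records the failure in the stage returned by whenComplete, which the quoted snippet discards.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// JDK-only sketch; no Beam or AWS SDK classes are used.
public class SwallowedCallbackException {

    // Same shape as the quoted callback: rethrow on failure. The quoted code discards
    // the stage returned by whenComplete; it is returned here only so it can be inspected.
    public static CompletableFuture<String> publishLikeSnsIO(CompletableFuture<String> publish) {
        return publish.whenComplete((response, ex) -> {
            if (ex != null) {
                // Stands in for `throw new SnsWriteException(...)` in the quoted code.
                throw new RuntimeException("Error while publishing request to SNS", ex);
            }
        });
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for client.publish(...): still pending when the callback is attached.
        CompletableFuture<String> publish = new CompletableFuture<>();
        CompletableFuture<String> callbackStage = publishLikeSnsIO(publish);
        System.out.println("processElement returned normally");

        // Later, another thread (think: the SDK's I/O thread) fails the request.
        // completeExceptionally runs the callback on that thread and does not throw.
        CompletableFuture.runAsync(
                () -> publish.completeExceptionally(new IllegalStateException("SNS publish failed")))
            .join();

        // The failure was recorded only in the stage that the quoted code never looks at.
        System.out.println("callback stage failed: " + callbackStage.isCompletedExceptionally());
        try {
            callbackStage.join();
        } catch (CompletionException e) {
            System.out.println("recorded cause: " + e.getCause());
        }
    }
}
```

Rethrowing inside the callback therefore cannot fail the bundle; code running on the bundle thread has to keep the returned futures and inspect them.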

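One possible direction for a fix, in the spirit of the JavaAsyncDoFn linked above but not a copy of it, is sketched below using only the JDK. Everything here is hypothetical: BoundedAsyncWriter, process and finish are illustrative names standing in for a DoFn's @ProcessElement and @FinishBundle methods, and asyncCall stands in for the SNS client's publish call. A Semaphore caps the number of in-flight requests, which restores backpressure. The callbacks only record outcomes, while results are emitted and failures rethrown on the bundle thread. Finally, finish waits for every outstanding request, so a bundle cannot complete while writes are still pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, JDK-only sketch: these names are illustrative, not Beam, Scio or AWS SDK API.
// process()/finish() stand in for @ProcessElement/@FinishBundle and run on the bundle thread.
public class BoundedAsyncWriter<InT, OutT> {
    private final Function<InT, CompletableFuture<OutT>> asyncCall; // e.g. wraps client.publish
    private final Semaphore permits;                                 // caps in-flight requests
    private final List<CompletableFuture<?>> pending = new ArrayList<>(); // bundle thread only
    private final ConcurrentLinkedQueue<OutT> completed = new ConcurrentLinkedQueue<>();
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    public BoundedAsyncWriter(Function<InT, CompletableFuture<OutT>> asyncCall, int maxInFlight) {
        this.asyncCall = asyncCall;
        this.permits = new Semaphore(maxInFlight);
    }

    // Blocks once maxInFlight requests are pending, which slows down a faster source.
    public void process(InT element, Consumer<OutT> emit) {
        flush(emit); // emit finished results, surface earlier failures on this thread
        permits.acquireUninterruptibly();
        CompletableFuture<OutT> call;
        try {
            call = asyncCall.apply(element);
        } catch (RuntimeException e) {
            permits.release(); // synchronous failure: free the slot and fail right away
            throw e;
        }
        // The callback only records the outcome; it never emits output or throws.
        pending.add(call.whenComplete((out, ex) -> {
            try {
                if (ex != null) {
                    failure.compareAndSet(null, ex);
                } else if (out != null) { // ConcurrentLinkedQueue rejects nulls
                    completed.add(out);
                }
            } finally {
                permits.release();
            }
        }));
        // Done stages have already recorded their outcome, so they can be dropped.
        pending.removeIf(CompletableFuture::isDone);
    }

    // Waits for every outstanding request, so the bundle cannot finish with writes in flight.
    public void finish(Consumer<OutT> emit) {
        for (CompletableFuture<?> f : pending) {
            f.handle((r, ex) -> null).join(); // wait without throwing; errors are in `failure`
        }
        pending.clear();
        flush(emit);
    }

    private void flush(Consumer<OutT> emit) {
        Throwable t = failure.get();
        if (t != null) {
            // Never reset: the sketch assumes a failed instance is discarded, not reused.
            throw new RuntimeException("Async write failed; failing the bundle", t);
        }
        OutT out;
        while ((out = completed.poll()) != null) {
            emit.accept(out);
        }
    }
}
```

Results are emitted in completion order rather than input order. Blocking in process is a deliberate trade-off: it gives up some throughput so that memory use stays bounded by maxInFlight instead of growing with the speed of the source.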

--
This message was sent by Atlassian Jira
(v8.20.1#820001)