Posted to issues@beam.apache.org by "Moritz Mack (Jira)" <ji...@apache.org> on 2022/01/11 10:29:00 UTC

[jira] [Commented] (BEAM-13627) aws sqs I/O misses to drop expired messages, transform output mutation exception

    [ https://issues.apache.org/jira/browse/BEAM-13627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17472638#comment-17472638 ] 

Moritz Mack commented on BEAM-13627:
------------------------------------

[~ningk] Advancing to the next message is (and must be) separate from acking (i.e., deleting) a message. In the meantime, a receipt handle might expire no matter how hard we try to avoid it. Duplicate messages (at-least-once delivery) are fairly common in such distributed systems and are certainly not unexpected.
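
To illustrate the separation between advancing and acking, here is a minimal sketch. It is not the actual SqsUnboundedReader; the class `SketchReader` and all of its members are hypothetical names made up for this example, and the "delete" is only recorded in a list instead of calling SQS.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical, simplified reader; not the actual SqsUnboundedReader. */
class SketchReader {
  /** A received message: the id is stable, the receipt handle changes per delivery. */
  static final class Received {
    final String messageId;
    final String receiptHandle;

    Received(String messageId, String receiptHandle) {
      this.messageId = messageId;
      this.receiptHandle = receiptHandle;
    }
  }

  // Messages pulled from the queue but not yet handed downstream.
  private final Queue<Received> buffered = new ArrayDeque<>();
  // Messages handed downstream but not yet acked, keyed by message id.
  private final Map<String, Received> inFlight = new LinkedHashMap<>();
  // Stands in for delete requests sent to SQS (assumption: no real client here).
  final List<String> deletedHandles = new ArrayList<>();

  void onReceive(String messageId, String receiptHandle) {
    buffered.add(new Received(messageId, receiptHandle));
  }

  /** Advancing only hands out the next message; it never deletes anything. */
  Received advance() {
    Received next = buffered.poll();
    if (next != null) {
      // A redelivery replaces the older entry, so the most recent handle is kept.
      inFlight.put(next.messageId, next);
    }
    return next;
  }

  /** Acking happens later and separately, once a checkpoint is finalized. */
  void finalizeCheckpoint() {
    for (Received r : inFlight.values()) {
      deletedHandles.add(r.receiptHandle);
    }
    inFlight.clear();
  }

  int inFlightCount() {
    return inFlight.size();
  }
}
```

In this sketch a duplicate delivery of the same message simply shows up twice downstream (at-least-once), while only one delete is issued, using the most recently received handle.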

That said, I agree that the notion of messages that are assumed to be expired should be revisited. I don't know why it was implemented that way historically. It might simply be because extending or deleting a message using an expired receipt handle fails. However, such messages should simply be ignored on retries; see this PR: [https://github.com/apache/beam/pull/16478]. With that in place, receipt handles can be used until they are known to be expired (rather than expiring them prematurely).
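
As a rough sketch of the idea (not taken from the PR; `ExpiredHandleSketch`, `QueueClient`, and `Outcome` are hypothetical names, and the interface is a stand-in rather than the AWS SDK), a delete could keep using every handle and only drop those the service reports as expired:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class ExpiredHandleSketch {
  /** Hypothetical per-handle result of a batch delete. */
  enum Outcome { OK, HANDLE_EXPIRED, OTHER_FAILURE }

  /** Hypothetical stand-in for a batch delete call; returns one outcome per receipt handle. */
  interface QueueClient {
    Map<String, Outcome> deleteBatch(List<String> receiptHandles);
  }

  /**
   * Attempts to delete all handles and returns those that should be retried.
   * Handles reported as expired are ignored: the message will be redelivered
   * with a new handle anyway, so retrying the old one cannot succeed.
   */
  static List<String> deleteIgnoringExpired(QueueClient client, List<String> receiptHandles) {
    List<String> retry = new ArrayList<>();
    for (Map.Entry<String, Outcome> entry : client.deleteBatch(receiptHandles).entrySet()) {
      if (entry.getValue() == Outcome.OTHER_FAILURE) {
        retry.add(entry.getKey());
      }
    }
    return retry;
  }
}
```

The point of the sketch is that expiry is only acted on once it is actually known (reported by the service), instead of being guessed in advance from timestamps.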

> aws sqs I/O misses to drop expired messages, transform output mutation exception
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-13627
>                 URL: https://issues.apache.org/jira/browse/BEAM-13627
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-aws
>    Affects Versions: 2.34.0
>            Reporter: Ning
>            Priority: P2
>
> The I/O is much more complicated in Beam 2.34.0 than 2.31.0. For Beam 2.34.0, the [deleteBatch](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L584) logic **filters** messages **to delete** based on the **inflight** state. However, there are assumptions in the [extend](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L756) logic where the inflight state is modified to **exclude** messages that are **assumed expired or to be expired**. These messages are **not** explicitly requested by the I/O to be deleted from sqs nor dropped by the I/O itself (the I/O could be [processing](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L509) a message that should have been expired to wait for it to be resent).
> Ideally, the I/O should not advance to a message whose receipt handle has expired; it should skip the message and wait for it to be resent.
> Though I'm not sure, I suspect the current (wrong) behavior causes the problem: the I/O advances a message that is not "acked" (not deleted, so it will be resent). If the same message is then pulled again with a new receipt handle within the same bundle, mutation detection would be triggered, because the receipt handle is part of the Message hashcode (unless there is a hash collision in the mutation detector).
> **TL;DR: debugging process**
> The mutation was detected in the SqsUnboundedSource, not caused by any other code in the pipeline.
> The code that reports the warning and throws the exception is [here](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MutationDetectors.java#L145).
> The only field changed is the Receipt handle. It's documented [here](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-message-identifiers.html) that:
> > If you receive a message more than once, each time you receive it, you get a different receipt handle. You must provide the most recently received receipt handle when you request to delete the message (otherwise, the message might not be deleted).
> There is no [aws_java_sdk_version](https://github.com/apache/beam/blob/release-2.34.0/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L447) change between Beam 2.31.0 and Beam 2.34.0, so the AWS SDK shouldn't be the culprit.
> There is a significant change between Beam 2.31.0 and Beam 2.34.0 for [SqsUnboundedReader](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java).
> To receive a message more than once, the message must not have been deleted since it was first received. The deletion logic is invoked in [SqsCheckpointMark](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsCheckpointMark.java).
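
The receipt handle observation in the description above can be illustrated with a small sketch. It assumes, as the description does, that the receipt handle takes part in equality and hashing; `SketchMessage` and `sameMessageAs` are hypothetical names, not the AWS SDK's Message class or Beam's mutation detector.

```java
import java.util.Objects;

/** Hypothetical message type whose equality includes the per-delivery receipt handle. */
final class SketchMessage {
  final String messageId;
  final String body;
  final String receiptHandle;

  SketchMessage(String messageId, String body, String receiptHandle) {
    this.messageId = messageId;
    this.body = body;
    this.receiptHandle = receiptHandle;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SketchMessage)) {
      return false;
    }
    SketchMessage other = (SketchMessage) o;
    return messageId.equals(other.messageId)
        && body.equals(other.body)
        && receiptHandle.equals(other.receiptHandle);
  }

  @Override
  public int hashCode() {
    return Objects.hash(messageId, body, receiptHandle);
  }

  /** Compares only the stable parts, ignoring the receipt handle. */
  boolean sameMessageAs(SketchMessage other) {
    return messageId.equals(other.messageId) && body.equals(other.body);
  }
}
```

Two deliveries of the same message (same id and body, different handle) are unequal under `equals` but match under `sameMessageAs`, which is the kind of difference a value-comparing check could report as a mutation.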



--
This message was sent by Atlassian Jira
(v8.20.1#820001)