Posted to issues@beam.apache.org by "Ning (Jira)" <ji...@apache.org> on 2022/01/11 00:10:00 UTC

[jira] [Updated] (BEAM-13627) aws sqs I/O misses to drop expired messages, output mutation exception

     [ https://issues.apache.org/jira/browse/BEAM-13627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ning updated BEAM-13627:
------------------------
    Summary: aws sqs I/O misses to drop expired messages, output mutation exception  (was: aws sqs I/O misses to delete some acked messages, output mutation exception)

> aws sqs I/O misses to drop expired messages, output mutation exception
> ----------------------------------------------------------------------
>
>                 Key: BEAM-13627
>                 URL: https://issues.apache.org/jira/browse/BEAM-13627
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-aws
>    Affects Versions: 2.34.0
>            Reporter: Ning
>            Priority: P2
>
> The I/O is much more complicated in Beam 2.34.0 than in 2.31.0. In Beam 2.34.0, the [deleteBatch](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L584) logic **filters** the messages **to delete** based on the **inflight** state. However, the [extend](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L756) logic makes assumptions under which the inflight state is modified to **exclude** messages that are **assumed to be expired or about to expire**. These messages are **not** explicitly deleted from SQS by the I/O, nor are they dropped by the I/O itself (the I/O could still be [processing](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L509) a message that should instead have been treated as expired and left for SQS to resend).
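The interaction described above can be modelled in a few lines. This is a minimal sketch under stated assumptions, not the actual `SqsUnboundedReader` code: `InflightModel`, its methods, and the deadline and safety-margin parameters are hypothetical names chosen for illustration. It shows how a message that an `extend`-style step drops from the inflight state is then silently skipped by a `deleteBatch`-style filter, even though it was already emitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified model of the reader's inflight bookkeeping.
// It is NOT the SqsUnboundedReader implementation; names and parameters are illustrative.
class InflightModel {
  // messageId -> visibility deadline (epoch millis) for messages still tracked as inflight
  final Map<String, Long> inflight = new LinkedHashMap<>();

  // Record a freshly received message and its visibility deadline.
  void received(String messageId, long deadlineMillis) {
    inflight.put(messageId, deadlineMillis);
  }

  // Models the "extend" assumption: any message whose deadline has passed, or will pass
  // within the safety margin, is assumed expired and removed from the inflight state.
  // Nothing here deletes it from SQS or stops it from being processed downstream.
  void extend(long nowMillis, long safetyMarginMillis) {
    inflight.values().removeIf(deadline -> deadline <= nowMillis + safetyMarginMillis);
  }

  // Models deleteBatch: only checkpointed ids still present in the inflight state are
  // selected for deletion; everything else is silently skipped.
  List<String> deleteBatch(List<String> checkpointedIds) {
    List<String> toDelete = new ArrayList<>();
    for (String id : checkpointedIds) {
      if (inflight.remove(id) != null) {
        toDelete.add(id);
      }
    }
    return toDelete;
  }
}
```

With two messages `a` (deadline 1000) and `b` (deadline 5000), calling `extend(900, 200)` drops `a`, so `deleteBatch(List.of("a", "b"))` returns only `[b]`. In this model `a` is never deleted, so SQS would make it visible again and redeliver it with a new receipt handle.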
> Filed https://issues.apache.org/jira/browse/BEAM-13627.
> I'm not sure, though, whether pulling the same message again with a new receipt handle within the same bundle would trigger the mutation detection: the receipt handle is part of the Message hash code, so the re-pulled copy should look like a different element unless there is a hash collision in the mutation detector.
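To illustrate the hash-code argument, here is a hypothetical stand-in for the SDK message type (`MessageStandIn` is not a real class) whose `equals`/`hashCode` include the receipt handle, as the paragraph above states for the real `Message`. Under that assumption, a redelivered copy that differs only in its receipt handle is a different element, not a mutation of the first one.

```java
import java.util.Objects;

// Hypothetical stand-in for the SDK's Message type (not a real class). As stated above for
// the real Message, the receipt handle takes part in equals/hashCode.
final class MessageStandIn {
  final String messageId;
  final String receiptHandle;
  final String body;

  MessageStandIn(String messageId, String receiptHandle, String body) {
    this.messageId = messageId;
    this.receiptHandle = receiptHandle;
    this.body = body;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof MessageStandIn)) return false;
    MessageStandIn other = (MessageStandIn) o;
    return messageId.equals(other.messageId)
        && receiptHandle.equals(other.receiptHandle)
        && body.equals(other.body);
  }

  @Override
  public int hashCode() {
    // All three fields feed the hash, so a new receipt handle normally yields a new hash.
    return Objects.hash(messageId, receiptHandle, body);
  }
}
```

Receiving `id-1` twice, first with handle `rh-1` and later with `rh-2`, produces two objects that are not `equals` and, barring a collision, have different hash codes.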
> **TL;DR: debugging process**
> The mutation was detected in SqsUnboundedSource; it was not caused by any other code in the pipeline.
> The code that reports the warning and throws the exception is [here](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MutationDetectors.java#L145).
> The only field that changed is the receipt handle. It's documented [here](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-message-identifiers.html) that:
> > If you receive a message more than once, each time you receive it, you get a different receipt handle. You must provide the most recently received receipt handle when you request to delete the message (otherwise, the message might not be deleted).
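Mutation detection of this kind works by snapshotting an element's encoded form when it is output and re-encoding it later for comparison. The sketch below is a simplified model of that idea, assuming a mutable message object; `MutableMessage`, `SnapshotMutationDetector` and the toy encoding are hypothetical and do not reproduce Beam's `MutationDetectors` code. Changing only the receipt handle of an already-output object in place is enough to trip the check, which matches the observation that the receipt handle was the only changed field.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical mutable message, standing in for a mutable SDK model object.
class MutableMessage {
  final String messageId;
  String receiptHandle;
  final String body;

  MutableMessage(String messageId, String receiptHandle, String body) {
    this.messageId = messageId;
    this.receiptHandle = receiptHandle;
    this.body = body;
  }

  void setReceiptHandle(String receiptHandle) {
    this.receiptHandle = receiptHandle;
  }
}

// Simplified encode-and-compare detector: snapshot the encoded bytes when the element is
// output, then re-encode on verification and fail if the bytes differ.
class SnapshotMutationDetector {
  private final MutableMessage value;
  private final byte[] snapshot;

  SnapshotMutationDetector(MutableMessage value) {
    this.value = value;
    this.snapshot = encode(value);
  }

  // Toy "coder": length-prefixes each field so field boundaries stay unambiguous.
  static byte[] encode(MutableMessage m) {
    StringBuilder sb = new StringBuilder();
    for (String field : new String[] {m.messageId, m.receiptHandle, m.body}) {
      sb.append(field.length()).append(':').append(field);
    }
    return sb.toString().getBytes(StandardCharsets.UTF_8);
  }

  // Throws if the value's current encoding no longer matches the snapshot.
  void verifyUnmodified() {
    if (!Arrays.equals(snapshot, encode(value))) {
      throw new IllegalStateException(
          "Message " + value.messageId + " was mutated after being output");
    }
  }
}
```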
> There is no [aws_java_sdk_version](https://github.com/apache/beam/blob/release-2.34.0/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L447) change between Beam 2.31.0 and Beam 2.34.0, so the AWS SDK shouldn't be the culprit.
> There is a significant change between Beam 2.31.0 and Beam 2.34.0 for [SqsUnboundedReader](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java).
> To be received more than once, a message must not have been deleted since it was first received. The deletion logic is invoked in [SqsCheckpointMark](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsCheckpointMark.java).
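Because SQS requires the most recently received receipt handle for deletion (per the AWS documentation quoted above), any deletion path has to keep that handle current across redeliveries. A minimal sketch, assuming a map keyed by message id; `ReceiptHandleTracker` is a hypothetical helper, not part of `SqsCheckpointMark`, and it deliberately stops short of calling the AWS SDK.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: remember the most recent receipt handle per message id, since SQS
// needs the latest handle to delete a message that was received more than once.
class ReceiptHandleTracker {
  private final Map<String, String> latestHandle = new HashMap<>();

  // Called on every receive; a redelivery overwrites the older handle.
  void onReceive(String messageId, String receiptHandle) {
    latestHandle.put(messageId, receiptHandle);
  }

  // Handle to pass to the delete request when the checkpoint is finalized; empty if unknown.
  Optional<String> handleForDelete(String messageId) {
    return Optional.ofNullable(latestHandle.get(messageId));
  }

  // Forget the message once its delete has been issued.
  void onDeleted(String messageId) {
    latestHandle.remove(messageId);
  }
}
```

After `onReceive("id-1", "rh-1")` followed by a redelivery `onReceive("id-1", "rh-2")`, `handleForDelete("id-1")` yields `rh-2`, the handle the quoted documentation says must be used.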



--
This message was sent by Atlassian Jira
(v8.20.1#820001)