Posted to issues@beam.apache.org by "Ning (Jira)" <ji...@apache.org> on 2022/01/10 23:19:00 UTC

[jira] [Created] (BEAM-13627) aws sqs I/O misses to delete some acked messages, output mutation exception

Ning created BEAM-13627:
---------------------------

             Summary: aws sqs I/O misses to delete some acked messages, output mutation exception
                 Key: BEAM-13627
                 URL: https://issues.apache.org/jira/browse/BEAM-13627
             Project: Beam
          Issue Type: Bug
          Components: io-java-aws
    Affects Versions: 2.34.0
            Reporter: Ning


Original problem and analysis: https://stackoverflow.com/questions/70648489/apache-beam-2-34-0-sqsio-illegal-mutation-exception

The SQS I/O is considerably more complex in Beam 2.34.0 than in 2.31.0. In Beam 2.34.0, the [deleteBatch](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L584) logic **filters** the messages **to delete** based on the **inflight** state. However, the [extend](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L756) logic modifies the inflight state to **exclude** messages that are **assumed to have expired or to be about to expire**. These messages are **never** explicitly deleted from SQS.

So in a future pull, SQS could resend these messages, which are then read from [messagesNotYetRead](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L509). Meanwhile, [safeToDeleteIds](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L526) contains the excluded message ids, but those messages never get deleted.

This is then detected as an output mutation exception.
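
The interaction described above can be modeled with a minimal sketch. It is not the Beam code: the class `InflightDeleteSketch` and its methods `receive`, `dropAsExpired`, and `deleteBatch` are hypothetical names, and the in-memory map only stands in for the reader's inflight state. Under those assumptions, it shows how an id that was acked but already dropped from the inflight state is filtered out of the delete and so stays in the queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified, hypothetical model of the reader state; not the Beam implementation. */
final class InflightDeleteSketch {
  // Message id -> receipt handle for messages the reader still tracks as in flight.
  private final Map<String, String> inFlight = new HashMap<>();
  // Ids of messages that were actually deleted from the (simulated) queue.
  private final Set<String> deletedFromQueue = new HashSet<>();

  void receive(String messageId, String receiptHandle) {
    inFlight.put(messageId, receiptHandle);
  }

  /** Models the extend step: a message assumed expired is dropped from the in-flight state. */
  void dropAsExpired(String messageId) {
    inFlight.remove(messageId);
  }

  /** Models the delete step: only ids still in flight are deleted. Returns the skipped ids. */
  List<String> deleteBatch(List<String> safeToDeleteIds) {
    List<String> skipped = new ArrayList<>();
    for (String id : safeToDeleteIds) {
      if (inFlight.remove(id) != null) {
        deletedFromQueue.add(id);
      } else {
        skipped.add(id); // acked, but never deleted from the queue
      }
    }
    return skipped;
  }

  boolean isDeleted(String messageId) {
    return deletedFromQueue.contains(messageId);
  }

  public static void main(String[] args) {
    InflightDeleteSketch reader = new InflightDeleteSketch();
    reader.receive("m1", "handle-a");
    reader.receive("m2", "handle-b");
    reader.dropAsExpired("m2"); // assumed expired before the checkpoint finalizes
    List<String> skipped = reader.deleteBatch(Arrays.asList("m1", "m2"));
    System.out.println("skipped: " + skipped); // prints "skipped: [m2]"
  }
}
```

Here `m2` is acked but skipped by the delete, so the queue is free to deliver it again once its visibility timeout passes.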

**TL;DR: debugging process**

The mutation was detected in the SqsUnboundedSource, not caused by any other code in the pipeline.

The code that reports the warning and throws the exception is [here](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MutationDetectors.java#L145).
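
For context, here is a rough sketch of how snapshot-based mutation detection can work, assuming the detector keeps an encoded copy of the value at output time and compares it with a fresh encoding later. `SnapshotMutationSketch`, `Msg`, and `encode` are hypothetical stand-ins, not Beam's `MutationDetectors` or a real coder.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical snapshot-and-compare detector; a simplified model, not Beam's code. */
final class SnapshotMutationSketch {
  /** Hypothetical mutable message type with just two fields. */
  static final class Msg {
    String body;
    String receiptHandle;

    Msg(String body, String receiptHandle) {
      this.body = body;
      this.receiptHandle = receiptHandle;
    }
  }

  /** Stand-in for a coder: serializes every field that takes part in the comparison. */
  static byte[] encode(Msg m) {
    return (m.body + "\n" + m.receiptHandle).getBytes(StandardCharsets.UTF_8);
  }

  private final Msg value;
  private final byte[] encodedAtOutput;

  SnapshotMutationSketch(Msg value) {
    this.value = value;
    this.encodedAtOutput = encode(value); // snapshot taken when the value is output
  }

  /** True if the value still encodes to the same bytes as when it was output. */
  boolean isUnmodified() {
    return Arrays.equals(encodedAtOutput, encode(value));
  }
}
```

With a detector of this shape, a change to any encoded field after output is reported as a mutation, even if the body is untouched.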

The only field that changed is the receipt handle. It's documented [here](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-message-identifiers.html) that:

> If you receive a message more than once, each time you receive it, you get a different receipt handle. You must provide the most recently received receipt handle when you request to delete the message (otherwise, the message might not be deleted).
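
One way to follow that guidance is to remember only the latest receipt handle per message id. The sketch below assumes a hypothetical `LatestReceiptHandleSketch` helper; it is not part of Beam or the AWS SDK and makes no SQS calls.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that tracks the most recent receipt handle per message id. */
final class LatestReceiptHandleSketch {
  private final Map<String, String> latestHandleById = new HashMap<>();

  /** Call on every receive; a redelivery overwrites the older handle. */
  void onReceive(String messageId, String receiptHandle) {
    latestHandleById.put(messageId, receiptHandle);
  }

  /** Returns the handle to use for a delete request, or null if the id is unknown. */
  String handleForDelete(String messageId) {
    return latestHandleById.get(messageId);
  }
}
```

If the same message id is received twice, `handleForDelete` returns the handle from the second receive, which is the one the delete request should carry.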

There is no [aws_java_sdk_version](https://github.com/apache/beam/blob/release-2.34.0/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L447) change between Beam 2.31.0 and Beam 2.34.0, so the AWS SDK shouldn't be the culprit.

There is a significant change between Beam 2.31.0 and Beam 2.34.0 for [SqsUnboundedReader](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java).

For a message to be received more than once, it must not have been deleted since it was first received. The deletion logic is invoked in [SqsCheckpointMark](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsCheckpointMark.java).
