You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Alexey Romanenko (Jira)" <ji...@apache.org> on 2022/01/14 09:23:00 UTC

[jira] [Work started] (BEAM-13631) SQS read IO requires deterministic coder for SQS message to work in batch mode mode.

     [ https://issues.apache.org/jira/browse/BEAM-13631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on BEAM-13631 started by Moritz Mack.
------------------------------------------
> SQS read IO requires deterministic coder for SQS message to work in batch mode mode.
> ------------------------------------------------------------------------------------
>
>                 Key: BEAM-13631
>                 URL: https://issues.apache.org/jira/browse/BEAM-13631
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-aws
>            Reporter: Moritz Mack
>            Assignee: Moritz Mack
>            Priority: P2
>              Labels: SQS, aws-sdk-v1
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently the SQS read IO uses SerializableCoder.of(Message.class), which isn't deterministic. This may cause issues when used in batch mode (based on BoundedReadFromUnboundedSource). The mutation detector will throw in such case:
> {code:java}
> Jan 10, 2022 11:37:05 AM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
> WARNING: Coder of type class org.apache.beam.sdk.coders.SerializableCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element Shard{source=org.apache.beam.sdk.io.aws.sqs.SqsUnboundedSource@5f19451c, maxNumRecords=1, maxReadTime=null}
> Coder of type class org.apache.beam.sdk.coders.SerializableCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element Shard{source=org.apache.beam.sdk.io.aws.sqs.SqsUnboundedSource@5f19451c, maxNumRecords=1, maxReadTime=null}
> Exception in thread "main" org.apache.beam.sdk.util.IllegalMutationException: PTransform SqsIO.Read/Read(SqsUnboundedSource)/Read/ParMultiDo(Read) mutated value ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj2FXnTVQ==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}} after it was output (new value was ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: DeVRF8vQATm1f+rHIvR3eaejlRHksL1R7WE4zDT7lSwdIs9gJCYKXFXnTVQ==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}}). Values must not be mutated in any way after being output.
>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
>     at org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:231)
>     at org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:163)
>     at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
>     at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
>     at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj2KQ==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}} mutated illegally, new value was ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQ==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}}. Encoding was rO.
>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
>     ... 10 more
> Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj=,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}} mutated illegally, new value was ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQE==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}}. Encoding was rO2Mw.
>  {code}
> https://stackoverflow.com/questions/70648489/apache-beam-2-34-0-sqsio-illegal-mutation-exception



--
This message was sent by Atlassian Jira
(v8.20.1#820001)