Posted to user@beam.apache.org by Yuri Jin <yu...@unity3d.com> on 2022/05/06 03:52:34 UTC

IllegalMutationException in PTransform

Hi Beam users,

We have a DoFn that reads data from Kafka and parses a byte-array payload.
It works fine with the Dataflow runner, but throws IllegalMutationException
with the Direct runner. It does not directly modify the input value, so I
am guessing that the output differs when there are multiple input
values.
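The "multiple input values" guess matches a common cause: state shared across elements. If a parser reuses one mutable buffer for every input, an output emitted for the first element silently changes when the second element is parsed, before the Direct runner re-checks the bundle at commit time. The sketch below is a hypothetical, standard-library-only illustration. ReusingParser and CopyingParser are invented names, not Beam, Scio, or rawhttp classes.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Arrays

// Hypothetical parser that reuses one buffer for every call (the bug pattern).
final class ReusingParser {
  private val buffer = new Array[Byte](16)
  def parse(input: Array[Byte]): Array[Byte] = {
    Arrays.fill(buffer, 0.toByte)
    System.arraycopy(input, 0, buffer, 0, math.min(input.length, buffer.length))
    buffer // every caller gets the same array instance
  }
}

// Fixed variant: each call returns a fresh array that no one else holds.
final class CopyingParser {
  def parse(input: Array[Byte]): Array[Byte] = input.clone()
}

val inputs = List("first", "second").map(_.getBytes(UTF_8))

val reusedOutputs = { val p = new ReusingParser; inputs.map(p.parse) }
val copiedOutputs = { val p = new CopyingParser; inputs.map(p.parse) }

// The first "output" now shows the second input's bytes (trim drops the zero padding).
println(new String(reusedOutputs.head, UTF_8).trim) // second
println(new String(copiedOutputs.head, UTF_8))      // first
```

Returning a fresh copy or an immutable value per element, as CopyingParser does, keeps earlier outputs stable no matter how many elements the bundle contains.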

The detailed error is as follows.

Exception in thread "main"
org.apache.beam.sdk.util.IllegalMutationException: PTransform Parse Payload
mutated value "DoFnOutputA" after it was output (new value was
"DoFnOutputB"). Values must not be mutated in any way after being output.
        at
org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
        at
org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:228)
        at
org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:160)
        at
org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
        at
org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
        at
org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value
"DoFnOutputA" mutated illegally, new value was "DoFnOutputB". Encoding was
"Base64EncodedA", now "Base64EncodedB".
        at
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
        at
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
        at
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
        at
org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
        ... 10 more

* "DoFnOutputA" and "DoFnOutputB" are the same, but "Base64EncodedA" is
different from "Base64EncodedB".
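Identical printed values with different encodings fit how the check works. Roughly, the Direct runner encodes each output with its coder when the output is emitted and checks it again when the bundle commits (the ImmutabilityEnforcingBundle.commit frame above). Any change to state that the coder serializes is reported, even if toString() is unchanged. The following is a simplified plain-Scala model of that idea that assumes a byte-level comparison. Encoder and MutationCheck are hypothetical stand-ins, not Beam's MutationDetectors API.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Arrays

// Hypothetical stand-in for a coder: turns a value into bytes.
trait Encoder[T] { def encode(value: T): Array[Byte] }

// Simplified model of a coded-value mutation check: snapshot the encoding
// when the value is output, then re-encode later and compare the bytes.
final class MutationCheck[T](value: T, enc: Encoder[T]) {
  private val snapshot: Array[Byte] = enc.encode(value)
  def unmodified: Boolean = Arrays.equals(snapshot, enc.encode(value))
}

// Output type holding a mutable array, e.g. a header passed through by reference.
final case class Out(header: Array[Byte], index: Int)

val outEncoder: Encoder[Out] = new Encoder[Out] {
  def encode(o: Out): Array[Byte] = o.header ++ BigInt(o.index).toByteArray
}

val header = "h1".getBytes(UTF_8)
val emitted = Out(header, 0)
val check = new MutationCheck(emitted, outEncoder) // snapshot at output time

header(1) = '2'.toByte // the shared array is modified after output

println(check.unmodified) // false: the encoding no longer matches the snapshot
```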

I was wondering if you could give me some advice on the following questions.

1. How can we find the problematic part? I wrote some unit tests, but I
couldn't reproduce the error.
2. Have you experienced the same error and solved it?
3. Only the Direct runner enforces immutability for DoFns. Is it safe to use
the "enforceImmutability=false" option?


Any comments would be appreciated.

Thanks,
Yuri Jin

Re: IllegalMutationException in PTransform

Posted by Yuri Jin <yu...@unity3d.com>.
Hi Reuven.

I overrode equals() and still got the same exception.

After more testing, I suspect the part of inputAdaptor.adapt() that parses
the byte array into an HTTP request.
We are using com.athaydes.rawhttp.rawhttp-core.
https://github.com/renatoathaydes/rawhttp/blob/core-2.0/rawhttp-core/src/main/java/rawhttp/core/RawHttp.java#L130

I'm thinking of implementing it in a different way. Any advice on that?
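One possible reason overriding equals() did not help: by default, a Beam Coder's structuralValue() returns the value itself only if the coder reports consistentWithEquals(). Otherwise it falls back to the encoded bytes, so a custom equals() may never be consulted. If a parsed object fills in a cached or lazily read field after it is output, a serializer that writes every field produces different bytes even though equals() and toString() are unchanged. The sketch below models this with a hypothetical ParsedRequest class and a hand-written encoder. It is not the rawhttp API, and whether rawhttp objects behave this way is an assumption you would need to verify.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.util.Arrays

// Hypothetical parsed object: `raw` is its logical content; `cachedBody`
// is filled in the first time `body` is read, as a lazy parser might do.
final class ParsedRequest(val raw: String) {
  private var cachedBody: Option[String] = None

  def body: String = cachedBody.getOrElse {
    val b = raw.dropWhile(_ != '\n').drop(1) // text after the first line
    cachedBody = Some(b)
    b
  }

  // Exposed so the encoder below can write it, as a field-by-field serializer would.
  def cacheState: Option[String] = cachedBody

  // equals/hashCode/toString consider only the logical content.
  override def equals(other: Any): Boolean = other match {
    case p: ParsedRequest => p.raw == raw
    case _                => false
  }
  override def hashCode(): Int = raw.hashCode
  override def toString: String = s"ParsedRequest($raw)"
}

// Writes every field, including the cache, in a fixed order.
def encode(p: ParsedRequest): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val out = new DataOutputStream(bytes)
  out.writeUTF(p.raw)
  out.writeBoolean(p.cacheState.isDefined)
  p.cacheState.foreach(s => out.writeUTF(s))
  out.flush()
  bytes.toByteArray
}

val req = new ParsedRequest("GET /x HTTP/1.1\nhello")
val twin = new ParsedRequest(req.raw)
val before = encode(req)
val bodyText = req.body // a read after output populates the cache
val after = encode(req)

println(req == twin)                   // true: equals() sees no change
println(Arrays.equals(before, after)) // false: the encoded bytes differ
```

If this is the cause, the fix is to make the output's serialized state depend only on its logical content. For example, you could parse eagerly before output, or copy the needed fields into a plain immutable type. Either approach should let the check pass.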

Thank you,
Yuri Jin

On Fri, May 6, 2022 at 2:49 PM Siyu Lin <si...@unity3d.com> wrote:

> Hi Reuven,
>
> Do you mean we should explicitly define coders for all input and output
> types of chained DoFns? Do we also need to define compareTo and equals?
>
> thanks again!
> Siyu
>
> On May 6, 2022, at 12:23 PM, Reuven Lax <re...@google.com> wrote:
>
>
> Could be - I would check the implementation of inputAdaptor.
>
> On Fri, May 6, 2022 at 11:59 AM Yuri Jin <yu...@unity3d.com> wrote:
>
>> Thanks, I'll check it out.
>>
>> I split inputAdaptor.adapt() into a separate DoFn for testing, and it threw
>> the same exception in the new DoFn. So I guess the problem is in
>> inputAdaptor.adapt().
>>
>> On Fri, May 6, 2022 at 11:45 AM Reuven Lax <re...@google.com> wrote:
>>
>>> I meant to say .equals() not compareTo.
>>>
>>> On Fri, May 6, 2022 at 11:44 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Unfortunately I'm not very familiar with Scio. However this could also
>>>> be caused by an object that either doesn't properly implement the compareTo
>>>> method or the coder doesn't return such an object in structuralValue.
>>>>
>>>> On Fri, May 6, 2022 at 11:26 AM Yuri Jin <yu...@unity3d.com> wrote:
>>>>
>>>>> Reuven, thanks for the reply.
>>>>>
>>>>> The input type is "KafkaRecord[Array[Byte], Array[Byte]]" and uses the
>>>>> "KafkaRecordCoder.of(NullableCoder.of(ByteArrayCoder.of),
>>>>> ByteArrayCoder.of)" coder.
>>>>> I can't paste the code for the DoFn due to company policy, but here's the
>>>>> structure:
>>>>>
>>>>> //////////////////////////////////////
>>>>> Pipeline.scala
>>>>> -------------------------------
>>>>> type InputType = KafkaRecord[Array[Byte], Array[Byte]]
>>>>>
>>>>> //////////////////////////////////////
>>>>> ParsePayloadDoFn.scala
>>>>> -------------------------------
>>>>> class ParsePayloadDoFn[InputType](
>>>>>   inputAdaptor: RowAdaptor[InputType],
>>>>>   ...
>>>>>   deadLetterTag: TupleTag[KV[KeyType, ValueType]]) extends DoFn[InputType, OutputType] {
>>>>>
>>>>>   @Setup
>>>>>   def setup(): Unit =
>>>>>     inputAdaptor.setup()
>>>>>
>>>>>   @ProcessElement
>>>>>   def processElement(c: DoFn[InputType, OutputType]#ProcessContext): Unit =
>>>>>     Try {
>>>>>       val result: Result = inputAdaptor.adapt(c.element()) // parse payload
>>>>>       ...
>>>>>       val deadLetterMessageFn: KV[KeyType, ValueType] => Unit = c.output(deadLetterTag, _)
>>>>>       val outputPayloadFn: OutputType => Unit = c.output
>>>>>
>>>>>       result.protocol match {
>>>>>         case error: ErrorType =>
>>>>>           deadLetterMessageFn(KV.of(..., ...))
>>>>>         case payload: Payload =>
>>>>>           // zipWithIndex yields (event, index) tuples, so both cases match on a tuple
>>>>>           payload.events.zipWithIndex.foreach {
>>>>>             case (failure: ParsingFailure, _) =>
>>>>>               deadLetterMessageFn(KV.of(..., ...))
>>>>>             case (message: Message, index: Int) =>
>>>>>               // extract body from Message
>>>>>               val body = ...
>>>>>               // make a GET http call and compose output
>>>>>               val output = ...
>>>>>
>>>>>               // the same payload.header instance goes into every output of this element
>>>>>               outputPayloadFn(
>>>>>                 OutputType(
>>>>>                   output,
>>>>>                   ...
>>>>>                   payload.header,
>>>>>                   body,
>>>>>                   index
>>>>>                 )
>>>>>               )
>>>>>           }
>>>>>       }
>>>>>     } match {
>>>>>       case Failure(exception) =>
>>>>>         error(
>>>>>           s"ParsePayloadDoFn - unhandled exception: ${exception.getMessage}\nStack trace: ${ExceptionUtils.getStackTrace(exception)}"
>>>>>         )
>>>>>       case Success(_) => ()
>>>>>     }
>>>>> }
>>>>> //////////////////////////////////////
>>>>>
>>>>> For reference, we are using Scio v0.11.5 and Beam v2.36.0.
>>>>>
>>>>> Thank you,
>>>>> Yuri Jin
>>>>>
>>>>>
>>>>> On Thu, May 5, 2022 at 10:36 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> What is the type of the input - do you have a custom coder? Are you
>>>>>> able to paste the code for your DoFn?
>>>>>>
>>>>>> In answer to your question - the Direct runner tests for this because it
>>>>>> is a testing runner. This error scenario can cause random, unexpected
>>>>>> behavior in production runners, which is why the testing runner tries to
>>>>>> detect it explicitly.
>>>>>>
>>>>>> Reuven
>>>>>
>>>>> --
>>>>> Yuri Jin
>>>>> Senior Software Developer, Data Platform
>>>>> yuri.jin@unity3d.com
>>>>> (+1) 778-858-3585
>>>>> unity.com
>>>>>
>>>>

Re: IllegalMutationException in PTransform

Posted by Siyu Lin <si...@unity3d.com>.
Hi Reuven,

Do you mean we should explicitly define coders for all input and output types of chained DoFns? Do we also need to define compareTo and equals?

thanks again!
Siyu
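On the question of explicit coders, the check needs an output whose encoding depends only on data that no one can change after output. The sketch below shows one way to get there, assuming the output can be reduced to a plain case class. ParsedEvent, makeEvent, and encodeEvent are hypothetical names. encodeEvent stands in for whatever coder the pipeline actually uses.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Arrays

// Hypothetical output type built only from immutable fields: the body is
// held as a Vector[Byte] copy, not as a reference to a shared Array[Byte].
final case class ParsedEvent(header: String, body: Vector[Byte], index: Int)

// Defensive copy at construction time.
def makeEvent(header: String, body: Array[Byte], index: Int): ParsedEvent =
  ParsedEvent(header, body.toVector, index)

// Explicit, deterministic encoding of the logical fields in a fixed order.
def encodeEvent(e: ParsedEvent): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val out = new DataOutputStream(bytes)
  out.writeUTF(e.header)
  out.writeInt(e.body.length)
  out.write(e.body.toArray)
  out.writeInt(e.index)
  out.flush()
  bytes.toByteArray
}

val source = "payload".getBytes(UTF_8)
val event = makeEvent("header", source, 0)
val before = encodeEvent(event)
source(0) = 'P'.toByte // the caller reuses or edits its buffer after output
val after = encodeEvent(event)

println(Arrays.equals(before, after)) // true: the event kept its own copy
```

With only immutable fields, the case class's generated equals() is also structural. This matters if the pipeline uses a coder whose structural comparison relies on equals().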

Re: IllegalMutationException in PTransform

Posted by Reuven Lax <re...@google.com>.
Could be - I would check the implementation of inputAdaptor.
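To check the adapter in a unit test, you may need to mimic a bundle. A typical test feeds one input and compares with equals(), which can miss both state shared across inputs and encoding-only changes. The hypothetical helper below runs several inputs in sequence and snapshots each output's encoding at the moment it is produced. It then reports which outputs changed afterwards. unstableOutputs, leakyAdapt, and safeAdapt are invented for this sketch. In a real test, encode would wrap the actual output coder, for example via CoderUtils.encodeToByteArray(coder, value).

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Arrays

// Hypothetical test helper: feed inputs one after another (as a bundle would),
// snapshot each output's encoding right after it is produced, and return the
// indices of outputs whose encoding changed by the end.
def unstableOutputs[I, O](inputs: Seq[I])(adapt: I => O)(encode: O => Array[Byte]): Seq[Int] = {
  val produced = inputs.map { in =>
    val out = adapt(in)
    (out, encode(out)) // snapshot at "output" time
  }
  produced.zipWithIndex.collect {
    case ((out, snapshot), i) if !Arrays.equals(snapshot, encode(out)) => i
  }
}

// A leaky adapter that returns the same mutable builder for every input...
val sharedBuilder = new StringBuilder
def leakyAdapt(s: String): StringBuilder = {
  sharedBuilder.clear()
  sharedBuilder.append(s.toUpperCase)
}
// ...and a safe one that returns a fresh immutable String.
def safeAdapt(s: String): String = s.toUpperCase

val inputs = Seq("a", "bb", "ccc")
val leaky = unstableOutputs(inputs)(leakyAdapt)((sb: StringBuilder) => sb.toString.getBytes(UTF_8))
val safe = unstableOutputs(inputs)(safeAdapt)((s: String) => s.getBytes(UTF_8))

println(leaky) // List(0, 1): earlier outputs were overwritten by later inputs
println(safe)  // List()
```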

Re: IllegalMutationException in PTransform

Posted by Yuri Jin <yu...@unity3d.com>.
Thanks, I'll check it out.

I split inputAdaptor.adapt() into a separate DoFn for testing, and it threw
the same exception in the new DoFn. So I guess the problem is in
inputAdaptor.adapt().
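Splitting adapt() into its own DoFn narrowed the problem to one step. A similar trick can narrow it further, to one field of the output: encode each field separately, perform the step suspected of mutating it, and see which field's bytes changed. The following is a hypothetical sketch. changedFields and the Output shape are invented for illustration.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Arrays

// Hypothetical helper: snapshot each named field's encoding, run a suspect
// step, and report which fields' bytes changed.
def changedFields[T](value: T, fields: Seq[(String, T => Array[Byte])])(suspectStep: => Unit): Seq[String] = {
  val snapshots = fields.map { case (name, enc) => (name, enc, enc(value)) }
  suspectStep
  snapshots.collect { case (name, enc, before) if !Arrays.equals(before, enc(value)) => name }
}

// Illustrative output shape; the header array is shared with the caller.
final case class Output(header: Array[Byte], body: String, index: Int)

val sharedHeader = Array[Byte](1, 2, 3)
val output = Output(sharedHeader, "body", 0)

val fields: Seq[(String, Output => Array[Byte])] = Seq(
  ("header", (o: Output) => o.header.clone()),
  ("body", (o: Output) => o.body.getBytes(UTF_8)),
  ("index", (o: Output) => BigInt(o.index).toByteArray)
)

val changed = changedFields(output, fields) { sharedHeader(0) = 9 }
println(changed) // List(header)
```

In the real pipeline, each per-field encoder would be the coder for that field's type. The suspect step might be processing the next input or reading the parsed request's body.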

On Fri, May 6, 2022 at 11:45 AM Reuven Lax <re...@google.com> wrote:

> I meant to say .equals() not compareTo.
>
> On Fri, May 6, 2022 at 11:44 AM Reuven Lax <re...@google.com> wrote:
>
>> Unfortunately I'm not very familiar with Scio. However this could also be
>> caused by an object that either doesn't properly implement the compareTo
>> method or the coder doesn't return such an object in structuralValue.
>>
>> On Fri, May 6, 2022 at 11:26 AM Yuri Jin <yu...@unity3d.com> wrote:
>>
>>> Reuven, thanks for the reply.
>>>
>>> The input type is "KafkaRecord[Array[Byte], Array[Byte]]" and uses the
>>> "KafkaRecordCoder.of(NullableCoder.of(ByteArrayCoder.of),
>>> ByteArrayCoder.of)" coder.
>>> I can't paste the code for DoFn due to company policy, but here's the
>>> structure:
>>>
>>> //////////////////////////////////////
>>> Pipeline.scala
>>> -------------------------------
>>> type InputType = KafkaRecord[Array[Byte], Array[Byte]]
>>>
>>> //////////////////////////////////////
>>> ParsePayloadDoFn.scala
>>> -------------------------------
>>> class ParsePayloadDoFn[InputType](
>>>   inputAdaptor: RowAdaptor[InputType],
>>>   ...
>>>   deadLetterTag: TupleTag[KV[KeyType, ValueType]]) extends
>>> DoFn[InputType, OutpuType] {
>>>
>>>   @Setup
>>>   def setup(): Unit =
>>>     inputAdaptor.setup()
>>>
>>>   @ProcessElement
>>>   def processElement(c: DoFn[InputType, OutputType]#ProcessContext):
>>> Unit =
>>>     Try {
>>>       val result: Result = inputAdaptor.adapt(c.element()) // parse
>>> payload
>>>       ...
>>>       val deadLetterMessageFn: KV[KeyType, ValueType] => Unit =
>>> c.output(deadLetterTag, _)
>>>       val outputPayloadFn: OutputType => Unit = c.output
>>>
>>>       result.protocol match {
>>>         case error: ErrorType =>
>>>           deadLetterMessageFn(KV.of(..., ...))
>>>         case payload: Payload =>
>>>           payload.events.zipWithIndex.foreach {
>>>             case failure: ParsingFailure =>
>>>               deadLetterMessageFn(KV.of(..., ...))
>>>             case (message: Message, index: Int) =>
>>>               // extract body from Message
>>>               val body = ...
>>>               // make a GET http call and compose output
>>>               val output = ...
>>>
>>>               outputPayloadFn(
>>>                 OutputType(
>>>                   output,
>>>                   ...
>>>                   payload.header,
>>>                   body,
>>>                   index
>>>                 )
>>>               )
>>>           }
>>>       }
>>>     } match {
>>>       case Failure(exception) =>
>>>         error(
>>>           s"ParsePayloadDoFn - unhandled exception:
>>> ${exception.getMessage}\nStack trace:
>>> ${ExceptionUtils.getStackTrace(exception)}"
>>>         )
>>>       case Success(_) => ()
>>>     }
>>> }
>>> //////////////////////////////////////
>>>
>>> For reference, we are using Scio v0.11.5 and Beam v2.36.0.
>>>
>>> Thank you,
>>> Yuri Jin
>>>
>>> --
>>> Yuri Jin
>>> Senior Software Developer, Data Platform
>>> yuri.jin@unity3d.com
>>> (+1) 778-858-3585
>>> unity.com
>>>
>>

-- 
Yuri Jin
Senior Software Developer, Data Platform
yuri.jin@unity3d.com
(+1) 778-858-3585
unity.com

Re: IllegalMutationException in PTransform

Posted by Reuven Lax <re...@google.com>.
I meant to say .equals(), not compareTo.

On Fri, May 6, 2022 at 11:44 AM Reuven Lax <re...@google.com> wrote:

> Unfortunately I'm not very familiar with Scio. However, this could also be
> caused by an object that doesn't properly implement the compareTo method,
> or by a coder whose structuralValue() doesn't return such an object.
>

Re: IllegalMutationException in PTransform

Posted by Reuven Lax <re...@google.com>.
Unfortunately I'm not very familiar with Scio. However, this could also be
caused by an object that doesn't properly implement the compareTo method,
or by a coder whose structuralValue() doesn't return such an object.
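A plain-Scala illustration of the kind of equality problem described here (per the follow-up in this thread, equals() rather than compareTo). Roughly, a coder's structuralValue() either returns the value itself, so the comparison relies on its equals(), or wraps the value's encoded bytes. A Scala case class with an Array[Byte] field compares that field by reference, so a decoded copy is never equal to the original even when the bytes match. The types below are hypothetical, and whether the real OutputType's coder relies on equals() is an assumption to verify.

```scala
// Hypothetical output shapes; only the type of `body` differs.
object EqualsPitfall {
  // A case class compares Array fields by reference, not by contents.
  final case class WithArray(body: Array[Byte], index: Int)

  // Vector[Byte] compares by contents, so a decoded copy is equal.
  final case class WithVector(body: Vector[Byte], index: Int)

  // Simulate what decoding produces: same contents, a new instance.
  def decodedCopy(v: WithArray): WithArray = WithArray(v.body.clone(), v.index)
  def decodedCopy(v: WithVector): WithVector = WithVector(v.body.map(identity), v.index)
}
```

If a field has to stay an Array[Byte], overriding equals and hashCode with java.util.Arrays.equals and java.util.Arrays.hashCode is the usual alternative.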

On Fri, May 6, 2022 at 11:26 AM Yuri Jin <yu...@unity3d.com> wrote:

> Reuven, thanks for the reply.
>
> The input type is "KafkaRecord[Array[Byte], Array[Byte]]" and uses the
> "KafkaRecordCoder.of(NullableCoder.of(ByteArrayCoder.of),
> ByteArrayCoder.of)" coder.
> I can't paste the code for the DoFn due to company policy, but here's its
> structure:
>
> //////////////////////////////////////
> Pipeline.scala
> -------------------------------
> type InputType = KafkaRecord[Array[Byte], Array[Byte]]
>
> //////////////////////////////////////
> ParsePayloadDoFn.scala
> -------------------------------
> class ParsePayloadDoFn[InputType](
>   inputAdaptor: RowAdaptor[InputType],
>   ...
>   deadLetterTag: TupleTag[KV[KeyType, ValueType]])
>     extends DoFn[InputType, OutputType] {
>
>   @Setup
>   def setup(): Unit =
>     inputAdaptor.setup()
>
>   @ProcessElement
>   def processElement(c: DoFn[InputType, OutputType]#ProcessContext): Unit =
>     Try {
>       val result: Result = inputAdaptor.adapt(c.element()) // parse payload
>       ...
>       val deadLetterMessageFn: KV[KeyType, ValueType] => Unit =
>         c.output(deadLetterTag, _)
>       val outputPayloadFn: OutputType => Unit = c.output
>
>       result.protocol match {
>         case error: ErrorType =>
>           deadLetterMessageFn(KV.of(..., ...))
>         case payload: Payload =>
>           payload.events.zipWithIndex.foreach {
>             case (failure: ParsingFailure, _) =>
>               deadLetterMessageFn(KV.of(..., ...))
>             case (message: Message, index: Int) =>
>               // extract body from Message
>               val body = ...
>               // make a GET http call and compose output
>               val output = ...
>
>               outputPayloadFn(
>                 OutputType(
>                   output,
>                   ...
>                   payload.header,
>                   body,
>                   index
>                 )
>               )
>           }
>       }
>     } match {
>       case Failure(exception) =>
>         error(
>           s"ParsePayloadDoFn - unhandled exception: ${exception.getMessage}\nStack trace: ${ExceptionUtils.getStackTrace(exception)}"
>         )
>       case Success(_) => ()
>     }
> }
> //////////////////////////////////////
>
> For reference, we are using Scio v0.11.5 and Beam v2.36.0.
>
> Thank you,
> Yuri Jin
>
> --
> Yuri Jin
> Senior Software Developer, Data Platform
> yuri.jin@unity3d.com
> (+1) 778-858-3585
> unity.com
>

Re: IllegalMutationException in PTransform

Posted by Yuri Jin <yu...@unity3d.com>.
Reuven, thanks for the reply.

The input type is "KafkaRecord[Array[Byte], Array[Byte]]" and uses the
"KafkaRecordCoder.of(NullableCoder.of(ByteArrayCoder.of),
ByteArrayCoder.of)" coder.
I can't paste the code for the DoFn due to company policy, but here's its
structure:

//////////////////////////////////////
Pipeline.scala
-------------------------------
type InputType = KafkaRecord[Array[Byte], Array[Byte]]

//////////////////////////////////////
ParsePayloadDoFn.scala
-------------------------------
class ParsePayloadDoFn[InputType](
  inputAdaptor: RowAdaptor[InputType],
  ...
  deadLetterTag: TupleTag[KV[KeyType, ValueType]])
    extends DoFn[InputType, OutputType] {

  @Setup
  def setup(): Unit =
    inputAdaptor.setup()

  @ProcessElement
  def processElement(c: DoFn[InputType, OutputType]#ProcessContext): Unit =
    Try {
      val result: Result = inputAdaptor.adapt(c.element()) // parse payload
      ...
      // Additional (dead-letter) output for records that cannot be processed.
      val deadLetterMessageFn: KV[KeyType, ValueType] => Unit =
        c.output(deadLetterTag, _)
      // Main output.
      val outputPayloadFn: OutputType => Unit = c.output

      result.protocol match {
        case error: ErrorType =>
          deadLetterMessageFn(KV.of(..., ...))
        case payload: Payload =>
          // zipWithIndex yields (event, index) pairs, so every case matches a tuple.
          payload.events.zipWithIndex.foreach {
            case (failure: ParsingFailure, _) =>
              deadLetterMessageFn(KV.of(..., ...))
            case (message: Message, index: Int) =>
              // extract body from Message
              val body = ...
              // make a GET http call and compose output
              val output = ...

              outputPayloadFn(
                OutputType(
                  output,
                  ...
                  payload.header,
                  body,
                  index
                )
              )
          }
      }
    } match {
      case Failure(exception) =>
        // Unhandled exceptions are logged, not rethrown.
        error(
          s"ParsePayloadDoFn - unhandled exception: ${exception.getMessage}\nStack trace: ${ExceptionUtils.getStackTrace(exception)}"
        )
      case Success(_) => ()
    }
}
//////////////////////////////////////
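One hypothesis worth ruling out, based only on the structure above: every output built from the same payload holds a reference to `payload.header`, and `body` may point into a shared buffer. If either is mutable and changes after an earlier output was emitted (for example, in a later loop iteration or during the HTTP call), the earlier output's encoding changes too, which is what the Direct runner reports. This is a plain-Scala sketch with hypothetical names (`SharedHeaderSketch`, `Out`, a mutable-map header); the real types in the DoFn are unknown.

```scala
import scala.collection.mutable

object SharedHeaderSketch {
  // Hypothetical output type holding a reference to a header.
  final case class Out(header: collection.Map[String, String], index: Int)

  // Problematic: every Out aliases the same mutable header, which keeps changing.
  def emitShared(n: Int): List[Out] = {
    val header = mutable.Map("seq" -> "0")
    (0 until n).toList.map { i =>
      header("seq") = i.toString // also visible through outputs built earlier
      Out(header, i)
    }
  }

  // Safer: give each output its own immutable snapshot of the header.
  def emitSnapshot(n: Int): List[Out] = {
    val header = mutable.Map("seq" -> "0")
    (0 until n).toList.map { i =>
      header("seq") = i.toString
      Out(header.toMap, i) // immutable copy taken at output time
    }
  }
}
```

If this turns out to be the cause, building each output from immutable data or defensive copies (for example `toMap` for a map, or `clone()` for an `Array[Byte]` backed by a reused buffer) should make the check pass, whereas disabling enforceImmutability would only hide the aliasing.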

For reference, we are using Scio v0.11.5 and Beam v2.36.0.

Thank you,
Yuri Jin


On Thu, May 5, 2022 at 10:36 PM Reuven Lax <re...@google.com> wrote:

> What is the type of the input? Do you have a custom coder? Are you able
> to paste the code for your DoFn?
>
> In answer to your question: the Direct runner checks for this because it
> is a testing runner. Mutating a value after it has been output can cause
> random, unexpected behavior in production runners, which is why the
> testing runner explicitly tries to detect it.
>
> Reuven
>

-- 
Yuri Jin
Senior Software Developer, Data Platform
yuri.jin@unity3d.com
(+1) 778-858-3585
unity.com

Re: IllegalMutationException in PTransform

Posted by Reuven Lax <re...@google.com>.
What is the type of the input? Do you have a custom coder? Are you able to
paste the code for your DoFn?

In answer to your question: the Direct runner checks for this because it is
a testing runner. Mutating a value after it has been output can cause random,
unexpected behavior in production runners, which is why the testing runner
explicitly tries to detect it.
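To make the check concrete, here is a plain-Scala sketch of roughly what the Direct runner does, with no Beam dependency: it records each value's encoding when the value is output and compares it with a fresh encoding when the bundle is committed. `MutationCheckSketch`, `Bundle`, and `MutableOut` are hypothetical names, and `encode` stands in for a coder. As far as we know, Beam's actual detector (`MutationDetectors.CodedValueMutationDetector` in the stack trace) compares coder structural values rather than raw bytes, so treat this as a simplification.

```scala
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer

object MutationCheckSketch {
  // What gets recorded on output: the value and its encoding at that moment.
  final case class Emitted[T](value: T, encodedAtOutput: Array[Byte])

  // Hypothetical stand-in for a bundle; `encode` plays the role of a coder.
  final class Bundle[T](encode: T => Array[Byte]) {
    private val emitted = ArrayBuffer.empty[Emitted[T]]

    // Snapshot the encoding when the value is output.
    def output(value: T): Unit =
      emitted += Emitted(value, encode(value))

    // Re-encode at commit time; return values whose bytes changed since output.
    def commit(): List[T] =
      emitted.toList.collect {
        case Emitted(v, before) if !java.util.Arrays.equals(before, encode(v)) => v
      }
  }

  // A deliberately mutable output type, used only to trigger the check.
  final class MutableOut(var text: String)

  def encodeOut(o: MutableOut): Array[Byte] =
    o.text.getBytes(StandardCharsets.UTF_8)

  // Reusing one instance across outputs: the first emission no longer matches.
  def demo(): Int = {
    val bundle = new Bundle[MutableOut](encodeOut)
    val reused = new MutableOut("first")
    bundle.output(reused)
    reused.text = "second" // mutated after it was output
    bundle.output(reused)  // same instance emitted again
    bundle.commit().size
  }
}
```

This also suggests why a unit test that inspects one output at a time can miss the problem: the mismatch only appears when an already-emitted value is re-encoded after later processing in the same bundle.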

Reuven

On Thu, May 5, 2022 at 8:52 PM Yuri Jin <yu...@unity3d.com> wrote:

> Hi Beam users,
>
> We have a DoFn that reads data from Kafka and parses a byte array
> payload. It works fine with the Dataflow runner, but throws
> IllegalMutationException with the Direct runner. The DoFn does not modify
> its input value directly, so I am guessing that the output is different
> when there are multiple input values.
>
> The detailed error is as follows.
>
> Exception in thread "main"
> org.apache.beam.sdk.util.IllegalMutationException: PTransform Parse Payload
> mutated value "DoFnOutputA" after it was output (new value was
> "DoFnOutputB"). Values must not be mutated in any way after being output.
>         at
> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
>         at
> org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:228)
>         at
> org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:160)
>         at
> org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
>         at
> org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
>         at
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value
> "DoFnOutputA" mutated illegally, new value was "DoFnOutputB". Encoding was
> "Base64EncodedA", now "Base64EncodedB".
>         at
> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
>         at
> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
>         at
> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
>         at
> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
>         ... 10 more
>
> * The printed values "DoFnOutputA" and "DoFnOutputB" are identical, but the
> encodings "Base64EncodedA" and "Base64EncodedB" differ.
>
> I was wondering if you could give me some advice on the following
> questions.
>
> 1. How can we find the problematic part? I wrote some unit tests, but I
> couldn't reproduce the error.
> 2. Have you experienced the same error and solved it?
> 3. Only the Direct runner enforces immutability for DoFns. Is it safe to
> use the "enforceImmutability=false" option?
>
>
> Any comments would be appreciated.
>
> Thanks,
> Yuri Jin
>