Posted to issues@beam.apache.org by "Luke Cwik (Jira)" <ji...@apache.org> on 2020/05/27 04:20:00 UTC

[jira] [Comment Edited] (BEAM-10016) Flink postcommits failing testFlattenWithDifferentInputAndOutputCoders2

    [ https://issues.apache.org/jira/browse/BEAM-10016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117378#comment-17117378 ] 

Luke Cwik edited comment on BEAM-10016 at 5/27/20, 4:19 AM:
------------------------------------------------------------

The issue is that the GreedyPipelineFuser (and related classes) doesn't account for the change in encoding from the flatten's input to the flatten's output in certain scenarios, namely where the flatten isn't being merged into an existing stage.
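To make the "change in encoding" concrete, here is a small plain-JDK sketch. It does not use Beam's `Coder` classes. It shows that one int value has three different byte representations under a big-endian, a little-endian and a varint encoding, so a flatten cannot pass the encoded bytes through unchanged when its input and output coders differ. The varint here is an unsigned LEB128-style encoding, assumed to be close to (but not verified as identical to) Beam's varint coder. The `IntEncodings` class and its method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Hypothetical helper showing three encodings of the same int (plain JDK, not Beam).
final class IntEncodings {
  static byte[] bigEndian(int value) {
    return ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN).putInt(value).array();
  }

  static byte[] littleEndian(int value) {
    return ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(value).array();
  }

  static int decodeLittleEndian(byte[] bytes) {
    return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt();
  }

  // Unsigned LEB128-style varint: 7 payload bits per byte, with the high bit set
  // on every byte except the last. The int is treated as unsigned 32 bits, so
  // negative values take five bytes.
  static byte[] varint(int value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
    return out.toByteArray();
  }

  // Transcoding means decoding with the input coder and re-encoding with the
  // output coder; only code that knows both encodings can do it.
  static byte[] transcodeLittleEndianToVarint(byte[] littleEndianBytes) {
    return varint(decodeLittleEndian(littleEndianBytes));
  }

  public static void main(String[] args) {
    int value = 300;
    System.out.println(Arrays.toString(bigEndian(value)));    // [0, 0, 1, 44]
    System.out.println(Arrays.toString(littleEndian(value))); // [44, 1, 0, 0]
    System.out.println(Arrays.toString(varint(value)));       // [-84, 2]
  }
}
```

The varint bytes print as `[-84, 2]` because Java bytes are signed; `-84` is `0xAC`.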

Normally one could fix this by copying the coder from the flatten's output PCollection to all of its input PCollections, but this doesn't hold for cross-language pipelines, because we could have:
{code:java}
ParDo(Java) -> PC(big endian int coder)    -> Flatten(Python) -> PC(varint coder)
ParDo(Go)   -> PC(little endian int coder) /
{code}
In this case the Python SDK would know the big-endian int coder, the little-endian int coder and the varint coder, but Java and Go would only know the big-endian int coder and the little-endian int coder, respectively.
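One way to picture why copying the output coder back onto the inputs breaks down is the rough sketch below. It uses a hypothetical model, not Beam's pipeline protos or fuser classes, in which each SDK environment is just the set of coder identifiers it understands. The identifiers `big_endian_int`, `little_endian_int` and `varint` are made-up labels for the coders in the diagram.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model (not a Beam API): an SDK environment is a
// name plus the set of coder identifiers it can encode and decode.
final class Environment {
  final String name;
  final Set<String> knownCoders;

  Environment(String name, String... coders) {
    this.name = name;
    this.knownCoders = new HashSet<>(Arrays.asList(coders));
  }
}

// One input of a Flatten: the environment that produces it and the coder it uses.
final class FlattenInput {
  final Environment producer;
  final String coder;

  FlattenInput(Environment producer, String coder) {
    this.producer = producer;
    this.coder = coder;
  }
}

final class NaiveCoderCopy {
  // The naive fix re-labels every input PCollection with the Flatten's output
  // coder. That only works if each producing environment can encode elements
  // with that coder.
  static boolean canCopyOutputCoder(List<FlattenInput> inputs, String outputCoder) {
    for (FlattenInput input : inputs) {
      if (!input.producer.knownCoders.contains(outputCoder)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Environment javaEnv = new Environment("java", "big_endian_int");
    Environment goEnv = new Environment("go", "little_endian_int");
    List<FlattenInput> inputs = Arrays.asList(
        new FlattenInput(javaEnv, "big_endian_int"),
        new FlattenInput(goEnv, "little_endian_int"));
    System.out.println(canCopyOutputCoder(inputs, "varint")); // false
  }
}
```

For the diagram, `canCopyOutputCoder` returns false for `varint` because, in this model, neither the Java nor the Go environment knows that coder.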

The solution in the above example is to have the Python SDK perform the transcoding by executing the flatten itself. A runner can only execute flattens whose input and output coders match, since then no transcoding is necessary.
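The placement rule could be sketched roughly as follows. This again uses a hypothetical model with made-up coder identifiers rather than the real GreedyPipelineFuser API. The runner keeps a flatten only when every input coder equals the output coder. Otherwise, the flatten goes to its own SDK environment (Python in the example), provided that environment knows all the coders involved.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of where a Flatten should execute (not a Beam API).
final class FlattenPlacement {
  static final String RUNNER = "runner";

  // Convenience helper for building a set of (hypothetical) coder identifiers.
  static Set<String> coders(String... ids) {
    return new HashSet<>(Arrays.asList(ids));
  }

  // Returns "runner" when the Flatten needs no transcoding; otherwise returns the
  // Flatten's own SDK environment, provided it knows every coder involved.
  static String chooseExecutor(
      List<String> inputCoders, String outputCoder,
      String flattenEnv, Set<String> flattenEnvCoders) {
    boolean needsTranscoding = false;
    for (String coder : inputCoders) {
      if (!coder.equals(outputCoder)) {
        needsTranscoding = true;
        break;
      }
    }
    if (!needsTranscoding) {
      // Input and output encodings are identical, so the runner can forward
      // the encoded elements as-is.
      return RUNNER;
    }
    Set<String> required = new HashSet<>(inputCoders);
    required.add(outputCoder);
    if (!flattenEnvCoders.containsAll(required)) {
      throw new IllegalStateException(
          flattenEnv + " cannot transcode; it is missing some of " + required);
    }
    // The SDK decodes each input with its own coder and re-encodes it with the
    // output coder.
    return flattenEnv;
  }
}
```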

Alternatives would be to require that flattens have the same input and output coders, which has its own problems since it would be a backwards-incompatible change, or to insert identity ParDos within SDKs to make sure that the input and output coders match whenever there is a Flatten.
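Below is a rough sketch of the identity-ParDo alternative. It uses hypothetical graph classes (`PCollectionNode`, `IdentityParDo`, `IdentityInsertion`) that are not Beam types. Every flatten input whose coder differs from the output coder is routed through an inserted identity step whose output uses the flatten's output coder. The sketch leaves open which environment runs the inserted step. In the example above, that environment would have to know both coders, as the Python SDK does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical graph model: a PCollection is a name plus a coder identifier.
final class PCollectionNode {
  final String name;
  final String coder;

  PCollectionNode(String name, String coder) {
    this.name = name;
    this.coder = coder;
  }
}

// An identity step whose only effect is re-encoding elements with another coder.
final class IdentityParDo {
  final PCollectionNode input;
  final PCollectionNode output;

  IdentityParDo(PCollectionNode input, PCollectionNode output) {
    this.input = input;
    this.output = output;
  }
}

final class IdentityInsertion {
  // For each Flatten input whose coder differs from the output coder, insert an
  // identity ParDo whose output PCollection uses the Flatten's output coder.
  // Returns the rewritten Flatten inputs and appends the new steps to 'inserted'.
  static List<PCollectionNode> alignInputs(
      List<PCollectionNode> inputs, String outputCoder, List<IdentityParDo> inserted) {
    List<PCollectionNode> rewritten = new ArrayList<>();
    for (PCollectionNode input : inputs) {
      if (input.coder.equals(outputCoder)) {
        rewritten.add(input); // already matches, no extra step needed
      } else {
        PCollectionNode recoded = new PCollectionNode(input.name + "/recoded", outputCoder);
        inserted.add(new IdentityParDo(input, recoded));
        rewritten.add(recoded);
      }
    }
    return rewritten;
  }
}
```

After the rewrite, every input of the flatten carries the output coder, so a runner could execute the flatten without transcoding.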


> Flink postcommits failing testFlattenWithDifferentInputAndOutputCoders2
> -----------------------------------------------------------------------
>
>                 Key: BEAM-10016
>                 URL: https://issues.apache.org/jira/browse/BEAM-10016
>             Project: Beam
>          Issue Type: Bug
>          Components: test-failures
>            Reporter: Kyle Weaver
>            Assignee: Maximilian Michels
>            Priority: P2
>
> Both beam_PostCommit_Java_PVR_Flink_Batch and beam_PostCommit_Java_PVR_Flink_Streaming are failing the newly added test org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2.
> SEVERE: Error in task code:  CHAIN MapPartition (MapPartition at [6]{Values, FlatMapElements, PAssert$0}) -> FlatMap (FlatMap at ExtractOutput[0]) -> Map (Key Extractor) -> GroupCombine (GroupCombine at GroupCombine: PAssert$0/GroupGlobally/GatherAllOutputs/GroupByKey) -> Map (Key Extractor) (2/2) java.lang.ClassCastException: org.apache.beam.sdk.values.KV cannot be cast to [B
> 	at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
> 	at org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:56)
> 	at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
> 	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:590)
> 	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:581)
> 	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:541)
> 	at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:109)
> 	at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$CountingFnDataReceiver.accept(SdkHarnessClient.java:667)
> 	at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.processElements(FlinkExecutableStageFunction.java:271)
> 	at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:203)
> 	at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
> 	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
> 	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> 	at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)