Posted to user@flink.apache.org by bj...@gmail.com, bj...@gmail.com on 2019/06/03 15:07:04 UTC

Re: Pipeline TimeOut with Beam SideInput


On 2019/05/30 14:12:35, bjbq4d@gmail.com <bj...@gmail.com> wrote: 
> Hi everyone,
> 
> I've built a Beam pipeline that uses a SideInput, which in my case is a Map of key/value pairs. I'm running Flink (1.7.1) on YARN (Hadoop 2.6.0). If the map is small, everything works fine, but once I make it large enough (2-3 MB) the pipeline fails with:
> 
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: java.lang.RuntimeException: Pipeline execution failed
> at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:117)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
> at com.cerner.pophealth.vx130.processing.console.SideInputRunner.main(SideInputRunner.java:34)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> .. 12 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 235b5466595d04d19df4f242531b51f3)
> at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:475)
> at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
> at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:123)
> at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:114)
> .. 20 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
> .. 25 more
> Caused by: java.lang.RuntimeException: Could not retrieve next input split.
> at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
> at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
> at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
> .. 3 more
> Caused by: java.util.concurrent.TimeoutException
> at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
> .. 4 more
> 
> I even managed to simplify my pipeline to a WordCount-like pipeline that uses a SideInput, and I see the same result. I've tried increasing akka.ask.timeout, but this doesn't help; it only increases the time before the pipeline fails. With the DirectRunner the pipeline works, though it can take a while (several minutes for a simple test run).
> 
> Turning on debug logging and looking at the Flink logs, the most interesting part I see is this:
> 
> DEBUG org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization  - Getting Broadcast Variable (2e9d9a71db773d6c2023b49ba803785b "org.apache.beam.sdk.values.PCollectionViews$SimplePCollectionView.<init>:390#a32dc9f64f1df03a" (1)) - First access, materializing.
> 
> This is followed by numerous heartbeat messages before the job fails. The message is definitely for fetching my side input, and the class should log another message once it has fetched all the data, but it never does. I tried to dig into this further, but all I found were interfaces and abstract classes for reading and serializing data.
> 
> Can anyone recommend what I can try from here?
> 

I figured this out myself. In my YARN container logs I saw this warning/error:

akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@HOST:43911/temp/$n]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.jobmaster.SerializedInputSplit was 15728643 bytes.

Looking into this, I found that Akka has a maximum frame size, which in Flink is set with akka.framesize and defaults to 10485760 bytes (10 MB). Increasing it past the size of the payload carrying my side input (15728643 bytes in the log above) fixed the issue. I'm guessing this is due to creating the side input PCollection from memory using the Create.of APIs.
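
For anyone hitting the same error, here is a minimal sketch of the change in flink-conf.yaml. The value 20971520b (20 MB) is only an illustrative assumption, chosen to exceed the 15728643-byte payload reported in the log above; pick a size that fits your own data.

```yaml
# flink-conf.yaml
# Maximum size of a single Akka message exchanged between the
# JobManager and the TaskManagers. Flink's default is 10485760b (10 MB).
# 20971520b (20 MB) is an assumed example value, chosen only because it
# is larger than the 15728643-byte SerializedInputSplit in the log.
akka.framesize: 20971520b
```

The configuration is read when the cluster starts, so a long-running YARN session would presumably need to be restarted before the new value takes effect.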