Posted to user@beam.apache.org by "Kaymak, Tobias" <to...@ricardo.ch> on 2020/07/16 08:24:19 UTC

Running a batch pipeline on the Classic Java Flink Runner - pipeline starts, but shell blocks

Hello,

I have a batch pipeline on Beam 2.22.0 that reads about 200 GiB from
BigQuery, maps the data, and writes it out via CassandraIO.

When I run the pipeline with the classic Java Flink runner on a Flink 1.10.1
cluster, I run into the following issue:

When I launch the pipeline with

    bin/flink run -d -c test.beam.BigQueryToCassandra \
      -j /mnt/pipelines/beam_pipelines.jar \
      --runner=FlinkRunner --appName=backload \
      --numberOfExecutionRetries=20 --executionRetryDelay=10000 \
      --project=XXX --parallelism=1

the shell returns after a few minutes and throws:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Pipeline execution failed
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
        at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.lang.RuntimeException: Pipeline execution failed
        at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:90)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
        at ch.ricardo.di.beam.BigQueryToCassandra.main(BigQueryToCassandra.java:69)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
        ... 8 more
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
        at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:290)
        at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:952)
        at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:84)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:53)
        at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:150)
        at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:87)
        ... 16 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:947)
        ... 20 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
        at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:274)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

End of exception on server side>]
        at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
        at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
        ... 4 more
command terminated with exit code 1


However, the jobmanager logs show that the pipeline has been launched and
is running successfully. Since the BigQuery extract job is triggered from
the jobmanager, I suspect there is a timeout I need to increase, so that
the extract job can finish before the timeout is hit and the client stops
throwing this exception. I also tried BATCH_FORCED, but that didn't help.
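The trace itself hints at one possible sequence. The client submits through `RestClusterClient.submitJob` and `FutureUtils.retryOperationWithDelay`, and the server-side `Dispatcher.submitJob` rejects the job with `DuplicateJobSubmissionException`, yet the jobmanager logs show the job running. One reading consistent with this and with the timeout suspicion above: the first submission was accepted but took long on the jobmanager, the client retried with the same job ID, and the retry was rejected. The toy model below (plain Java, no Flink) sketches that sequence. `ToyDispatcher`, `submitWithRetry`, and the millisecond values are hypothetical, and the model does not reproduce Flink's actual retry conditions or timeout settings.

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model only: ToyDispatcher and submitWithRetry are hypothetical names,
// not Flink classes. A slow server-side submission plus a client retry with
// the SAME job ID yields a "duplicate submission" even though job 1 is running.
class DuplicateSubmissionDemo {

    // Stand-in for the jobmanager's dispatcher: it records the job ID first,
    // then spends acceptDelayMillis on setup work before acknowledging
    // (assumed here to resemble slow input preparation).
    static final class ToyDispatcher {
        private final Set<String> submittedJobIds = ConcurrentHashMap.newKeySet();
        private final long acceptDelayMillis;

        ToyDispatcher(long acceptDelayMillis) {
            this.acceptDelayMillis = acceptDelayMillis;
        }

        String submit(String jobId) throws InterruptedException {
            if (!submittedJobIds.add(jobId)) {
                return "DUPLICATE"; // this job ID was already accepted
            }
            Thread.sleep(acceptDelayMillis); // simulated slow setup
            return "ACCEPTED";
        }
    }

    // Client side: wait up to timeoutMillis per attempt, then retry with the same job ID.
    static String submitWithRetry(ToyDispatcher dispatcher, String jobId,
                                  long timeoutMillis, int maxAttempts) {
        ExecutorService pool = Executors.newCachedThreadPool();
        String lastOutcome = "NOT_ATTEMPTED";
        try {
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                Callable<String> call = () -> dispatcher.submit(jobId);
                Future<String> response = pool.submit(call);
                try {
                    return response.get(timeoutMillis, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    lastOutcome = "TIMEOUT"; // client gives up; server keeps working
                } catch (ExecutionException e) {
                    lastOutcome = "ERROR";
                }
            }
            return lastOutcome;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        } finally {
            pool.shutdownNow(); // stop any toy server work still sleeping
        }
    }

    public static void main(String[] args) {
        // Client waits 50 ms, server needs 300 ms: attempt 1 times out, attempt 2 is rejected.
        System.out.println(submitWithRetry(new ToyDispatcher(300), UUID.randomUUID().toString(), 50, 3));
        // Client waits 2 s: the first attempt is acknowledged.
        System.out.println(submitWithRetry(new ToyDispatcher(300), UUID.randomUUID().toString(), 2000, 3));
    }
}
```

With a 50 ms client wait against 300 ms of server-side work, the model reports `DUPLICATE`. Giving the client more time than the server needs yields `ACCEPTED`, which is the intuition behind raising a client-side timeout.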


My second question concerns parallelism in batch mode. Even though I run
the pipeline with parallelism=2 and Flink shows a parallelism of 2 for
each task, only one taskmanager is actually doing work; according to its
logs, the other one is idle. It looks as if the bundles are not evenly
distributed once the BigQuery export has finished. Is there anything I can
do about this?
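The suspicion that bundles are unevenly distributed after the export can be illustrated with a toy model. It assumes, for simplicity, three things: each parallel read subtask processes only the export files (splits) handed to it, splits are dealt out round-robin, and downstream work such as the Cassandra write stays on the subtask that read the data. `SplitAssignmentDemo` and its methods are hypothetical names, not Beam or Flink APIs. The second method mimics a random-key redistribution step (the Beam Java SDK offers `Reshuffle.viaRandomKey()` for this kind of rebalancing). Whether adding such a step would fix this particular pipeline is not established here.

```java
import java.util.Arrays;
import java.util.Random;

// Toy model only: SplitAssignmentDemo is a hypothetical name, not Beam or Flink code.
// Assumption: each read subtask processes only the splits assigned to it
// (round-robin here for simplicity), and downstream work stays on that subtask.
class SplitAssignmentDemo {

    // Sum the records each subtask receives when splits are dealt out round-robin.
    static long[] recordsPerSubtask(long[] splitSizes, int parallelism) {
        long[] load = new long[parallelism];
        for (int i = 0; i < splitSizes.length; i++) {
            load[i % parallelism] += splitSizes[i];
        }
        return load;
    }

    // Send every record to a uniformly random subtask, as a random-key
    // redistribution step would; the seed makes the run reproducible.
    static long[] redistributeRandomly(long totalRecords, int parallelism, long seed) {
        Random random = new Random(seed);
        long[] load = new long[parallelism];
        for (long r = 0; r < totalRecords; r++) {
            load[random.nextInt(parallelism)]++;
        }
        return load;
    }

    public static void main(String[] args) {
        // A single split with parallelism 2: prints [1000000, 0], the second subtask is idle.
        System.out.println(Arrays.toString(recordsPerSubtask(new long[] {1_000_000L}, 2)));
        // Four equal splits: prints [500000, 500000].
        System.out.println(Arrays.toString(
                recordsPerSubtask(new long[] {250_000L, 250_000L, 250_000L, 250_000L}, 2)));
        // After random redistribution: close to 500000 per subtask.
        System.out.println(Arrays.toString(redistributeRandomly(1_000_000L, 2, 42L)));
    }
}
```

In this model, parallelism=2 only pays off if there is more than one unit of work to hand out, or if the data is redistributed after the read.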

Cheers,
Tobi

Re: Running a batch pipeline on the Classic Java Flink Runner - pipeline starts, but shell blocks

Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
When running with parallelism=2 and connecting to the taskmanager whose
debug log showed no CassandraIO activity, it seems that the work is
distributed per task rather than parallelized per machine. Does that make
sense?

Here is the profiler output from the taskmanager with no CassandraIO
activity in its debug log:

[image: profiler output of the taskmanager with no CassandraIO activity]

and here is the output of the other one:

[image: profiler output of the other taskmanager]

So the Cassandra writes seem to happen on only one of the two
taskmanagers. Is this expected?
