Posted to user@flink.apache.org by Tarandeep Singh <ta...@gmail.com> on 2016/05/03 06:40:24 UTC
Insufficient number of network buffers
Hi,

I have written ETL jobs in Flink (DataSet API). When I execute them in the IDE,
they run and finish fine. When I try to run them on my cluster, I get an
"Insufficient number of network buffers" error.

I have 5 machines in my cluster with 4 cores each, and each TaskManager is
given 3 GB. I increased the number of buffers to 5000 but got the same error.
When I increased it further (to 7500), I got the exception listed below.
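For scale, here is a back-of-the-envelope sketch of how much memory those buffer counts reserve on each TaskManager. It assumes the default network buffer (memory segment) size of 32 KiB; that size is configurable, so treat it as an assumption. Under it, 5000 buffers come to 156.25 MiB and 7500 buffers to 234.375 MiB. `NetworkBufferMemory` and its methods are hypothetical helpers, not Flink APIs.

```java
// Back-of-the-envelope memory cost of network buffers on one TaskManager.
// Assumption: each buffer is one 32 KiB memory segment (the default size).
public class NetworkBufferMemory {

    static final long DEFAULT_SEGMENT_BYTES = 32L * 1024;

    /** Total bytes reserved for the given number of buffers. */
    static long bytesFor(long numBuffers, long segmentBytes) {
        return numBuffers * segmentBytes;
    }

    /** Converts bytes to MiB. */
    static double mebibytes(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        long taskManagerMiB = 3 * 1024; // "3GB each", as stated in the post
        for (long buffers : new long[] {5000, 7500}) {
            double mib = mebibytes(bytesFor(buffers, DEFAULT_SEGMENT_BYTES));
            System.out.printf("%d buffers -> %.2f MiB (%.1f%% of %d MiB)%n",
                    buffers, mib, 100.0 * mib / taskManagerMiB, taskManagerMiB);
        }
    }
}
```

In this estimate even 7500 buffers take well under a tenth of a 3 GB TaskManager.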

The DAG (execution plan) is pretty big. What is the recommended way to run
jobs when the DAG becomes huge? Should I break it into parts by calling
execute() on the execution environment between jobs?

Thanks,
Tarandeep

The exception I got when running with 7500 buffers:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: Update task on instance d4f3f517b33e5fa8a9932fc06a0aef3b @ dev-cluster-slave1 - 4 slots - URL: akka.tcp://flink@172.22.13.39:52046/user/taskmanager failed due to:
    at org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
    at akka.dispatch.OnFailure.internal(Future.scala:228)
    at akka.dispatch.OnFailure.internal(Future.scala:227)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
    at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
    at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    ... 2 more
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@172.22.13.39:52046/user/taskmanager#-1857397999]] after [10000 ms]
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
    at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
    at java.lang.Thread.run(Thread.java:745)
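When a trace nests several exceptions, the innermost "Caused by:" usually points at the actual problem. In the trace above it is an akka.pattern.AskTimeoutException: the JobManager waited 10000 ms for a reply from the TaskManager on dev-cluster-slave1 and gave up, which is not a network-buffer error. The following minimal Java sketch walks a cause chain to its root. It builds a stand-in chain from standard-library exceptions (TimeoutException, IllegalStateException, RuntimeException) instead of the real Flink and Akka classes, and `rootCause` is a hypothetical helper, not a Flink API.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.TimeoutException;

// Finds the innermost cause of an exception chain, i.e. the last "Caused by:".
public class RootCause {

    /** Follows getCause() to the end of the chain; the identity set stops on cycles. */
    static Throwable rootCause(Throwable t) {
        Set<Throwable> visited =
                Collections.newSetFromMap(new IdentityHashMap<Throwable, Boolean>());
        Throwable current = t;
        while (current.getCause() != null && visited.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-ins mirroring the three layers of the trace (not the real classes):
        // JobExecutionException -> IllegalStateException -> AskTimeoutException
        Throwable askTimeout = new TimeoutException("Ask timed out after [10000 ms]");
        Throwable updateFailed = new IllegalStateException("Update task on instance failed", askTimeout);
        Throwable jobFailed = new RuntimeException("Job execution failed.", updateFailed);

        Throwable root = rootCause(jobFailed);
        System.out.println(root.getClass().getSimpleName() + ": " + root.getMessage());
    }
}
```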
Re: Insufficient number of network buffers
Posted by Tarandeep Singh <ta...@gmail.com>.
Yes, you are right: the exception was caused by the TaskManagers being heavily
loaded. I checked the Ganglia metrics and CPU usage was very high. I reduced
the parallelism, ran with 5000 buffers, and didn't get any exception.
Thanks,
Tarandeep
On Tue, May 3, 2016 at 2:19 AM, Ufuk Celebi <uc...@apache.org> wrote:
> Hey Tarandeep,
>
> I think the two failures (the buffer error and the timeout) are unrelated.
> Regarding the number of network buffers:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#configuring-the-network-buffers
>
> The timeouts might occur because the task managers are heavily loaded.
> I would suggest increasing the Akka ask timeout via this setting in
> flink-conf.yaml:
> akka.ask.timeout: 100 s
> (https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#distributed-coordination-via-akka)
>
> – Ufuk
Re: Insufficient number of network buffers
Posted by Ufuk Celebi <uc...@apache.org>.
Hey Tarandeep,
I think the two failures (the buffer error and the timeout) are unrelated.
Regarding the number of network buffers:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#configuring-the-network-buffers
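The sketch below assumes the rule of thumb from the linked Flink 1.0 configuration page, #slots-per-TM^2 * #TMs * 4, and evaluates it for the cluster in this thread (5 TaskManagers with 4 slots each, as the trace shows), plus a reduced setting of 2 slots for comparison. `estimateBuffers` is a hypothetical helper, not part of Flink; check the exact formula against the linked page.

```java
// Rule-of-thumb estimate for the number of network buffers per TaskManager,
// assuming the formula #slots-per-TM^2 * #TMs * 4 from the Flink 1.0 docs.
public class NetworkBufferEstimate {

    /** slotsPerTaskManager^2 * numTaskManagers * factor (factor = 4 in the rule of thumb). */
    static long estimateBuffers(int slotsPerTaskManager, int numTaskManagers, int factor) {
        long slots = slotsPerTaskManager; // widen first to avoid int overflow
        return slots * slots * numTaskManagers * factor;
    }

    public static void main(String[] args) {
        int taskManagers = 5; // cluster size from this thread
        for (int slots : new int[] {4, 2}) {
            System.out.printf("%d slots x %d TaskManagers -> about %d buffers%n",
                    slots, taskManagers, estimateBuffers(slots, taskManagers, 4));
        }
    }
}
```

This gives 320 buffers for 4 slots and 80 for 2 slots, because the estimate grows with the square of the slots per TaskManager. Both are far below the 5000 buffers that still failed in the original post, so for a large plan with many data exchanges in flight at once the formula is better read as a starting point than as a guarantee. The quadratic growth is at least consistent with lower parallelism getting by with fewer buffers.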

The timeouts might occur because the task managers are heavily loaded.
I would suggest increasing the Akka ask timeout via this setting in flink-conf.yaml:
akka.ask.timeout: 100 s
(https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#distributed-coordination-via-akka)
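The ask timeout bounds how long the JobManager waits for a TaskManager to answer a message (10000 ms in the trace from the original post). A TaskManager whose CPUs are saturated can miss that deadline even though nothing is broken. The sketch below mimics the pattern with plain java.util.concurrent rather than Akka: a hypothetical worker answers after a fixed delay, and the caller waits with a deadline. `askWithin` and the delays are made-up, scaled-down values chosen only to show both outcomes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Ask-with-deadline analogy in plain Java (not Akka or Flink code):
// the caller gives up if the busy worker does not reply in time.
public class AskTimeoutDemo {

    /** Returns true if the simulated worker replies within timeoutMs. */
    static boolean askWithin(long replyDelayMs, long timeoutMs) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> reply = CompletableFuture.supplyAsync(() -> {
                sleepQuietly(replyDelayMs); // stands in for a heavily loaded TaskManager
                return "ack";
            }, worker);
            reply.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // analogous to "Ask timed out ... after [10000 ms]"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("worker failed", e.getCause());
        } finally {
            worker.shutdownNow(); // interrupts the worker if it is still sleeping
        }
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Same slow reply, two deadlines: a longer timeout lets the reply arrive.
        System.out.println("short deadline: " + askWithin(300, 100));
        System.out.println("long deadline:  " + askWithin(300, 1000));
    }
}
```

Raising akka.ask.timeout corresponds to the second call: the reply is just as slow, but the caller waits long enough to receive it. It does not remove the load itself, which is presumably why reducing parallelism also helped in this thread.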
– Ufuk