Posted to user@flink.apache.org by ANDREA SPINA <74...@studenti.unimore.it> on 2016/06/29 10:19:25 UTC
Apache Flink 1.0.3 IllegalStateException Partition State on chaining
Hi everyone,
I am running some Flink experiments with the Peel benchmark framework
(http://peel-framework.org/) and I am struggling with exceptions. The
environment is a 25-node cluster with 16 cores per node. The dataset is
~80GiB and is located on HDFS 2.7.1. The Flink version is 1.0.3.
At first I tried a degree of parallelism of 400, but that raised a "not
enough numberOfBuffers" error, so I reduced the parallelism to 200. The
Flink configuration follows:
jobmanager.rpc.address = ${runtime.hostname}
akka.log.lifecycle.events = ON
akka.ask.timeout = 300s
jobmanager.rpc.port = 6002
jobmanager.heap.mb = 1024
jobmanager.web.port = 6004
taskmanager.heap.mb = 28672
taskmanager.memory.fraction = 0.7
taskmanager.network.numberOfBuffers = 32768
taskmanager.network.bufferSizeInBytes = 16384
taskmanager.tmp.dirs =
"/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp"
taskmanager.debug.memory.startLogThread = true
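As a sanity check on these values: the network stack gets numberOfBuffers × bufferSizeInBytes of memory, and the Flink 1.x docs suggest roughly slots² × #TaskManagers × 4 buffers as a minimum. A minimal sketch of the arithmetic (the heuristic is only a rough lower bound; repartitioning-heavy pipelines need more, which would be consistent with the failure at parallelism 400):

```scala
// Sanity-check the network buffer settings from the configuration above.
object BufferCheck {
  def main(args: Array[String]): Unit = {
    val numberOfBuffers = 32768
    val bufferSizeInBytes = 16384
    // Total memory handed to the network stack: 32768 * 16 KiB = 512 MiB.
    val totalMiB = numberOfBuffers.toLong * bufferSizeInBytes / (1024 * 1024)
    println(s"network buffer memory: $totalMiB MiB") // 512 MiB

    // Rough minimum from the Flink 1.x docs: slotsPerTM^2 * numTMs * 4.
    val slotsPerTM = 16 // 16 cores per node
    val numTMs = 25     // 25-node cluster
    println(s"suggested minimum buffers: ${slotsPerTM * slotsPerTM * numTMs * 4}") // 25600
  }
}
```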
With a parallelism of 200, the following exception is raised on one node of
the cluster:
2016-06-29 11:31:55,673 INFO org.apache.flink.runtime.taskmanager.Task - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43)) -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1) switched to FAILED with exception.
java.lang.IllegalStateException: Received unexpected partition state null for partition request. This is a bug.
        at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)
The reduce code is:
43 val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
The map code is:
68 def createInitialVector(dimensionDS: DataSet[Int]): DataSet[Vector] = {
69   dimensionDS.map {
70     dimension =>
71       val values = DenseVector(Array.fill(dimension)(0.0))
72       values
73   }
74 }
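For context, the chained Reduce -> Map named in the stack trace is exactly these two snippets. The following plain-Scala sketch (no Flink dependency; Record and the Array-backed vector are stand-ins for the real types) shows what the chain computes: the reduce keeps an arbitrary survivor, collapsing the per-record sizes to a single dimension, from which a zero vector is built.

```scala
object InitialVectorSketch {
  // Stand-in for the real record type; only the vector field matters here.
  case class Record(vector: Array[Double])

  def main(args: Array[String]): Unit = {
    val data = Seq(Record(Array(1.0, 2.0, 3.0)), Record(Array(4.0, 5.0, 6.0)))
    // Line 43: all records share the same dimension, so keeping either
    // argument of the reduce yields the same value.
    val dimension = data.map(_.vector.length).reduce((a, b) => b)
    // Lines 68-74: the all-zero initial weight vector.
    val values = Array.fill(dimension)(0.0)
    println(values.mkString("[", ", ", "]")) // [0.0, 0.0, 0.0]
  }
}
```

Note that such a global reduce runs as a single subtask, which is why the failing task in the trace is reported as (1/1).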
I can't figure out a solution, thank you for your help.
Andrea
--
*Andrea Spina*
N.Tessera: *74598*
MAT: *89369*
*Ingegneria Informatica* *[LM] *(D.M. 270)
Re: Apache Flink 1.0.3 IllegalStateException Partition State on chaining
Posted by ANDREA SPINA <74...@studenti.unimore.it>.
Hi everybody,
increasing akka.ask.timeout solved the second issue. In any case, that was a
warning sign of a congested network, so I also worked on improving the
algorithm. Increasing numberOfBuffers and the corresponding buffer size
solved the first issue, so I can now run with the full degree of parallelism.
In my case, enabling off-heap memory didn't do the trick.
Thank you. All the best,
Andrea
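For reference, the buffer settings Ufuk suggested (quoted below) were:

```
taskmanager.network.numberOfBuffers = 65536
taskmanager.network.bufferSizeInBytes = 32768
```

These give 65536 × 32 KiB = 2 GiB of network buffer memory, up from the original 512 MiB; the exact values Andrea ended up with are not stated in the thread.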
2016-06-29 17:10 GMT+02:00 Martin Scholl <ms...@funkpopes.org>:
> Other than increasing the ask.timeout, we've seen such failures being
> caused by long GC pauses over bigger heaps. In such a case, you could
> fiddle with a) enabling object reuse, or b) enabling off-heap memory (i.e.
> taskmanager.memory.off-heap == true) to mitigate GC-induced issues a bit.
>
> Hope it helps,
> Martin
>
>
> On Wed, Jun 29, 2016 at 3:29 PM Ufuk Celebi <uc...@apache.org> wrote:
>
>> OK, looks like you can easily give more memory to the network stack,
>> e.g. for 2 GB set
>>
>> taskmanager.network.numberOfBuffers = 65536
>> taskmanager.network.bufferSizeInBytes = 32768
>>
>> For the other exception, your logs confirm that there is something
>> else going on. Try increasing the akka ask timeout:
>>
>> akka.ask.timeout: 100 s
>>
>> Does this help?
>>
>>
>> On Wed, Jun 29, 2016 at 3:10 PM, ANDREA SPINA <74...@studenti.unimore.it>
>> wrote:
>> > Hi Ufuk,
>> >
>> > so the memory available is 48294 megabytes per node, of which I reserve
>> > 28 GiB via the Flink conf file.
>> > taskmanager.heap.mb = 28672
>> > taskmanager.memory.fraction = 0.7
>> > taskmanager.network.numberOfBuffers = 32768
>> > taskmanager.network.bufferSizeInBytes = 16384
>> >
>> > Anyway, here is what I found in the log files.
>> >
>> > First, the log of the task manager that appears to have failed:
>> >
>> > 2016-06-29 11:31:55,673 INFO org.apache.flink.runtime.taskmanager.Task
>> > - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.fli
>> >
>> nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
>> > -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver
>> > .sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69))
>> (1/1)
>> > switched to FAILED with exception.
>> > java.lang.IllegalStateException: Received unexpected partition state
>> null
>> > for partition request. This is a bug.
>> > at
>> >
>> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)
>> > at
>> > org.apache.flink.runtime.taskmanager.TaskManager.org
>> $apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:
>> > 468)
>> > at
>> >
>> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:265)
>> > at
>> >
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>> > at
>> >
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>> > at
>> >
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>> > at
>> >
>> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>> > at
>> >
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>> > at
>> >
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>> > at
>> >
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>> > at
>> > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>> > at
>> > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>> > at
>> > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>> > at
>> >
>> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>> > at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>> > at
>> >
>> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:119)
>> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> > at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>> > at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>> > at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>> > at
>> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> > at
>> >
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>> > at
>> >
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>> > at
>> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> > at
>> >
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >
>> >
>> > Next, the jobmanager log:
>> >
>> > 2016-06-29 11:31:34,683 INFO
>> > org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN
>> Reduce
>> > (Reduce at dima.tu.berlin.benchmark.fli
>> >
>> nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
>> > -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver
>> > .sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69))
>> (1/1)
>> > (8c2d9a0a0520c2c18e07bad0a97a3911) switched from DEPLOYING to FAILED
>> > 2016-06-29 11:31:34,694 INFO
>> org.apache.flink.runtime.jobmanager.JobManager
>> > - Status of job 71542654d427e8d0e7e01c538abe1acf (peel
>> > -bundle-flink) changed to FAILING.
>> > java.lang.Exception: Cannot deploy task CHAIN Reduce (Reduce at
>> >
>> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsV
>> > ector(sGradientDescentL2.scala:43)) -> Map (Map at
>> >
>> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDesce
>> > ntL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) - TaskManager
>> > (c0b308245dfc5da6d759fb5bc1bc5ad0 @ cloud-12 - 16 slots - URL:
>> akka.tcp://f
>> > link@130.149.21.16:6122/user/taskmanager) not responding after a
>> timeout of
>> > 10000 milliseconds
>> > at
>> >
>> org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:387)
>> > at akka.dispatch.OnComplete.internal(Future.scala:246)
>> > at akka.dispatch.OnComplete.internal(Future.scala:244)
>> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>> > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>> > at
>> >
>> scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
>> > at
>> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> > at
>> >
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> > at
>> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> > at
>> >
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> > Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>> > [Actor[akka.tcp://flink@130.149.21.16:6122/user/taskmanager#1824295872
>> ]]
>> > after [1000
>> > 0 ms]
>> > at
>> >
>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>> > at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>> > at
>> >
>> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
>> > at
>> >
>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
>> > at
>> >
>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>> > at
>> >
>> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>> > at
>> >
>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>> > at
>> > akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>> > at java.lang.Thread.run(Thread.java:745)
>> >
>> >
>> > Finally, the client-${runtime.hostname}.log:
>> >
>> > 2016-06-29 11:31:34,687 INFO
>> org.apache.flink.runtime.client.JobClientActor
>> > - 06/29/2016 11:31:34 CHAIN Reduce (Reduce at di
>> >
>> ma.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
>> > -> Map (Map at dima.tu.berlin.
>> >
>> benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69))(1/1)
>> > switched to FAILED
>> > java.lang.Exception: Cannot deploy task CHAIN Reduce (Reduce at
>> >
>> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsV
>> > ector(sGradientDescentL2.scala:43)) -> Map (Map at
>> >
>> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDesce
>> > ntL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) - TaskManager
>> > (c0b308245dfc5da6d759fb5bc1bc5ad0 @ cloud-12 - 16 slots - URL:
>> akka.tcp://f
>> > link@130.149.21.16:6122/user/taskmanager) not responding after a
>> timeout of
>> > 10000 milliseconds
>> > at
>> >
>> org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:387)
>> > at akka.dispatch.OnComplete.internal(Future.scala:246)
>> > at akka.dispatch.OnComplete.internal(Future.scala:244)
>> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>> > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>> > at
>> >
>> scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
>> > at
>> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> > at
>> >
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> > at
>> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> > at
>> >
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> > Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>> > [Actor[akka.tcp://flink@130.149.21.16:6122/user/taskmanager#1824295872
>> ]]
>> > after [1000
>> > 0 ms]
>> > at
>> >
>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>> > at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>> > at
>> >
>> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
>> > at
>> >
>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
>> > at
>> >
>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>> > at
>> >
>> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>> > at
>> >
>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>> > at
>> > akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>> > at java.lang.Thread.run(Thread.java:745)
>> > 2016-06-29 11:31:34,709 INFO
>> org.apache.flink.runtime.client.JobClientActor
>> > - 06/29/2016 11:31:34 Job execution switched to
>> > status FAILING.
>> > java.lang.Exception: Cannot deploy task CHAIN Reduce (Reduce at
>> >
>> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsV
>> > ector(sGradientDescentL2.scala:43)) -> Map (Map at
>> >
>> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDesce
>> > ntL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) - TaskManager
>> > (c0b308245dfc5da6d759fb5bc1bc5ad0 @ cloud-12 - 16 slots - URL:
>> akka.tcp://f
>> > link@130.149.21.16:6122/user/taskmanager) not responding after a
>> timeout of
>> > 10000 milliseconds
>> > at
>> >
>> org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:387)
>> > at akka.dispatch.OnComplete.internal(Future.scala:246)
>> > at akka.dispatch.OnComplete.internal(Future.scala:244)
>> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>> > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>> > at
>> >
>> scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
>> > at
>> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> > at
>> >
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> > at
>> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> > at
>> >
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> > Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>> > [Actor[akka.tcp://flink@130.149.21.16:6122/user/taskmanager#1824295872
>> ]]
>> > after [10000 ms]
>> > at
>> >
>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>> > at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>> > at
>> >
>> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
>> > at
>> >
>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
>> > at
>> >
>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>> > at
>> >
>> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>> > at
>> >
>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>> > at
>> > akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>> > at java.lang.Thread.run(Thread.java:745)
>> >
>> > Really appreciating your help here. :)
>> > Cheers,
>> > Andrea
>> >
>> > 2016-06-29 13:48 GMT+02:00 Ufuk Celebi <uc...@apache.org>:
>> >>
>> >> Hey Andrea! Sorry for the bad user experience.
>> >>
>> >> Regarding the network buffers: you should be able to run it after
>> >> increasing the number of network buffers, just account for it when
>> >> specifying the heap size etc. You currently allocate 32768 * 16384
>> >> bytes = 512 MB for them. If you have a very long pipeline and high
>> >> parallelism, you should increase it accordingly. How much memory do
>> >> you have on your machines?
>> >>
>> >> Regarding the IllegalStateException: I suspect that this is **not**
>> >> the root failure cause. The null ExecutionState can only happen, if
>> >> the producer task (from which data is requested) failed during the
>> >> request. The error message is confusing and I opened a JIRA to fix it:
>> >> https://issues.apache.org/jira/browse/FLINK-4131. Can you please check
>> >> your complete logs to see what the root cause might be, e.g. why did
>> >> the producer fail?
>> >>
>> >>
Re: Apache Flink 1.0.3 IllegalStateException Partition State on chaining
Posted by Martin Scholl <ms...@funkpopes.org>.
Other than increasing the ask.timeout, we've seen such failures being
caused by long GC pauses over bigger heaps. In such a case, you could
fiddle with a) enabling object reuse, or b) enabling off-heap memory (i.e.
taskmanager.memory.off-heap == true) to mitigate GC-induced issues a bit.
Hope it helps,
Martin
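For concreteness, suggestion (b) corresponds to a flink-conf.yaml entry like the one below (key name as in the Flink 1.0.x configuration docs; note that Andrea reports above that off-heap memory did not do the trick in his case):

```
taskmanager.memory.off-heap: true
```

Suggestion (a), object reuse, is enabled programmatically on the ExecutionEnvironment via env.getConfig.enableObjectReuse(); it is only safe when user functions do not hold on to input objects across invocations.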
On Wed, Jun 29, 2016 at 3:29 PM Ufuk Celebi <uc...@apache.org> wrote:
> OK, looks like you can easily give more memory to the network stack,
> e.g. for 2 GB set
>
> taskmanager.network.numberOfBuffers = 65536
> taskmanager.network.bufferSizeInBytes = 32768
>
> For the other exception, your logs confirm that there is something
> else going on. Try increasing the akka ask timeout:
>
> akka.ask.timeout: 100 s
>
> Does this help?
>
>
> On Wed, Jun 29, 2016 at 3:10 PM, ANDREA SPINA <74...@studenti.unimore.it>
> wrote:
> > Hi Ufuk,
> >
> > so the memory available per node is 48294 megabytes per node, but I
> reserve
> > 28 by flink conf file.
> > taskmanager.heap.mb = 28672
> > taskmanager.memory.fraction = 0.7
> > taskmanager.network.numberOfBuffers = 32768
> > taskmanager.network.bufferSizeInBytes = 16384
> >
> > Anyway Follows what I found in log files.
> >
> > Follows the taskmanager log (task manager that seems failed)
> >
> > 2016-06-29 11:31:55,673 INFO org.apache.flink.runtime.taskmanager.Task
> > - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.fli
> >
> nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
> > -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver
> > .sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69))
> (1/1)
> > switched to FAILED with exception.
> > java.lang.IllegalStateException: Received unexpected partition state null
> > for partition request. This is a bug.
> > at
> >
> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)
> > at
> > org.apache.flink.runtime.taskmanager.TaskManager.org
> $apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:
> > 468)
> > at
> >
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:265)
> > at
> >
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> > at
> >
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> > at
> >
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> > at
> >
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
> > at
> >
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> > at
> >
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> > at
> >
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> > at
> > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> > at
> > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> > at
> > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> > at
> >
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> > at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> > at
> >
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:119)
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> > at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> > at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> > at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> > at
> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> > at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >
> >
> > Follows the jobmanager log
> >
> > 2016-06-29 11:31:34,683 INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN
> Reduce
> > (Reduce at dima.tu.berlin.benchmark.fli
> >
> nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
> > -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver
> > .sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69))
> (1/1)
> > (8c2d9a0a0520c2c18e07bad0a97a3911) switched from DEPLOYING to FAILED
> > 2016-06-29 11:31:34,694 INFO
> org.apache.flink.runtime.jobmanager.JobManager
> > - Status of job 71542654d427e8d0e7e01c538abe1acf (peel
> > -bundle-flink) changed to FAILING.
> > java.lang.Exception: Cannot deploy task CHAIN Reduce (Reduce at
> >
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsV
> > ector(sGradientDescentL2.scala:43)) -> Map (Map at
> >
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDesce
> > ntL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) - TaskManager
> > (c0b308245dfc5da6d759fb5bc1bc5ad0 @ cloud-12 - 16 slots - URL:
> akka.tcp://f
> > link@130.149.21.16:6122/user/taskmanager) not responding after a
> timeout of
> > 10000 milliseconds
> > at
> >
> org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:387)
> > at akka.dispatch.OnComplete.internal(Future.scala:246)
> > at akka.dispatch.OnComplete.internal(Future.scala:244)
> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
> > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> > at
> >
> scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
> > at
> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> > [Actor[akka.tcp://flink@130.149.21.16:6122/user/taskmanager#1824295872]]
> > after [1000
> > 0 ms]
> > at
> >
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
> > at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> > at
> >
> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
> > at
> >
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
> > at
> >
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
> > at
> >
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
> > at
> >
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
> > at
> > akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
> > at java.lang.Thread.run(Thread.java:745)
> >
> >
> > Follows the client-{$runtime.hostname}.log
> >
> > 2016-06-29 11:31:34,687 INFO
> org.apache.flink.runtime.client.JobClientActor
> > - 06/29/2016 11:31:34 CHAIN Reduce (Reduce at di
> >
> ma.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
> > -> Map (Map at dima.tu.berlin.
> >
> benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69))(1/1)
> > switched to FAILED
> > java.lang.Exception: Cannot deploy task CHAIN Reduce (Reduce at
> >
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsV
> > ector(sGradientDescentL2.scala:43)) -> Map (Map at
> >
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDesce
> > ntL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) - TaskManager
> > (c0b308245dfc5da6d759fb5bc1bc5ad0 @ cloud-12 - 16 slots - URL:
> akka.tcp://f
> > link@130.149.21.16:6122/user/taskmanager) not responding after a
> timeout of
> > 10000 milliseconds
> > at
> >
> org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:387)
> > at akka.dispatch.OnComplete.internal(Future.scala:246)
> > at akka.dispatch.OnComplete.internal(Future.scala:244)
> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
> > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> > at
> >
> scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
> > at
> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> > [Actor[akka.tcp://flink@130.149.21.16:6122/user/taskmanager#1824295872]]
> > after [1000
> > 0 ms]
> > at
> >
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
> > at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> > at
> >
> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
> > at
> >
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
> > at
> >
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
> > at
> >
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
> > at
> >
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
> > at
> > akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
> > at java.lang.Thread.run(Thread.java:745)
> > 2016-06-29 11:31:34,709 INFO
> org.apache.flink.runtime.client.JobClientActor
> > - 06/29/2016 11:31:34 Job execution switched to
> > status FAILING.
> > java.lang.Exception: Cannot deploy task CHAIN Reduce (Reduce at
> >
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsV
> > ector(sGradientDescentL2.scala:43)) -> Map (Map at
> >
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDesce
> > ntL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) - TaskManager
> > (c0b308245dfc5da6d759fb5bc1bc5ad0 @ cloud-12 - 16 slots - URL:
> akka.tcp://f
> > link@130.149.21.16:6122/user/taskmanager) not responding after a
> timeout of
> > 10000 milliseconds
> > at
> >
> org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:387)
> > at akka.dispatch.OnComplete.internal(Future.scala:246)
> > at akka.dispatch.OnComplete.internal(Future.scala:244)
> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
> > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> > at
> >
> scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
> > at
> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> > [Actor[akka.tcp://flink@130.149.21.16:6122/user/taskmanager#1824295872]]
> > after [10000 ms]
> > at
> >
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
> > at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> > at
> >
> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
> > at
> >
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
> > at
> >
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
> > at
> >
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
> > at
> >
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
> > at
> > akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
> > at java.lang.Thread.run(Thread.java:745)
> >
> > I'd really appreciate your help here. :)
> > Cheers,
> > Andrea
> >
> > 2016-06-29 13:48 GMT+02:00 Ufuk Celebi <uc...@apache.org>:
> >>
> >> Hey Andrea! Sorry for the bad user experience.
> >>
> >> Regarding the network buffers: you should be able to run it after
> >> increasing the number of network buffers, just account for it when
> >> specifying the heap size etc. You currently allocate 32768 * 16384
> >> bytes = 512 MB for them. If you have a very long pipeline and high
> >> parallelism, you should increase it accordingly. How much memory do
> >> you have on your machines?
> >>
> >> Regarding the IllegalStateException: I suspect that this is **not**
> >> the root failure cause. The null ExecutionState can only happen, if
> >> the producer task (from which data is requested) failed during the
> >> request. The error message is confusing and I opened a JIRA to fix it:
> >> https://issues.apache.org/jira/browse/FLINK-4131. Can you please check
> >> your complete logs to see what the root cause might be, e.g. why did
> >> the producer fail?
> >>
> >>
> >> On Wed, Jun 29, 2016 at 12:19 PM, ANDREA SPINA
> >> <74...@studenti.unimore.it> wrote:
> >> > Hi everyone,
> >> >
> >> > I am running some Flink experiments with Peel benchmark
> >> > http://peel-framework.org/ and I am struggling with exceptions: the
> >> > environment is a 25-node cluster with 16 cores per node. The dataset is
> >> > ~80 GiB and is located on HDFS 2.7.1. The Flink version is 1.0.3.
> >> >
> >> > At the beginning I tried a degree of parallelism of 400, but an
> >> > 'insufficient numberOfBuffers' error was raised, so I changed the
> >> > parallelism to 200. The Flink configuration follows:
> >> >
> >> > jobmanager.rpc.address = ${runtime.hostname}
> >> > akka.log.lifecycle.events = ON
> >> > akka.ask.timeout = 300s
> >> > jobmanager.rpc.port = 6002
> >> > jobmanager.heap.mb = 1024
> >> > jobmanager.web.port = 6004
> >> > taskmanager.heap.mb = 28672
> >> > taskmanager.memory.fraction = 0.7
> >> > taskmanager.network.numberOfBuffers = 32768
> >> > taskmanager.network.bufferSizeInBytes = 16384
> >> > taskmanager.tmp.dirs = "/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp"
> >> > taskmanager.debug.memory.startLogThread = true
> >> >
> >> > With a parallelism of 200, the following exception is raised on a node
> >> > of the cluster:
> >> >
> >> > 2016-06-29 11:31:55,673 INFO org.apache.flink.runtime.taskmanager.Task
> >> > - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
> >> > -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1)
> >> > switched to FAILED with exception.
> >> > java.lang.IllegalStateException: Received unexpected partition state null
> >> > for partition request. This is a bug.
> >> > at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)
> >> >
> >> >
> >> > The reduce code is:
> >> >
> >> > 43 val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
> >> >
> >> > The map code is:
> >> >
> >> > 68 def createInitialVector(dimensionDS: DataSet[Int]): DataSet[Vector] = {
> >> > 69 dimensionDS.map {
> >> > 70 dimension =>
> >> > 71 val values = DenseVector(Array.fill(dimension)(0.0))
> >> > 72 values
> >> > 73 }
> >> > 74 }
> >> >
> >> > I can't figure out a solution; thank you for your help.
> >> >
> >> > Andrea
> >> >
> >> > --
> >> > Andrea Spina
> >> > N.Tessera: 74598
> >> > MAT: 89369
> >> > Ingegneria Informatica [LM] (D.M. 270)
> >
> >
> >
> >
> > --
> > Andrea Spina
> > N.Tessera: 74598
> > MAT: 89369
> > Ingegneria Informatica [LM] (D.M. 270)
>
Re: Apache Flink 1.0.3 IllegalStateException Partition State on chaining
Posted by Ufuk Celebi <uc...@apache.org>.
OK, it looks like you can easily give more memory to the network stack,
e.g. for 2 GiB (65536 * 32768 bytes) set
taskmanager.network.numberOfBuffers = 65536
taskmanager.network.bufferSizeInBytes = 32768
For the other exception, your logs confirm that there is something
else going on. Try increasing the akka ask timeout:
akka.ask.timeout: 100 s
Does this help?
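To sanity-check these numbers: 65536 * 32768 bytes is exactly 2 GiB, versus 512 MiB with the original settings. A minimal sketch of the arithmetic follows, together with the rough buffer-count heuristic from the Flink configuration docs (slots-per-TaskManager squared, times number of TaskManagers, times 4); the heuristic is quoted from memory, so treat it as an assumption rather than a guarantee:

```python
# Back-of-the-envelope check of the network-buffer settings discussed above.

def network_memory_bytes(num_buffers: int, buffer_size: int) -> int:
    """Total memory reserved for Flink's network stack."""
    return num_buffers * buffer_size

# Andrea's original settings: 32768 buffers * 16384 bytes = 512 MiB
original = network_memory_bytes(32768, 16384)
assert original == 512 * 1024 * 1024

# Ufuk's suggestion: 65536 buffers * 32768 bytes = 2 GiB
suggested = network_memory_bytes(65536, 32768)
assert suggested == 2 * 1024 * 1024 * 1024

# Rough buffer-count heuristic (assumed from the Flink docs):
# slots-per-TM^2 * number-of-TMs * 4, here 16 slots and 25 TaskManagers.
recommended_buffers = 16 ** 2 * 25 * 4
print(recommended_buffers)  # 25600
```

For this cluster the heuristic already suggests 25600 buffers, so the original 32768 left only a small margin above the minimum.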
On Wed, Jun 29, 2016 at 3:10 PM, ANDREA SPINA <74...@studenti.unimore.it> wrote:
> Hi Ufuk,
>
> so the available memory is 48294 megabytes per node, of which I reserve
> 28 GiB (28672 MB) via the Flink conf file.
> taskmanager.heap.mb = 28672
> taskmanager.memory.fraction = 0.7
> taskmanager.network.numberOfBuffers = 32768
> taskmanager.network.bufferSizeInBytes = 16384
>
> Anyway, here is what I found in the log files.
>
> Here is the taskmanager log (the task manager that appears to have failed):
>
> 2016-06-29 11:31:55,673 INFO org.apache.flink.runtime.taskmanager.Task
> - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
> -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1)
> switched to FAILED with exception.
> java.lang.IllegalStateException: Received unexpected partition state null
> for partition request. This is a bug.
> at
> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)
> at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:468)
> at
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:265)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:119)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Here is the jobmanager log:
>
> 2016-06-29 11:31:34,683 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43)) -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) switched from DEPLOYING to FAILED
> 2016-06-29 11:31:34,694 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 71542654d427e8d0e7e01c538abe1acf (peel-bundle-flink) changed to FAILING.
> java.lang.Exception: Cannot deploy task CHAIN Reduce (Reduce at
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43)) -> Map (Map at
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) - TaskManager
> (c0b308245dfc5da6d759fb5bc1bc5ad0 @ cloud-12 - 16 slots - URL: akka.tcp://flink@130.149.21.16:6122/user/taskmanager) not responding after a timeout of
> 10000 milliseconds
> at
> org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:387)
> at akka.dispatch.OnComplete.internal(Future.scala:246)
> at akka.dispatch.OnComplete.internal(Future.scala:244)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> at
> scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka.tcp://flink@130.149.21.16:6122/user/taskmanager#1824295872]]
> after [10000 ms]
> at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
> at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Here is the client-${runtime.hostname}.log:
>
> 2016-06-29 11:31:34,687 INFO org.apache.flink.runtime.client.JobClientActor
> - 06/29/2016 11:31:34 CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
> -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1)
> switched to FAILED
> java.lang.Exception: Cannot deploy task CHAIN Reduce (Reduce at
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43)) -> Map (Map at
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) - TaskManager
> (c0b308245dfc5da6d759fb5bc1bc5ad0 @ cloud-12 - 16 slots - URL: akka.tcp://flink@130.149.21.16:6122/user/taskmanager) not responding after a timeout of
> 10000 milliseconds
> at
> org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:387)
> at akka.dispatch.OnComplete.internal(Future.scala:246)
> at akka.dispatch.OnComplete.internal(Future.scala:244)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> at
> scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka.tcp://flink@130.149.21.16:6122/user/taskmanager#1824295872]]
> after [10000 ms]
> at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
> at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
> at java.lang.Thread.run(Thread.java:745)
> 2016-06-29 11:31:34,709 INFO org.apache.flink.runtime.client.JobClientActor
> - 06/29/2016 11:31:34 Job execution switched to
> status FAILING.
> java.lang.Exception: Cannot deploy task CHAIN Reduce (Reduce at
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43)) -> Map (Map at
> dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) - TaskManager
> (c0b308245dfc5da6d759fb5bc1bc5ad0 @ cloud-12 - 16 slots - URL: akka.tcp://flink@130.149.21.16:6122/user/taskmanager) not responding after a timeout of
> 10000 milliseconds
> at
> org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:387)
> at akka.dispatch.OnComplete.internal(Future.scala:246)
> at akka.dispatch.OnComplete.internal(Future.scala:244)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> at
> scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka.tcp://flink@130.149.21.16:6122/user/taskmanager#1824295872]]
> after [10000 ms]
> at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
> at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
> at java.lang.Thread.run(Thread.java:745)
>
> I'd really appreciate your help here. :)
> Cheers,
> Andrea
>
> 2016-06-29 13:48 GMT+02:00 Ufuk Celebi <uc...@apache.org>:
>>
>> Hey Andrea! Sorry for the bad user experience.
>>
>> Regarding the network buffers: you should be able to run it after
>> increasing the number of network buffers, just account for it when
>> specifying the heap size etc. You currently allocate 32768 * 16384
>> bytes = 512 MB for them. If you have a very long pipeline and high
>> parallelism, you should increase it accordingly. How much memory do
>> you have on your machines?
>>
>> Regarding the IllegalStateException: I suspect that this is **not**
>> the root failure cause. The null ExecutionState can only happen, if
>> the producer task (from which data is requested) failed during the
>> request. The error message is confusing and I opened a JIRA to fix it:
>> https://issues.apache.org/jira/browse/FLINK-4131. Can you please check
>> your complete logs to see what the root cause might be, e.g. why did
>> the producer fail?
>>
>>
>> On Wed, Jun 29, 2016 at 12:19 PM, ANDREA SPINA
>> <74...@studenti.unimore.it> wrote:
>> > Hi everyone,
>> >
>> > I am running some Flink experiments with Peel benchmark
>> > http://peel-framework.org/ and I am struggling with exceptions: the
>> > environment is a 25-node cluster with 16 cores per node. The dataset is
>> > ~80 GiB and is located on HDFS 2.7.1. The Flink version is 1.0.3.
>> >
>> > At the beginning I tried a degree of parallelism of 400, but an
>> > 'insufficient numberOfBuffers' error was raised, so I changed the
>> > parallelism to 200. The Flink configuration follows:
>> >
>> > jobmanager.rpc.address = ${runtime.hostname}
>> > akka.log.lifecycle.events = ON
>> > akka.ask.timeout = 300s
>> > jobmanager.rpc.port = 6002
>> > jobmanager.heap.mb = 1024
>> > jobmanager.web.port = 6004
>> > taskmanager.heap.mb = 28672
>> > taskmanager.memory.fraction = 0.7
>> > taskmanager.network.numberOfBuffers = 32768
>> > taskmanager.network.bufferSizeInBytes = 16384
>> > taskmanager.tmp.dirs = "/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp"
>> > taskmanager.debug.memory.startLogThread = true
>> >
>> > With a parallelism of 200, the following exception is raised on a node
>> > of the cluster:
>> >
>> > 2016-06-29 11:31:55,673 INFO org.apache.flink.runtime.taskmanager.Task
>> > - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
>> > -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1)
>> > switched to FAILED with exception.
>> > java.lang.IllegalStateException: Received unexpected partition state null
>> > for partition request. This is a bug.
>> > at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)
>> >
>> >
>> > The reduce code is:
>> >
>> > 43 val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
>> >
>> > The map code is:
>> >
>> > 68 def createInitialVector(dimensionDS: DataSet[Int]): DataSet[Vector]
>> > = {
>> > 69 dimensionDS.map {
>> > 70 dimension =>
>> > 71 val values = DenseVector(Array.fill(dimension)(0.0))
>> > 72 values
>> > 73 }
>> > 74 }
>> >
>> > I can't figure out a solution; thank you for your help.
>> >
>> > Andrea
>> >
>> > --
>> > Andrea Spina
>> > N.Tessera: 74598
>> > MAT: 89369
>> > Ingegneria Informatica [LM] (D.M. 270)
>
>
>
>
> --
> Andrea Spina
> N.Tessera: 74598
> MAT: 89369
> Ingegneria Informatica [LM] (D.M. 270)
Re: Apache Flink 1.0.3 IllegalStateException Partition State on chaining
Posted by ANDREA SPINA <74...@studenti.unimore.it>.
Hi Ufuk,
so the available memory is 48294 megabytes per node, of which I reserve
28 GiB (28672 MB) via the Flink conf file.
taskmanager.heap.mb = 28672
taskmanager.memory.fraction = 0.7
taskmanager.network.numberOfBuffers = 32768
taskmanager.network.bufferSizeInBytes = 16384
Anyway, here is what I found in the log files.
*Here is the taskmanager log (the task manager that appears to have failed):*
2016-06-29 11:31:55,673 INFO org.apache.flink.runtime.taskmanager.Task
- CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
-> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1)
switched to FAILED with exception.
java.lang.IllegalStateException: Received unexpected partition state null
for partition request. This is a bug.
at
org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)
        at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:468)
at
org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:265)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at
org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:119)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
*Here is the jobmanager log:*
2016-06-29 11:31:34,683 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43)) -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) switched from DEPLOYING to FAILED
2016-06-29 11:31:34,694 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 71542654d427e8d0e7e01c538abe1acf (peel-bundle-flink) changed to FAILING.
java.lang.Exception: Cannot deploy task CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43)) -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) - TaskManager (c0b308245dfc5da6d759fb5bc1bc5ad0 @ cloud-12 - 16 slots - URL: akka.tcp://flink@130.149.21.16:6122/user/taskmanager) not responding after a timeout of 10000 milliseconds
at
org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:387)
at akka.dispatch.OnComplete.internal(Future.scala:246)
at akka.dispatch.OnComplete.internal(Future.scala:244)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://flink@130.149.21.16:6122/user/taskmanager#1824295872]]
after [10000 ms]
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at
scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)
*Here is the client-${runtime.hostname}.log:*
2016-06-29 11:31:34,687 INFO org.apache.flink.runtime.client.JobClientActor - 06/29/2016 11:31:34 CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43)) -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1) switched to FAILED
java.lang.Exception: Cannot deploy task CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43)) -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) - TaskManager (c0b308245dfc5da6d759fb5bc1bc5ad0 @ cloud-12 - 16 slots - URL: akka.tcp://flink@130.149.21.16:6122/user/taskmanager) not responding after a timeout of 10000 milliseconds
at
org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:387)
at akka.dispatch.OnComplete.internal(Future.scala:246)
at akka.dispatch.OnComplete.internal(Future.scala:244)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://flink@130.149.21.16:6122/user/taskmanager#1824295872]]
after [10000 ms]
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at
scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)
2016-06-29 11:31:34,709 INFO org.apache.flink.runtime.client.JobClientActor - 06/29/2016 11:31:34 Job execution switched to status FAILING.
java.lang.Exception: Cannot deploy task CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43)) -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1) (8c2d9a0a0520c2c18e07bad0a97a3911) - TaskManager (c0b308245dfc5da6d759fb5bc1bc5ad0 @ cloud-12 - 16 slots - URL: akka.tcp://flink@130.149.21.16:6122/user/taskmanager) not responding after a timeout of 10000 milliseconds
at
org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:387)
at akka.dispatch.OnComplete.internal(Future.scala:246)
at akka.dispatch.OnComplete.internal(Future.scala:244)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://flink@130.149.21.16:6122/user/taskmanager#1824295872]]
after [10000 ms]
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at
scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)
I'd really appreciate your help here. :)
Cheers,
Andrea
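[Editor's note] As an aside on the snippet quoted further down (`data.map(_.vector.size).reduce((a, b) => b)`): the reduce keeps an arbitrary operand, which is safe only because every record has the same dimension; a commutative combiner such as `max` makes that explicit. A minimal Python sketch of the same idea follows (the sample records are hypothetical, not from the job):

```python
# Mirrors the Scala `data.map(_.vector.size).reduce((a, b) => b)`:
# collapse the per-record dimensions to a single value. Using max
# instead of "keep one operand" makes the reduce order-independent.
from functools import reduce

records = [[0.0] * 5, [1.0] * 5, [2.0] * 5]  # three 5-dimensional vectors

dims = [len(v) for v in records]             # map(_.vector.size)
dimension = reduce(lambda a, b: max(a, b), dims)
assert dimension == 5

# createInitialVector: a zero vector of that dimension
initial = [0.0] * dimension
assert initial == [0.0, 0.0, 0.0, 0.0, 0.0]
```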
2016-06-29 13:48 GMT+02:00 Ufuk Celebi <uc...@apache.org>:
> Hey Andrea! Sorry for the bad user experience.
>
> Regarding the network buffers: you should be able to run it after
> increasing the number of network buffers, just account for it when
> specifying the heap size etc. You currently allocate 32768 * 16384
> bytes = 512 MB for them. If you have a very long pipeline and high
> parallelism, you should increase it accordingly. How much memory do
> you have on your machines?
>
> Regarding the IllegalStateException: I suspect that this is **not**
> the root failure cause. The null ExecutionState can only happen, if
> the producer task (from which data is requested) failed during the
> request. The error message is confusing and I opened a JIRA to fix it:
> https://issues.apache.org/jira/browse/FLINK-4131. Can you please check
> your complete logs to see what the root cause might be, e.g. why did
> the producer fail?
>
--
*Andrea Spina*
N.Tessera: *74598*
MAT: *89369*
*Ingegneria Informatica* *[LM] *(D.M. 270)
Re: Apache Flink 1.0.3 IllegalStateException Partition State on chaining
Posted by Ufuk Celebi <uc...@apache.org>.
Hey Andrea! Sorry for the bad user experience.
Regarding the network buffers: you should be able to run it after
increasing the number of network buffers, just account for it when
specifying the heap size etc. You currently allocate 32768 * 16384
bytes = 512 MB for them. If you have a very long pipeline and high
parallelism, you should increase it accordingly. How much memory do
you have on your machines?
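To make that arithmetic concrete, here is a quick back-of-envelope check in Scala, together with the sizing rule of thumb from the Flink 1.x configuration docs (slots-per-TM^2 * number-of-TMs * 4). The slot count below is an assumption (one slot per core on the 16-core machines of this cluster); treat this as a sketch, not a prescription:

```scala
// Current allocation from the posted config:
val numberOfBuffers = 32768   // taskmanager.network.numberOfBuffers
val bufferSize     = 16384    // taskmanager.network.bufferSizeInBytes
val currentMb = numberOfBuffers.toLong * bufferSize / (1024 * 1024)
println(s"current buffer memory: $currentMb MB")

// Rule of thumb from the Flink 1.x docs: slots-per-TM^2 * #TMs * 4.
// Assumption: 16 slots per TaskManager (one per core), 25 TaskManagers.
val slotsPerTm   = 16
val taskManagers = 25
val recommended  = slotsPerTm * slotsPerTm * taskManagers * 4
println(s"recommended numberOfBuffers: $recommended")
```

Whatever number you end up with, remember that this memory comes out of the same JVM heap you size with taskmanager.heap.mb.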
Regarding the IllegalStateException: I suspect that this is **not**
the root failure cause. The null ExecutionState can only happen if
the producer task (from which data is requested) failed during the
request. The error message is confusing and I opened a JIRA to fix it:
https://issues.apache.org/jira/browse/FLINK-4131. Can you please check
your complete logs to see what the root cause might be, e.g. why did
the producer fail?
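As a trivial sketch of that log check: since each log line starts with a timestamp, the first "switched to FAILED" entry across all TaskManager logs is usually the real root cause. The file name and log lines below are made up for illustration; in practice point this at every flink-*-taskmanager-*.log on the cluster:

```scala
import java.nio.file.{Files, Paths}
import scala.io.Source

// Stand-in log file (fabricated content, for illustration only):
val log = Paths.get("flink-demo-taskmanager.log")
Files.write(log, java.util.Arrays.asList(
  "2016-06-29 11:31:40,100 INFO  Task - Source (1/200) switched to RUNNING",
  "2016-06-29 11:31:55,673 INFO  Task - CHAIN Reduce -> Map (1/1) switched to FAILED with exception."
))

// Lines sort chronologically because the timestamp leads each line,
// so the first match is the earliest failure:
val firstFailure = Source.fromFile(log.toFile).getLines()
  .find(_.contains("switched to FAILED"))
firstFailure.foreach(println)
```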