You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Fanbin Bu <fa...@coinbase.com> on 2020/01/08 00:18:31 UTC

managedMemoryInMB failure

Hi,

With Flink 1.9 running in Docker, I have a batch job that fails with the
following error message.

However, the same code works fine on EMR. I checked the logs, and this is
the only difference:
managedMemoryInMB=138 (the working runs have a value of 0).

Has anybody seen this before?
Thanks,
Fanbin


org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
No pooled slot available and request to ResourceManager for new slot failed
    at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
.slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
    at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(
CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(
CompletableFuture.java:792)
    at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture
.java:2153)
    at org.apache.flink.runtime.concurrent.FutureUtils
.whenCompleteAsyncIfNotDone(FutureUtils.java:940)
    at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
.requestSlotFromResourceManager(SlotPoolImpl.java:339)
    at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
.requestNewAllocatedSlotInternal(SlotPoolImpl.java:306)
    at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
.requestNewAllocatedBatchSlot(SlotPoolImpl.java:448)
    at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
.requestNewAllocatedSlot(SchedulerImpl.java:262)
    at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
.allocateMultiTaskSlot(SchedulerImpl.java:542)
    at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
.allocateSharedSlot(SchedulerImpl.java:341)
    at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
.internalAllocateSlot(SchedulerImpl.java:168)
    at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
.allocateSlotInternal(SchedulerImpl.java:149)
    at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
.allocateBatchSlot(SchedulerImpl.java:129)
    at org.apache.flink.runtime.executiongraph.
SlotProviderStrategy$BatchSlotProviderStrategy.allocateSlot(
SlotProviderStrategy.java:109)
    at org.apache.flink.runtime.executiongraph.Execution
.lambda$allocateAndAssignSlotForExecution$2(Execution.java:556)
    at java.util.concurrent.CompletableFuture.uniComposeStage(
CompletableFuture.java:995)
    at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture
.java:2137)
    at org.apache.flink.runtime.executiongraph.Execution
.allocateAndAssignSlotForExecution(Execution.java:554)
    at org.apache.flink.runtime.executiongraph.Execution
.allocateResourcesForExecution(Execution.java:496)
    at org.apache.flink.runtime.executiongraph.Execution
.scheduleForExecution(Execution.java:439)
    at org.apache.flink.runtime.executiongraph.ExecutionVertex
.scheduleForExecution(ExecutionVertex.java:674)
    at org.apache.flink.runtime.executiongraph.Execution.scheduleConsumer(
Execution.java:850)
    at org.apache.flink.runtime.executiongraph.Execution
.scheduleOrUpdateConsumers(Execution.java:887)
    at org.apache.flink.runtime.executiongraph.Execution.markFinished(
Execution.java:1064)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph
.updateStateInternal(ExecutionGraph.java:1548)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(
ExecutionGraph.java:1521)
    at org.apache.flink.runtime.scheduler.LegacyScheduler
.updateTaskExecutionState(LegacyScheduler.java:289)
    at org.apache.flink.runtime.jobmaster.JobMaster
.updateTaskExecutionState(JobMaster.java:377)
    at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(
DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(
AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(
AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread
.java:107)
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException:
Could not fulfill slot request c7de65260c8d428b2e295e5afb205242.
    at java.util.concurrent.CompletableFuture.encodeThrowable(
CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(
CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture
.java:607)
    at java.util.concurrent.CompletableFuture.uniApplyStage(
CompletableFuture.java:628)
    at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture
.java:1996)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(
AkkaInvocationHandler.java:214)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(
AkkaInvocationHandler.java:129)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(
FencedAkkaInvocationHandler.java:78)
    at com.sun.proxy.$Proxy8.requestSlot(Unknown Source)
    at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
.requestSlotFromResourceManager(SlotPoolImpl.java:334)
    ... 48 more
Caused by: org.apache.flink.runtime.resourcemanager.exceptions.
ResourceManagerException: Could not fulfill slot request
c7de65260c8d428b2e295e5afb205242.
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl
.registerSlotRequest(SlotManagerImpl.java:315)
    at org.apache.flink.runtime.resourcemanager.ResourceManager.requestSlot(
ResourceManager.java:443)
    at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
    ... 24 more
Caused by: org.apache.flink.runtime.resourcemanager.exceptions.
UnfulfillableSlotRequestException: Could not fulfill slot request
c7de65260c8d428b2e295e5afb205242. Requested resource profile (
ResourceProfile{cpuCores=0.0, heapMemoryInMB=0, directMemoryInMB=0,
nativeMemoryInMB=0, networkMemoryInMB=0, managedMemoryInMB=138}) is
unfulfillable.
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl
.internalRequestSlot(SlotManagerImpl.java:768)
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl
.registerSlotRequest(SlotManagerImpl.java:310)
    ... 26 more

Re: managedMemoryInMB failure

Posted by Xintong Song <to...@gmail.com>.
Hi Fanbin,


> On YARN setups, this value is automatically configured to the size of the
> TaskManager's YARN container, minus a certain tolerance value.
>
If I understand correctly, you are running a Flink standalone cluster both in
Docker and on EMR? If that is the case, then this sentence does not apply to
your case, because it describes YARN deployments.

It should also be irrelevant that you are using a machine with more
memory on EMR, as long as "taskmanager.heap.size" is the same. In your
case, I assume the default of 1024m is used in both scenarios?

If "taskmanager.memory.size" is not explicitly specified, Flink
automatically decides the managed memory size. The derived managed memory
size depends on the JVM free heap memory after the TM is launched: Flink
triggers a "System.gc()" after the TM starts and reads the JVM free heap
size afterwards. I guess the reason that decreasing the number of Docker CPU
cores works might be that fewer CPU cores somehow result in less heap memory
consumption, leaving more free heap memory and thus more managed memory.
AFAIK, there are several places in the TM where Flink reads the number of
system CPU cores and decides thread pool sizes accordingly. But this is just
my guess and I cannot confirm it.

I would suggest configuring "taskmanager.memory.size" explicitly anyway, to
avoid potential problems caused by the uncertainty of the JVM free heap
memory size. BTW, this randomness is eliminated in Flink 1.10.

Thank you~

Xintong Song



On Thu, Jan 9, 2020 at 3:04 AM Fanbin Bu <fa...@coinbase.com> wrote:

> Xintong,
>
> Thanks for looking into this. I changed docker setting of #CPUs to a lower
> number and it works now.
> I was using the same code and same flink version. The reason that it works
> on EMR is that I'm using a machine with large memory.
> According to the doc:
> *JVM heap size for the TaskManagers, which are the parallel workers of the
> system. On YARN setups, this value is automatically configured to the size
> of the TaskManager's YARN container, minus a certain tolerance value.*
>
> The default value for JVM heap size is 1024m and I was configuring docker
> to have 6 CPUs and that failed blink batch jobs.
>
> Thanks for your help!
> Fanbin
>
> On Tue, Jan 7, 2020 at 7:51 PM Xintong Song <to...@gmail.com> wrote:
>
>> Hi Fanbin,
>>
>> The blink planner batch sql operators requires managed memory, and the
>> amount of managed memory needed depends on your job. The failure is because
>> the slot, according to your cluster configurations, does not have enough
>> managed memory to fulfill the requests.
>>
>> To fix the problem, you would need to configure more managed memory for
>> your task executors. You can set the config option
>> "taskmanager.memory.size" to the value of 'managedMemoryPerSlot (138m in
>> your case) * numberOfSlots'.
>>
>> It's not clear to me why the exactly same code works on emr. Were you
>> running the same version of flink?
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, Jan 8, 2020 at 8:18 AM Fanbin Bu <fa...@coinbase.com> wrote:
>>
>>> Hi,
>>>
>>> with Flink 1.9 running in docker mode, I have a batch job and got the
>>> following error message.
>>>
>>> However, it works totally fine with the same code on EMR. I checked the
>>> log and here is the only difference:
>>> managedMemoryInMB=138 . (the working ones has 0 value)
>>>
>>> did anybody see this before?
>>> Thanks,
>>> Fanbin
>>>
>>>
>>> org.apache.flink.runtime.jobmanager.scheduler.
>>> NoResourceAvailableException: No pooled slot available and request to
>>> ResourceManager for new slot failed
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>>> .slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>>> .lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
>>>     at java.util.concurrent.CompletableFuture.uniWhenComplete(
>>> CompletableFuture.java:774)
>>>     at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(
>>> CompletableFuture.java:792)
>>>     at java.util.concurrent.CompletableFuture.whenComplete(
>>> CompletableFuture.java:2153)
>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>> .whenCompleteAsyncIfNotDone(FutureUtils.java:940)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>>> .requestSlotFromResourceManager(SlotPoolImpl.java:339)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>>> .requestNewAllocatedSlotInternal(SlotPoolImpl.java:306)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>>> .requestNewAllocatedBatchSlot(SlotPoolImpl.java:448)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>>> .requestNewAllocatedSlot(SchedulerImpl.java:262)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>>> .allocateMultiTaskSlot(SchedulerImpl.java:542)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>>> .allocateSharedSlot(SchedulerImpl.java:341)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>>> .internalAllocateSlot(SchedulerImpl.java:168)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>>> .allocateSlotInternal(SchedulerImpl.java:149)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>>> .allocateBatchSlot(SchedulerImpl.java:129)
>>>     at org.apache.flink.runtime.executiongraph.
>>> SlotProviderStrategy$BatchSlotProviderStrategy.allocateSlot(
>>> SlotProviderStrategy.java:109)
>>>     at org.apache.flink.runtime.executiongraph.Execution
>>> .lambda$allocateAndAssignSlotForExecution$2(Execution.java:556)
>>>     at java.util.concurrent.CompletableFuture.uniComposeStage(
>>> CompletableFuture.java:995)
>>>     at java.util.concurrent.CompletableFuture.thenCompose(
>>> CompletableFuture.java:2137)
>>>     at org.apache.flink.runtime.executiongraph.Execution
>>> .allocateAndAssignSlotForExecution(Execution.java:554)
>>>     at org.apache.flink.runtime.executiongraph.Execution
>>> .allocateResourcesForExecution(Execution.java:496)
>>>     at org.apache.flink.runtime.executiongraph.Execution
>>> .scheduleForExecution(Execution.java:439)
>>>     at org.apache.flink.runtime.executiongraph.ExecutionVertex
>>> .scheduleForExecution(ExecutionVertex.java:674)
>>>     at org.apache.flink.runtime.executiongraph.Execution
>>> .scheduleConsumer(Execution.java:850)
>>>     at org.apache.flink.runtime.executiongraph.Execution
>>> .scheduleOrUpdateConsumers(Execution.java:887)
>>>     at org.apache.flink.runtime.executiongraph.Execution.markFinished(
>>> Execution.java:1064)
>>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph
>>> .updateStateInternal(ExecutionGraph.java:1548)
>>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph
>>> .updateState(ExecutionGraph.java:1521)
>>>     at org.apache.flink.runtime.scheduler.LegacyScheduler
>>> .updateTaskExecutionState(LegacyScheduler.java:289)
>>>     at org.apache.flink.runtime.jobmaster.JobMaster
>>> .updateTaskExecutionState(JobMaster.java:377)
>>>     at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>> DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor
>>> .handleRpcInvocation(AkkaRpcActor.java:279)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(
>>> AkkaRpcActor.java:194)
>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
>>> .handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
>>> AkkaRpcActor.java:152)
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123
>>> )
>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:
>>> 21)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:
>>> 170)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:
>>> 171)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:
>>> 171)
>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(
>>> ForkJoinPool.java:1339)
>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:
>>> 1979)
>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(
>>> ForkJoinWorkerThread.java:107)
>>> Caused by: java.util.concurrent.CompletionException:
>>> org.apache.flink.runtime.resourcemanager.exceptions.
>>> ResourceManagerException: Could not fulfill slot request
>>> c7de65260c8d428b2e295e5afb205242.
>>>     at java.util.concurrent.CompletableFuture.encodeThrowable(
>>> CompletableFuture.java:292)
>>>     at java.util.concurrent.CompletableFuture.completeThrowable(
>>> CompletableFuture.java:308)
>>>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture
>>> .java:607)
>>>     at java.util.concurrent.CompletableFuture.uniApplyStage(
>>> CompletableFuture.java:628)
>>>     at java.util.concurrent.CompletableFuture.thenApply(
>>> CompletableFuture.java:1996)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler
>>> .invokeRpc(AkkaInvocationHandler.java:214)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(
>>> AkkaInvocationHandler.java:129)
>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler
>>> .invoke(FencedAkkaInvocationHandler.java:78)
>>>     at com.sun.proxy.$Proxy8.requestSlot(Unknown Source)
>>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>>> .requestSlotFromResourceManager(SlotPoolImpl.java:334)
>>>     ... 48 more
>>> Caused by: org.apache.flink.runtime.resourcemanager.exceptions.
>>> ResourceManagerException: Could not fulfill slot request
>>> c7de65260c8d428b2e295e5afb205242.
>>>     at org.apache.flink.runtime.resourcemanager.slotmanager.
>>> SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:315)
>>>     at org.apache.flink.runtime.resourcemanager.ResourceManager
>>> .requestSlot(ResourceManager.java:443)
>>>     at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
>>>     ... 24 more
>>> Caused by: org.apache.flink.runtime.resourcemanager.exceptions.
>>> UnfulfillableSlotRequestException: Could not fulfill slot request
>>> c7de65260c8d428b2e295e5afb205242. Requested resource profile (
>>> ResourceProfile{cpuCores=0.0, heapMemoryInMB=0, directMemoryInMB=0,
>>> nativeMemoryInMB=0, networkMemoryInMB=0, managedMemoryInMB=138}) is
>>> unfulfillable.
>>>     at org.apache.flink.runtime.resourcemanager.slotmanager.
>>> SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:768)
>>>     at org.apache.flink.runtime.resourcemanager.slotmanager.
>>> SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:310)
>>>     ... 26 more
>>>
>>

Re: managedMemoryInMB failure

Posted by Fanbin Bu <fa...@coinbase.com>.
Xintong,

Thanks for looking into this. I changed the Docker setting for the number of
CPUs to a lower value, and it works now.
I was using the same code and the same Flink version. The reason it works
on EMR is that I'm using a machine with more memory.
According to the doc:
*JVM heap size for the TaskManagers, which are the parallel workers of the
system. On YARN setups, this value is automatically configured to the size
of the TaskManager's YARN container, minus a certain tolerance value.*

The default JVM heap size is 1024m, and I had configured Docker to use
6 CPUs, which caused the Blink batch jobs to fail.

Thanks for your help!
Fanbin

On Tue, Jan 7, 2020 at 7:51 PM Xintong Song <to...@gmail.com> wrote:

> Hi Fanbin,
>
> The blink planner batch sql operators requires managed memory, and the
> amount of managed memory needed depends on your job. The failure is because
> the slot, according to your cluster configurations, does not have enough
> managed memory to fulfill the requests.
>
> To fix the problem, you would need to configure more managed memory for
> your task executors. You can set the config option
> "taskmanager.memory.size" to the value of 'managedMemoryPerSlot (138m in
> your case) * numberOfSlots'.
>
> It's not clear to me why the exactly same code works on emr. Were you
> running the same version of flink?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Jan 8, 2020 at 8:18 AM Fanbin Bu <fa...@coinbase.com> wrote:
>
>> Hi,
>>
>> with Flink 1.9 running in docker mode, I have a batch job and got the
>> following error message.
>>
>> However, it works totally fine with the same code on EMR. I checked the
>> log and here is the only difference:
>> managedMemoryInMB=138 . (the working ones has 0 value)
>>
>> did anybody see this before?
>> Thanks,
>> Fanbin
>>
>>
>> org.apache.flink.runtime.jobmanager.scheduler.
>> NoResourceAvailableException: No pooled slot available and request to
>> ResourceManager for new slot failed
>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>> .slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>> .lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
>>     at java.util.concurrent.CompletableFuture.uniWhenComplete(
>> CompletableFuture.java:774)
>>     at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(
>> CompletableFuture.java:792)
>>     at java.util.concurrent.CompletableFuture.whenComplete(
>> CompletableFuture.java:2153)
>>     at org.apache.flink.runtime.concurrent.FutureUtils
>> .whenCompleteAsyncIfNotDone(FutureUtils.java:940)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>> .requestSlotFromResourceManager(SlotPoolImpl.java:339)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>> .requestNewAllocatedSlotInternal(SlotPoolImpl.java:306)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>> .requestNewAllocatedBatchSlot(SlotPoolImpl.java:448)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>> .requestNewAllocatedSlot(SchedulerImpl.java:262)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>> .allocateMultiTaskSlot(SchedulerImpl.java:542)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>> .allocateSharedSlot(SchedulerImpl.java:341)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>> .internalAllocateSlot(SchedulerImpl.java:168)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>> .allocateSlotInternal(SchedulerImpl.java:149)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>> .allocateBatchSlot(SchedulerImpl.java:129)
>>     at org.apache.flink.runtime.executiongraph.
>> SlotProviderStrategy$BatchSlotProviderStrategy.allocateSlot(
>> SlotProviderStrategy.java:109)
>>     at org.apache.flink.runtime.executiongraph.Execution
>> .lambda$allocateAndAssignSlotForExecution$2(Execution.java:556)
>>     at java.util.concurrent.CompletableFuture.uniComposeStage(
>> CompletableFuture.java:995)
>>     at java.util.concurrent.CompletableFuture.thenCompose(
>> CompletableFuture.java:2137)
>>     at org.apache.flink.runtime.executiongraph.Execution
>> .allocateAndAssignSlotForExecution(Execution.java:554)
>>     at org.apache.flink.runtime.executiongraph.Execution
>> .allocateResourcesForExecution(Execution.java:496)
>>     at org.apache.flink.runtime.executiongraph.Execution
>> .scheduleForExecution(Execution.java:439)
>>     at org.apache.flink.runtime.executiongraph.ExecutionVertex
>> .scheduleForExecution(ExecutionVertex.java:674)
>>     at org.apache.flink.runtime.executiongraph.Execution
>> .scheduleConsumer(Execution.java:850)
>>     at org.apache.flink.runtime.executiongraph.Execution
>> .scheduleOrUpdateConsumers(Execution.java:887)
>>     at org.apache.flink.runtime.executiongraph.Execution.markFinished(
>> Execution.java:1064)
>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph
>> .updateStateInternal(ExecutionGraph.java:1548)
>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph
>> .updateState(ExecutionGraph.java:1521)
>>     at org.apache.flink.runtime.scheduler.LegacyScheduler
>> .updateTaskExecutionState(LegacyScheduler.java:289)
>>     at org.apache.flink.runtime.jobmaster.JobMaster
>> .updateTaskExecutionState(JobMaster.java:377)
>>     at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>> DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor
>> .handleRpcInvocation(AkkaRpcActor.java:279)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(
>> AkkaRpcActor.java:194)
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
>> .handleRpcMessage(FencedAkkaRpcActor.java:74)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
>> AkkaRpcActor.java:152)
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21
>> )
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170
>> )
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171
>> )
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171
>> )
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
>> .java:1339)
>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:
>> 1979)
>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(
>> ForkJoinWorkerThread.java:107)
>> Caused by: java.util.concurrent.CompletionException:
>> org.apache.flink.runtime.resourcemanager.exceptions.
>> ResourceManagerException: Could not fulfill slot request
>> c7de65260c8d428b2e295e5afb205242.
>>     at java.util.concurrent.CompletableFuture.encodeThrowable(
>> CompletableFuture.java:292)
>>     at java.util.concurrent.CompletableFuture.completeThrowable(
>> CompletableFuture.java:308)
>>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture
>> .java:607)
>>     at java.util.concurrent.CompletableFuture.uniApplyStage(
>> CompletableFuture.java:628)
>>     at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture
>> .java:1996)
>>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(
>> AkkaInvocationHandler.java:214)
>>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(
>> AkkaInvocationHandler.java:129)
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler
>> .invoke(FencedAkkaInvocationHandler.java:78)
>>     at com.sun.proxy.$Proxy8.requestSlot(Unknown Source)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>> .requestSlotFromResourceManager(SlotPoolImpl.java:334)
>>     ... 48 more
>> Caused by: org.apache.flink.runtime.resourcemanager.exceptions.
>> ResourceManagerException: Could not fulfill slot request
>> c7de65260c8d428b2e295e5afb205242.
>>     at org.apache.flink.runtime.resourcemanager.slotmanager.
>> SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:315)
>>     at org.apache.flink.runtime.resourcemanager.ResourceManager
>> .requestSlot(ResourceManager.java:443)
>>     at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
>>     ... 24 more
>> Caused by: org.apache.flink.runtime.resourcemanager.exceptions.
>> UnfulfillableSlotRequestException: Could not fulfill slot request
>> c7de65260c8d428b2e295e5afb205242. Requested resource profile (
>> ResourceProfile{cpuCores=0.0, heapMemoryInMB=0, directMemoryInMB=0,
>> nativeMemoryInMB=0, networkMemoryInMB=0, managedMemoryInMB=138}) is
>> unfulfillable.
>>     at org.apache.flink.runtime.resourcemanager.slotmanager.
>> SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:768)
>>     at org.apache.flink.runtime.resourcemanager.slotmanager.
>> SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:310)
>>     ... 26 more
>>
>

Re: managedMemoryInMB failure

Posted by Xintong Song <to...@gmail.com>.
Hi Fanbin,

The Blink planner's batch SQL operators require managed memory, and the
amount of managed memory needed depends on your job. The failure occurs because
the slot, according to your cluster configuration, does not have enough
managed memory to fulfill the request.

To fix the problem, you would need to configure more managed memory for
your task executors. You can set the config option
"taskmanager.memory.size" to the value of 'managedMemoryPerSlot (138m in
your case) * numberOfSlots'.

It's not clear to me why exactly the same code works on EMR. Were you
running the same version of Flink?

Thank you~

Xintong Song



On Wed, Jan 8, 2020 at 8:18 AM Fanbin Bu <fa...@coinbase.com> wrote:

> Hi,
>
> with Flink 1.9 running in docker mode, I have a batch job and got the
> following error message.
>
> However, it works totally fine with the same code on EMR. I checked the
> log and here is the only difference:
> managedMemoryInMB=138 . (the working ones has 0 value)
>
> did anybody see this before?
> Thanks,
> Fanbin
>
>
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> No pooled slot available and request to ResourceManager for new slot
> failed
>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
> .slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
> .lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(
> CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(
> CompletableFuture.java:792)
>     at java.util.concurrent.CompletableFuture.whenComplete(
> CompletableFuture.java:2153)
>     at org.apache.flink.runtime.concurrent.FutureUtils
> .whenCompleteAsyncIfNotDone(FutureUtils.java:940)
>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
> .requestSlotFromResourceManager(SlotPoolImpl.java:339)
>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
> .requestNewAllocatedSlotInternal(SlotPoolImpl.java:306)
>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
> .requestNewAllocatedBatchSlot(SlotPoolImpl.java:448)
>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
> .requestNewAllocatedSlot(SchedulerImpl.java:262)
>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
> .allocateMultiTaskSlot(SchedulerImpl.java:542)
>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
> .allocateSharedSlot(SchedulerImpl.java:341)
>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
> .internalAllocateSlot(SchedulerImpl.java:168)
>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
> .allocateSlotInternal(SchedulerImpl.java:149)
>     at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
> .allocateBatchSlot(SchedulerImpl.java:129)
>     at org.apache.flink.runtime.executiongraph.
> SlotProviderStrategy$BatchSlotProviderStrategy.allocateSlot(
> SlotProviderStrategy.java:109)
>     at org.apache.flink.runtime.executiongraph.Execution
> .lambda$allocateAndAssignSlotForExecution$2(Execution.java:556)
>     at java.util.concurrent.CompletableFuture.uniComposeStage(
> CompletableFuture.java:995)
>     at java.util.concurrent.CompletableFuture.thenCompose(
> CompletableFuture.java:2137)
>     at org.apache.flink.runtime.executiongraph.Execution
> .allocateAndAssignSlotForExecution(Execution.java:554)
>     at org.apache.flink.runtime.executiongraph.Execution
> .allocateResourcesForExecution(Execution.java:496)
>     at org.apache.flink.runtime.executiongraph.Execution
> .scheduleForExecution(Execution.java:439)
>     at org.apache.flink.runtime.executiongraph.ExecutionVertex
> .scheduleForExecution(ExecutionVertex.java:674)
>     at org.apache.flink.runtime.executiongraph.Execution.scheduleConsumer(
> Execution.java:850)
>     at org.apache.flink.runtime.executiongraph.Execution
> .scheduleOrUpdateConsumers(Execution.java:887)
>     at org.apache.flink.runtime.executiongraph.Execution.markFinished(
> Execution.java:1064)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph
> .updateStateInternal(ExecutionGraph.java:1548)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(
> ExecutionGraph.java:1521)
>     at org.apache.flink.runtime.scheduler.LegacyScheduler
> .updateTaskExecutionState(LegacyScheduler.java:289)
>     at org.apache.flink.runtime.jobmaster.JobMaster
> .updateTaskExecutionState(JobMaster.java:377)
>     at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(
> AkkaRpcActor.java:279)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(
> AkkaRpcActor.java:194)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
> .handleRpcMessage(FencedAkkaRpcActor.java:74)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
> AkkaRpcActor.java:152)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
> .java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:
> 1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.resourcemanager.exceptions.
> ResourceManagerException: Could not fulfill slot request
> c7de65260c8d428b2e295e5afb205242.
>     at java.util.concurrent.CompletableFuture.encodeThrowable(
> CompletableFuture.java:292)
>     at java.util.concurrent.CompletableFuture.completeThrowable(
> CompletableFuture.java:308)
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture
> .java:607)
>     at java.util.concurrent.CompletableFuture.uniApplyStage(
> CompletableFuture.java:628)
>     at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture
> .java:1996)
>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(
> AkkaInvocationHandler.java:214)
>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(
> AkkaInvocationHandler.java:129)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler
> .invoke(FencedAkkaInvocationHandler.java:78)
>     at com.sun.proxy.$Proxy8.requestSlot(Unknown Source)
>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
> .requestSlotFromResourceManager(SlotPoolImpl.java:334)
>     ... 48 more
> Caused by: org.apache.flink.runtime.resourcemanager.exceptions.
> ResourceManagerException: Could not fulfill slot request
> c7de65260c8d428b2e295e5afb205242.
>     at org.apache.flink.runtime.resourcemanager.slotmanager.
> SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:315)
>     at org.apache.flink.runtime.resourcemanager.ResourceManager
> .requestSlot(ResourceManager.java:443)
>     at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
>     ... 24 more
> Caused by: org.apache.flink.runtime.resourcemanager.exceptions.
> UnfulfillableSlotRequestException: Could not fulfill slot request
> c7de65260c8d428b2e295e5afb205242. Requested resource profile (
> ResourceProfile{cpuCores=0.0, heapMemoryInMB=0, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0, managedMemoryInMB=138}) is
> unfulfillable.
>     at org.apache.flink.runtime.resourcemanager.slotmanager.
> SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:768)
>     at org.apache.flink.runtime.resourcemanager.slotmanager.
> SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:310)
>     ... 26 more
>