Posted to issues@flink.apache.org by "Gabriel Nes (Jira)" <ji...@apache.org> on 2021/10/11 13:26:00 UTC

[jira] [Commented] (FLINK-23117) TaskExecutor.allocateSlot is a logical error

    [ https://issues.apache.org/jira/browse/FLINK-23117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17427125#comment-17427125 ] 

Gabriel Nes commented on FLINK-23117:
-------------------------------------

We appear to have hit this problem in our production environment:

* Flink 1.13.1;
* 1 job manager (without HA);
* 7 task managers with 12 slots each;

The exception stack trace goes like this:
```
org.apache.flink.util.FlinkException: TaskExecutor akka.tcp://flink@51.79.72.124:6122/user/rpc/taskmanager_0 has no more allocated slots for job 2525adf1793959215a2713aeefef13e6.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnectionIfNoAllocatedResources(TaskExecutor.java:1941)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.freeSlotInternal(TaskExecutor.java:1922)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.timeoutSlot(TaskExecutor.java:1955)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$3000(TaskExecutor.java:181)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$SlotActionsImpl.lambda$timeoutSlot$1(TaskExecutor.java:2313)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```


The following screenshot shows the task manager's state at the time of the error (plenty of free slots):
!image-2021-10-11-10-23-14-069.png|width=769,height=441!

> TaskExecutor.allocateSlot is a logical error
> --------------------------------------------
>
>                 Key: FLINK-23117
>                 URL: https://issues.apache.org/jira/browse/FLINK-23117
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.12.0, 1.12.2, 1.13.0, 1.13.1
>            Reporter: zhouzhengde
>            Priority: Minor
>         Attachments: image-2021-10-11-10-11-28-390.png, image-2021-10-11-10-16-17-977.png, image-2021-10-11-10-23-14-069.png
>
>
> (commit: 2020-04-22) TaskExecutor.allocateSlot at line 1109 has a logical error. It uses '!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)' to decide whether the TaskSlot is in use by another job, which is not correct. isAllocated also returns false when there is no slot at that index, so an unoccupied slot index is reported as allocated for a different job. Please confirm whether this is correct. The code in question follows:
> - TaskExecutor.java
> ```java
> } else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
>     final String message =
>             "The slot " + slotId + " has already been allocated for a different job.";
>     log.info(message);
>     final AllocationID allocationID =
>             taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
>     throw new SlotOccupiedException(
>             message, allocationID, taskSlotTable.getOwningJob(allocationID));
> }
> ```
> - TaskSlotTableImpl.java
> ```java
>     @Override
>     public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
>         TaskSlot<T> taskSlot = taskSlots.get(index);
>         if (taskSlot != null) {
>             return taskSlot.isAllocated(jobId, allocationId);
>         } else {
>             return false;
>         }
>     }
> ```
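
The concern in the quoted description can be illustrated with a minimal, self-contained sketch. It is not Flink code: `SlotCheckSketch`, `Slot`, and `isHeldByOther` are hypothetical names, job and allocation IDs are plain strings, and slot state handling is omitted. It only assumes the shape of `isAllocated` shown above, where a missing slot and a slot held by another job both yield `false`.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for a slot table; every name here is hypothetical.
class SlotCheckSketch {

    // Minimal slot record: which job and allocation currently hold the slot.
    static final class Slot {
        final String jobId;
        final String allocationId;

        Slot(String jobId, String allocationId) {
            this.jobId = jobId;
            this.allocationId = allocationId;
        }
    }

    private final Map<Integer, Slot> slots = new HashMap<>();

    void allocate(int index, String jobId, String allocationId) {
        slots.put(index, new Slot(jobId, allocationId));
    }

    // Same shape as the quoted isAllocated: false for a missing slot,
    // and also false for a slot held by a different job or allocation.
    boolean isAllocated(int index, String jobId, String allocationId) {
        Slot slot = slots.get(index);
        if (slot != null) {
            return slot.jobId.equals(jobId) && slot.allocationId.equals(allocationId);
        }
        return false;
    }

    // Hypothetical stricter check: true only if a slot exists at the index
    // and is held by some other job or allocation.
    boolean isHeldByOther(int index, String jobId, String allocationId) {
        Slot slot = slots.get(index);
        return slot != null
                && !(slot.jobId.equals(jobId) && slot.allocationId.equals(allocationId));
    }

    public static void main(String[] args) {
        SlotCheckSketch table = new SlotCheckSketch();
        table.allocate(0, "job-A", "alloc-1");

        // Slot 0 is held by job-A; job-B asks about it.
        System.out.println(!table.isAllocated(0, "job-B", "alloc-2"));  // true
        // Index 5 has no entry, yet the negated check gives the same answer.
        System.out.println(!table.isAllocated(5, "job-B", "alloc-2"));  // true
        // The stricter check tells the two cases apart.
        System.out.println(table.isHeldByOther(0, "job-B", "alloc-2")); // true
        System.out.println(table.isHeldByOther(5, "job-B", "alloc-2")); // false
    }
}
```

Whether the empty-index case can actually be reached in `TaskExecutor.allocateSlot` depends on the checks that precede the quoted branch, which are not shown here.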


