Posted to issues@flink.apache.org by "Till Rohrmann (JIRA)" <ji...@apache.org> on 2019/01/17 09:13:00 UTC

[jira] [Commented] (FLINK-11375) Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly

    [ https://issues.apache.org/jira/browse/FLINK-11375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16744812#comment-16744812 ] 

Till Rohrmann commented on FLINK-11375:
---------------------------------------

Nice catch!

> Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly 
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-11375
>                 URL: https://issues.apache.org/jira/browse/FLINK-11375
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.7.1
>            Reporter: shuai.xu
>            Priority: Major
>             Fix For: 1.8.0
>
>
> In SlotPool, AvailableSlots uses no locking, so all access to it should happen in the main thread of SlotPool. For that reason all public methods are called through SlotPoolGateway, except releaseSlot, which SlotSharingManager calls directly. This may cause a ConcurrentModificationException.
>  2019-01-16 19:50:16,184 INFO [flink-akka.actor.default-dispatcher-161] org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: BlinkStoreScanTableSource feature_memory_entity_store-entity_lsc_page_detail_feats_group_178-Batch -> SourceConversion(table:[_DataStreamTable_12, source: [BlinkStoreScanTableSource feature_memory_entity_store-entity_lsc_page_detail_feats_group_178]], fields:(f0)) -> correlate: table(ScanBlinkStore_entity_lsc_page_detail_feats_group_1786($cor6.f0)), select: item_id,mainse_searcher_rank__cart_uv,mainse_searcher_rank__cart_uv_14,mainse_searcher_rank__cart_uv_30,mainse_searcher_rank__cart_uv_7,mainse_s (433/500) (bd34af8dd7ee02d04a4a25e698495f0a) switched from RUNNING to FINISHED.
> 2019-01-16 19:50:16,187 INFO [jobmanager-future-thread-90] org.apache.flink.runtime.executiongraph.ExecutionGraph - scheduleVertices meet exception, need to fail global execution graph
> java.lang.reflect.UndeclaredThrowableException
>  at org.apache.flink.runtime.rpc.akka.$Proxy26.allocateSlots(Unknown Source)
>  at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.allocateSlots(SlotPool.java:1955)
>  at org.apache.flink.runtime.executiongraph.ExecutionGraph.schedule(ExecutionGraph.java:965)
>  at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleVertices(ExecutionGraph.java:1503)
>  at org.apache.flink.runtime.jobmaster.GraphManager$ExecutionGraphVertexScheduler.scheduleExecutionVertices(GraphManager.java:349)
>  at org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.scheduleOneByOne(StepwiseSchedulingPlugin.java:132)
>  at org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.onExecutionVertexFailover(StepwiseSchedulingPlugin.java:107)
>  at org.apache.flink.runtime.jobmaster.GraphManager.notifyExecutionVertexFailover(GraphManager.java:163)
>  at org.apache.flink.runtime.executiongraph.ExecutionGraph.resetExecutionVerticesAndNotify(ExecutionGraph.java:1372)
>  at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.restart(FailoverRegion.java:213)
>  at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.reset(FailoverRegion.java:198)
>  at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.allVerticesInTerminalState(FailoverRegion.java:97)
>  at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.lambda$cancel$0(FailoverRegion.java:169)
>  at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:186)
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
>  at java.lang.Thread.run(Thread.java:834)
> Caused by: java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException
>  at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:213)
>  at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:125)
>  ... 23 more
> Caused by: java.util.ConcurrentModificationException
>  at java.util.HashMap$ValueSpliterator.tryAdvance(HashMap.java:1643)
>  at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
>  at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
>  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
>  at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>  at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
>  at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
>  at org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSchedulingStrategy.findPreviousAllocation(PreviousAllocationSchedulingStrategy.java:77)
>  at org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSchedulingStrategy.findMatchWithLocality(PreviousAllocationSchedulingStrategy.java:61)
>  at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$AvailableSlots.poll(SlotPool.java:1755)
>  at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$AvailableSlots.poll(SlotPool.java:1790)
>  at org.apache.flink.runtime.jobmaster.slotpool.SlotPool.pollAndAllocateSlots(SlotPool.java:1094)
>  at org.apache.flink.runtime.jobmaster.slotpool.SlotPool.requestAllocatedSlots(SlotPool.java:886)
>  at org.apache.flink.runtime.jobmaster.slotpool.SlotPool.allocateSlots(SlotPool.java:590)
>  at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>  at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>  at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>  at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>  at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>  at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>  at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
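
For illustration only, here is a minimal sketch of the thread-confinement idea described in the report: every read and write of an unsynchronized map is funneled through one single-threaded executor, so a release can never interleave with a stream that is iterating the map. The class SlotPoolSketch, its methods, and the string IDs are hypothetical stand-ins, not Flink's actual SlotPool, SlotPoolGateway, or AvailableSlots code, and this is not necessarily how the issue was fixed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a slot pool; not Flink's real class.
class SlotPoolSketch {
    // Plain HashMap with no locking: safe only if a single thread touches it.
    private final Map<String, String> availableSlots = new HashMap<>();

    // Plays the role of the pool's "main thread"; all map access runs here.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // Callers on any thread hand the mutation to the main thread
    // instead of modifying the map directly.
    CompletableFuture<Void> releaseSlot(String slotId, String allocationId) {
        return CompletableFuture.runAsync(
            () -> availableSlots.put(slotId, allocationId), mainThread);
    }

    // The stream over the map also runs on the main thread, so no release
    // can modify the map while findFirst() is iterating it.
    CompletableFuture<Optional<String>> pollByAllocation(String allocationId) {
        return CompletableFuture.supplyAsync(() -> {
            Optional<String> slotId = availableSlots.entrySet().stream()
                .filter(e -> e.getValue().equals(allocationId))
                .map(Map.Entry::getKey)
                .findFirst();
            // The stream has finished, so removing the entry is safe here.
            slotId.ifPresent(availableSlots::remove);
            return slotId;
        }, mainThread);
    }

    void shutdown() {
        mainThread.shutdown();
    }

    public static void main(String[] args) throws Exception {
        SlotPoolSketch pool = new SlotPoolSketch();
        // A second thread releases slots while this thread polls for them.
        Thread releaser = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                pool.releaseSlot("slot-" + i, "alloc-" + i);
            }
        });
        releaser.start();
        for (int i = 0; i < 1000; i++) {
            pool.pollByAllocation("alloc-" + i).join();
        }
        releaser.join();
        pool.shutdown();
    }
}
```

If releaseSlot wrote to the map directly from the calling thread, the stream in pollByAllocation could observe a concurrent structural change, which is the failure mode in the trace above.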


