Posted to issues@flink.apache.org by "Stefan Richter (JIRA)" <ji...@apache.org> on 2019/02/21 09:07:00 UTC

[jira] [Resolved] (FLINK-11375) Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly

     [ https://issues.apache.org/jira/browse/FLINK-11375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stefan Richter resolved FLINK-11375.
------------------------------------
    Resolution: Fixed

This issue is already fixed as a byproduct of the changes in FLINK-10431.

> Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly 
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-11375
>                 URL: https://issues.apache.org/jira/browse/FLINK-11375
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination, JobManager
>    Affects Versions: 1.7.1
>            Reporter: shuai.xu
>            Assignee: BoWang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
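> In SlotPool, AvailableSlots is not protected by any lock, so all access to it must happen in the SlotPool's main thread. For that reason, every public method is invoked through the SlotPoolGateway, except releaseSlot, which SlotSharingManager calls directly. That direct call can modify AvailableSlots concurrently with the main thread and cause a ConcurrentModificationException, as in the trace below, where the exception is thrown while PreviousAllocationSchedulingStrategy streams over a HashMap inside SlotPool$AvailableSlots.poll.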
>  2019-01-16 19:50:16,184 INFO [flink-akka.actor.default-dispatcher-161] org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: BlinkStoreScanTableSource feature_memory_entity_store-entity_lsc_page_detail_feats_group_178-Batch -> SourceConversion(table:[_DataStreamTable_12, source: [BlinkStoreScanTableSource feature_memory_entity_store-entity_lsc_page_detail_feats_group_178]], fields:(f0)) -> correlate: table(ScanBlinkStore_entity_lsc_page_detail_feats_group_1786($cor6.f0)), select: item_id,mainse_searcher_rank__cart_uv,mainse_searcher_rank__cart_uv_14,mainse_searcher_rank__cart_uv_30,mainse_searcher_rank__cart_uv_7,mainse_s (433/500) (bd34af8dd7ee02d04a4a25e698495f0a) switched from RUNNING to FINISHED.
>  2019-01-16 19:50:16,187 INFO [jobmanager-future-thread-90] org.apache.flink.runtime.executiongraph.ExecutionGraph - scheduleVertices meet exception, need to fail global execution graph
>  java.lang.reflect.UndeclaredThrowableException
>  at org.apache.flink.runtime.rpc.akka.$Proxy26.allocateSlots(Unknown Source)
>  at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.allocateSlots(SlotPool.java:1955)
>  at org.apache.flink.runtime.executiongraph.ExecutionGraph.schedule(ExecutionGraph.java:965)
>  at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleVertices(ExecutionGraph.java:1503)
>  at org.apache.flink.runtime.jobmaster.GraphManager$ExecutionGraphVertexScheduler.scheduleExecutionVertices(GraphManager.java:349)
>  at org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.scheduleOneByOne(StepwiseSchedulingPlugin.java:132)
>  at org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.onExecutionVertexFailover(StepwiseSchedulingPlugin.java:107)
>  at org.apache.flink.runtime.jobmaster.GraphManager.notifyExecutionVertexFailover(GraphManager.java:163)
>  at org.apache.flink.runtime.executiongraph.ExecutionGraph.resetExecutionVerticesAndNotify(ExecutionGraph.java:1372)
>  at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.restart(FailoverRegion.java:213)
>  at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.reset(FailoverRegion.java:198)
>  at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.allVerticesInTerminalState(FailoverRegion.java:97)
>  at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.lambda$cancel$0(FailoverRegion.java:169)
>  at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:186)
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
>  at java.lang.Thread.run(Thread.java:834)
>  Caused by: java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException
>  at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:213)
>  at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:125)
>  ... 23 more
>  Caused by: java.util.ConcurrentModificationException
>  at java.util.HashMap$ValueSpliterator.tryAdvance(HashMap.java:1643)
>  at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
>  at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
>  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
>  at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>  at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
>  at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
>  at org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSchedulingStrategy.findPreviousAllocation(PreviousAllocationSchedulingStrategy.java:77)
>  at org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSchedulingStrategy.findMatchWithLocality(PreviousAllocationSchedulingStrategy.java:61)
>  at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$AvailableSlots.poll(SlotPool.java:1755)
>  at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$AvailableSlots.poll(SlotPool.java:1790)
>  at org.apache.flink.runtime.jobmaster.slotpool.SlotPool.pollAndAllocateSlots(SlotPool.java:1094)
>  at org.apache.flink.runtime.jobmaster.slotpool.SlotPool.requestAllocatedSlots(SlotPool.java:886)
>  at org.apache.flink.runtime.jobmaster.slotpool.SlotPool.allocateSlots(SlotPool.java:590)
>  at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>  at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>  at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>  at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>  at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>  at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>  at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
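The threading rule described in the issue (an unlocked data structure that is safe only when confined to one main thread, with every caller going through a gateway) can be sketched as follows. This is a hypothetical, simplified stand-in, not Flink's SlotPool or SlotPoolGateway API: a single-threaded ExecutorService plays the role of the RPC endpoint's main thread, a plain HashMap stands in for AvailableSlots, and the names MainThreadConfinedPool, releaseSlot, pollSlot and releaseSlotDirectlyUnsafe are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, simplified model of a main-thread-confined slot pool (not Flink code).
final class MainThreadConfinedPool implements AutoCloseable {
    // slotId -> allocationId. Plain HashMap without locking: only safe if every
    // read and write runs on mainThread.
    private final Map<String, String> availableSlots = new HashMap<>();

    // Single thread standing in for the RPC endpoint's main thread.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // "Gateway" style: the mutation is scheduled on mainThread, never run by the caller.
    public CompletableFuture<Void> releaseSlot(String slotId, String allocationId) {
        return CompletableFuture.runAsync(
                () -> availableSlots.put(slotId, allocationId), mainThread);
    }

    // Finds a slot whose previous allocation matches, similar in spirit to a
    // previous-allocation scheduling strategy, and removes it. The stream and the
    // removal both run on mainThread, so no other thread can modify the map meanwhile.
    public CompletableFuture<Optional<String>> pollSlot(String previousAllocation) {
        return CompletableFuture.supplyAsync(() -> {
            Optional<String> match = availableSlots.entrySet().stream()
                    .filter(e -> e.getValue().equals(previousAllocation))
                    .map(Map.Entry::getKey)
                    .findFirst(); // stream is fully consumed before the removal below
            match.ifPresent(slotId -> availableSlots.remove(slotId));
            return match;
        }, mainThread);
    }

    // The pattern the issue describes: mutating the map directly on the caller's
    // thread. This can race with a stream running on mainThread; HashMap's
    // fail-fast iterators may then throw ConcurrentModificationException
    // (detection is best-effort, so the race may also go unnoticed).
    public void releaseSlotDirectlyUnsafe(String slotId, String allocationId) {
        availableSlots.put(slotId, allocationId);
    }

    @Override
    public void close() {
        mainThread.shutdown();
    }

    public static void main(String[] args) throws Exception {
        try (MainThreadConfinedPool pool = new MainThreadConfinedPool()) {
            pool.releaseSlot("slot-1", "alloc-A").get();
            pool.releaseSlot("slot-2", "alloc-B").get();
            System.out.println(pool.pollSlot("alloc-B").get()); // prints Optional[slot-2]
            System.out.println(pool.pollSlot("alloc-B").get()); // prints Optional.empty
        }
    }
}
```

Because every access to availableSlots in releaseSlot and pollSlot is serialized on the same thread, the stream in pollSlot never observes a concurrent put. releaseSlotDirectlyUnsafe bypasses that guarantee, which corresponds to the direct releaseSlot call from SlotSharingManager reported above.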



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)