Posted to issues@flink.apache.org by "zhangminglei (Jira)" <ji...@apache.org> on 2020/05/25 09:25:00 UTC

[jira] [Updated] (FLINK-17921) RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected akka.timeout

     [ https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhangminglei updated FLINK-17921:
---------------------------------
    Summary: RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected akka.timeout  (was: RpcGlobalAggregateManager#updateGlobalAggregate would cause akka.timeout)

> RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected akka.timeout
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-17921
>                 URL: https://issues.apache.org/jira/browse/FLINK-17921
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: zhangminglei
>            Priority: Major
>
> As described in the summary, {{RpcGlobalAggregateManager#updateGlobalAggregate}} can fail with an akka timeout, but that is not the error message we want.
> If {{org.apache.flink.api.common.functions.AggregateFunction#getResult}} returns {{null}} and that result is used in {{RpcGlobalAggregateManager#updateGlobalAggregate}}, the following exception is thrown, which is not expected at that point. Increasing {{akka.ask.timeout}} to another value does not make the exception go away.
> {code:java}
> java.io.IOException: Error updating global aggregate.
> at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:47)
> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
> ... 8 more
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
> at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
> at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
> {code}
> The following stack trace shows the root cause. The thread is parked in {{CompletableFuture.waitingGet}}, which implies that the {{CompletableFuture}} keeps the current thread waiting, and this leads to the timeout of Flink's akka communication. Therefore, even if the timeout is set to 1 hour, the problem is not solved.
> {code:java}
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000007b76617a8> (a java.util.concurrent.CompletableFuture$Signaller)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> Making {{RpcGlobalAggregateManager#updateGlobalAggregate}} use {{get(long timeout, TimeUnit unit)}} would be a good choice. That way, the timeout information can truly reflect the current status of the program. The akka timeout error is too broad, which is not conducive to user troubleshooting.
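
The bounded wait proposed above could look roughly like the sketch below. This is only an illustration, not the actual Flink code or patch: the class {{BoundedAggregateWait}}, the helper {{awaitAggregate}}, its parameters, and the message wording are all hypothetical, and a never-completed {{CompletableFuture}} stands in for an RPC reply that is never sent. Only {{CompletableFuture#get(long, TimeUnit)}} and the JDK exception types are real APIs.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedAggregateWait {

    // Hypothetical helper (not part of Flink): waits for the reply for at most
    // timeoutMillis and turns a timeout into an error that names the aggregate.
    static <T> T awaitAggregate(CompletableFuture<T> reply, String aggregateName, long timeoutMillis)
            throws IOException {
        try {
            // Bounded wait instead of the unbounded reply.get().
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IOException(
                    "No reply for global aggregate '" + aggregateName
                            + "' within " + timeoutMillis + " ms.", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IOException(
                    "Interrupted while updating global aggregate '" + aggregateName + "'.", e);
        } catch (ExecutionException e) {
            // Report the underlying failure rather than the wrapper.
            throw new IOException(
                    "Error updating global aggregate '" + aggregateName + "'.", e.getCause());
        }
    }

    public static void main(String[] args) throws Exception {
        // A future that is never completed stands in for a reply that never arrives.
        CompletableFuture<Object> neverCompleted = new CompletableFuture<>();
        try {
            awaitAggregate(neverCompleted, "demo", 100);
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }

        // A completed future returns its value immediately.
        System.out.println(awaitAggregate(CompletableFuture.completedFuture(42), "demo", 100));
    }
}
```

With a wait bounded on the caller's side, the error can state which aggregate timed out and after how long, which is the kind of message the description asks for. Which timeout value to use and where it should be configured is left open here.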



--
This message was sent by Atlassian Jira
(v8.3.4#803005)