Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/09/30 14:16:52 UTC

[GitHub] [flink] zhuzhurk commented on a change in pull request #9794: [FLINK-14247][runtime] Use NoResourceAvailableException to wrap TimeoutException on slot allocation timeout

URL: https://github.com/apache/flink/pull/9794#discussion_r329601655
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##########
 @@ -362,12 +365,22 @@ private static void propagateIfNonNull(final Throwable throwable) {
 					.registerProducedPartitions(logicalSlot.getTaskManagerLocation(), sendScheduleOrUpdateConsumerMessage);
 				executionVertex.tryAssignResource(logicalSlot);
 			} else {
-				handleTaskFailure(executionVertexId, throwable);
+				handleTaskFailure(executionVertexId, maybeWrapWithNoResourceAvailableException(throwable));
 			}
 			return null;
 		};
 	}
 
+	private Throwable maybeWrapWithNoResourceAvailableException(final Throwable failure) {
+		final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(failure);
+		if (strippedThrowable instanceof TimeoutException) {
+			return new NoResourceAvailableException("Could not allocate the required slot within slot request timeout. " +
 
 Review comment:
   I had considered doing this.
   But since the cause would always be a java.util.concurrent.TimeoutException, and its stack trace is not meaningful to users, I think we can drop the cause.
   
   For example:
   java.util.concurrent.TimeoutException
   	at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:991)
   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
   	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
   	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
   	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
   	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
   	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
   	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
   	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
   	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
   	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
   	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
   	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
   	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
   	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
   	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
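
   To make the trade-off concrete, here is a minimal, self-contained sketch of wrapping a timeout without attaching it as the cause. It is not the code in this PR: NoResourceSketchException is a hypothetical stand-in for NoResourceAvailableException, the local stripCompletionException is a simplified stand-in for the ExceptionUtils helper, and returning non-timeout failures unchanged is an assumption, since the diff above is cut off before that branch.

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

public class SlotTimeoutWrappingSketch {

    /** Hypothetical stand-in for NoResourceAvailableException (not the real Flink class). */
    static class NoResourceSketchException extends Exception {
        NoResourceSketchException(String message) {
            // No cause is passed, so the timeout's stack trace is not carried along.
            super(message);
        }
    }

    /** Simplified stand-in for ExceptionUtils.stripCompletionException (assumption). */
    static Throwable stripCompletionException(Throwable throwable) {
        Throwable current = throwable;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Wraps timeouts in a user-facing exception; other failures pass through (assumption). */
    static Throwable maybeWrap(Throwable failure) {
        Throwable stripped = stripCompletionException(failure);
        if (stripped instanceof TimeoutException) {
            return new NoResourceSketchException(
                "Could not allocate the required slot within slot request timeout.");
        }
        return failure;
    }

    public static void main(String[] args) {
        Throwable wrapped = maybeWrap(new CompletionException(new TimeoutException()));
        // The wrapper has no cause, so only its own message and stack trace are reported.
        System.out.println(wrapped.getClass().getSimpleName() + ", cause = " + wrapped.getCause());
    }
}
```

   Passing the stripped throwable as the cause instead would keep a trace like the one above in the reported failure, which is the noise this comment argues against.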
