You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "tartarus (Jira)" <ji...@apache.org> on 2020/07/21 11:16:00 UTC

[jira] [Updated] (FLINK-18663) Fix Flink On YARN AM not exit

     [ https://issues.apache.org/jira/browse/FLINK-18663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

tartarus updated FLINK-18663:
-----------------------------
    Attachment: C49A7310-F932-451B-A203-6D17F3140C0D.png

> Fix Flink On YARN AM not exit
> -----------------------------
>
>                 Key: FLINK-18663
>                 URL: https://issues.apache.org/jira/browse/FLINK-18663
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / REST
>    Affects Versions: 1.10.0, 1.10.1, 1.11.0
>            Reporter: tartarus
>            Priority: Major
>         Attachments: C49A7310-F932-451B-A203-6D17F3140C0D.png, e18e00dd6664485c2ff55284fe969474.png
>
>
> AbstractHandler throw NPE cause by FlinkHttpObjectAggregator is null
> when rest throw exception, it will do this code
> {code:java}
> private CompletableFuture<Void> handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) {
> 	FlinkHttpObjectAggregator flinkHttpObjectAggregator = ctx.pipeline().get(FlinkHttpObjectAggregator.class);
> 	int maxLength = flinkHttpObjectAggregator.maxContentLength() - OTHER_RESP_PAYLOAD_OVERHEAD;
> 	if (throwable instanceof RestHandlerException) {
> 		RestHandlerException rhe = (RestHandlerException) throwable;
> 		String stackTrace = ExceptionUtils.stringifyException(rhe);
> 		String truncatedStackTrace = Ascii.truncate(stackTrace, maxLength, "...");
> 		if (log.isDebugEnabled()) {
> 			log.error("Exception occurred in REST handler.", rhe);
> 		} else {
> 			log.error("Exception occurred in REST handler: {}", rhe.getMessage());
> 		}
> 		return HandlerUtils.sendErrorResponse(
> 			ctx,
> 			httpRequest,
> 			new ErrorResponseBody(truncatedStackTrace),
> 			rhe.getHttpResponseStatus(),
> 			responseHeaders);
> 	} else {
> 		log.error("Unhandled exception.", throwable);
> 		String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>",
> 			ExceptionUtils.stringifyException(throwable));
> 		String truncatedStackTrace = Ascii.truncate(stackTrace, maxLength, "...");
> 		return HandlerUtils.sendErrorResponse(
> 			ctx,
> 			httpRequest,
> 			new ErrorResponseBody(Arrays.asList("Internal server error.", truncatedStackTrace)),
> 			HttpResponseStatus.INTERNAL_SERVER_ERROR,
> 			responseHeaders);
> 	}
> }
> {code}
> but flinkHttpObjectAggregator some case is null,so this will throw NPE,but this method called by  AbstractHandler#respondAsLeader
> {code:java}
> requestProcessingFuture
> 	.whenComplete((Void ignored, Throwable throwable) -> {
> 		if (throwable != null) {
> 			handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest)
> 				.whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles));
> 		} else {
> 			finalizeRequestProcessing(finalUploadedFiles);
> 		}
> 	});
> {code}
>  the result is InFlightRequestTracker Cannot be cleared.
> so the CompletableFuture does‘t complete that handler's closeAsync returned
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)