You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/08/03 20:18:21 UTC

[flink] 01/03: [FLINK-18663][rest] Improve exception handling

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ee4b27f96e8156c6a1c4b825484d238055cecf5f
Author: Tartarus0zm <zh...@163.com>
AuthorDate: Wed Jul 29 18:56:28 2020 +0800

    [FLINK-18663][rest] Improve exception handling
    
    - ensure that request finalization runs even if handleException throws an exception
    - guard against an NPE in handleException (via a null check), which occurs if the client closes the connection and the FlinkHttpObjectAggregator is missing from the pipeline
---
 .../flink/runtime/rest/handler/AbstractHandler.java     | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index 40094bb..f61cac0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -57,6 +57,7 @@ import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 
 /**
  * Super class for netty-based handlers that work with {@link RequestBody}.
@@ -177,13 +178,17 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 
 			final FileUploads finalUploadedFiles = uploadedFiles;
 			requestProcessingFuture
+				.handle((Void ignored, Throwable throwable) -> {
+					if (throwable != null) {
+						return handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest);
+					}
+					return CompletableFuture.<Void>completedFuture(null);
+				}).thenCompose(Function.identity())
 				.whenComplete((Void ignored, Throwable throwable) -> {
 					if (throwable != null) {
-						handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest)
-							.whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles));
-					} else {
-						finalizeRequestProcessing(finalUploadedFiles);
+						log.warn("An exception occurred while handling another exception.", throwable);
 					}
+					finalizeRequestProcessing(finalUploadedFiles);
 				});
 		} catch (Throwable e) {
 			final FileUploads finalUploadedFiles = uploadedFiles;
@@ -199,6 +204,10 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 
 	private CompletableFuture<Void> handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) {
 		FlinkHttpObjectAggregator flinkHttpObjectAggregator = ctx.pipeline().get(FlinkHttpObjectAggregator.class);
+		if (flinkHttpObjectAggregator == null) {
+			log.warn("The connection was unexpectedly closed by the client.");
+			return CompletableFuture.completedFuture(null);
+		}
 		int maxLength = flinkHttpObjectAggregator.maxContentLength() - OTHER_RESP_PAYLOAD_OVERHEAD;
 		if (throwable instanceof RestHandlerException) {
 			RestHandlerException rhe = (RestHandlerException) throwable;