You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/08/03 20:20:03 UTC
[flink] 01/02: [FLINK-18663][rest] Improve exception handling
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4c2f7c44fc7085edfd3f910135fbf7f54d5eadf1
Author: Tartarus0zm <zh...@163.com>
AuthorDate: Wed Jul 29 18:56:28 2020 +0800
[FLINK-18663][rest] Improve exception handling
- ensure that request finalization runs even if handleException throws an exception
- guard against an NPE in handleException (via a null check), which occurs if the client closes the connection
---
.../flink/runtime/rest/handler/AbstractHandler.java | 17 +++++++++++++----
1 file changed, 13 insertions(+), 4 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index 40094bb..f61cac0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -57,6 +57,7 @@ import java.io.InputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
/**
* Super class for netty-based handlers that work with {@link RequestBody}.
@@ -177,13 +178,17 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
final FileUploads finalUploadedFiles = uploadedFiles;
requestProcessingFuture
+ .handle((Void ignored, Throwable throwable) -> {
+ if (throwable != null) {
+ return handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest);
+ }
+ return CompletableFuture.<Void>completedFuture(null);
+ }).thenCompose(Function.identity())
.whenComplete((Void ignored, Throwable throwable) -> {
if (throwable != null) {
- handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest)
- .whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles));
- } else {
- finalizeRequestProcessing(finalUploadedFiles);
+ log.warn("An exception occurred while handling another exception.", throwable);
}
+ finalizeRequestProcessing(finalUploadedFiles);
});
} catch (Throwable e) {
final FileUploads finalUploadedFiles = uploadedFiles;
@@ -199,6 +204,10 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
private CompletableFuture<Void> handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) {
FlinkHttpObjectAggregator flinkHttpObjectAggregator = ctx.pipeline().get(FlinkHttpObjectAggregator.class);
+ if (flinkHttpObjectAggregator == null) {
+ log.warn("The connection was unexpectedly closed by the client.");
+ return CompletableFuture.completedFuture(null);
+ }
int maxLength = flinkHttpObjectAggregator.maxContentLength() - OTHER_RESP_PAYLOAD_OVERHEAD;
if (throwable instanceof RestHandlerException) {
RestHandlerException rhe = (RestHandlerException) throwable;