You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/10 12:12:16 UTC
[flink] 02/03: [hotfix][rest] Centralize REST error logging
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1f47c0ac27ad42d3c4b066914a351e23a6cacbef
Author: zentol <ch...@apache.org>
AuthorDate: Thu Jan 3 17:29:56 2019 +0100
[hotfix][rest] Centralize REST error logging
---
.../runtime/rest/handler/AbstractHandler.java | 42 +++++++++++++++-------
.../runtime/rest/handler/AbstractRestHandler.java | 28 +--------------
.../AbstractTaskManagerFileHandler.java | 25 +++++--------
3 files changed, 40 insertions(+), 55 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index a87d3ad..cc9f355 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
@@ -50,6 +51,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -157,33 +159,49 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
final FileUploads finalUploadedFiles = uploadedFiles;
requestProcessingFuture
.whenComplete((Void ignored, Throwable throwable) -> {
- inFlightRequestTracker.deregisterRequest();
- cleanupFileUploads(finalUploadedFiles);
+ if (throwable != null) {
+ handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest)
+ .whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles));
+ } else {
+ finalizeRequestProcessing(finalUploadedFiles);
+ }
});
- } catch (RestHandlerException rhe) {
- inFlightRequestTracker.deregisterRequest();
+ } catch (Throwable e) {
+ final FileUploads finalUploadedFiles = uploadedFiles;
+ handleException(e, ctx, httpRequest)
+ .whenComplete((Void ignored, Throwable throwable) -> finalizeRequestProcessing(finalUploadedFiles));
+ }
+ }
+
+ private void finalizeRequestProcessing(FileUploads uploadedFiles) {
+ inFlightRequestTracker.deregisterRequest();
+ cleanupFileUploads(uploadedFiles);
+ }
+
+ private CompletableFuture<Void> handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) {
+ if (throwable instanceof RestHandlerException) {
+ RestHandlerException rhe = (RestHandlerException) throwable;
if (log.isDebugEnabled()) {
log.error("Exception occurred in REST handler.", rhe);
} else {
log.error("Exception occurred in REST handler: {}", rhe.getMessage());
}
- HandlerUtils.sendErrorResponse(
+ return HandlerUtils.sendErrorResponse(
ctx,
httpRequest,
new ErrorResponseBody(rhe.getMessage()),
rhe.getHttpResponseStatus(),
responseHeaders);
- cleanupFileUploads(uploadedFiles);
- } catch (Throwable e) {
- inFlightRequestTracker.deregisterRequest();
- log.error("Request processing failed.", e);
- HandlerUtils.sendErrorResponse(
+ } else {
+ log.error("Implementation error: Unhandled exception.", throwable);
+ String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>",
+ ExceptionUtils.stringifyException(throwable));
+ return HandlerUtils.sendErrorResponse(
ctx,
httpRequest,
- new ErrorResponseBody("Internal server error."),
+ new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)),
HttpResponseStatus.INTERNAL_SERVER_ERROR,
responseHeaders);
- cleanupFileUploads(uploadedFiles);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index d88abaa..88a0c5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -19,17 +19,14 @@
package org.apache.flink.runtime.rest.handler;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
-import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
@@ -39,7 +36,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
import javax.annotation.Nonnull;
-import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -79,29 +75,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
response = FutureUtils.completedExceptionally(e);
}
- return response.handle((resp, throwable) -> throwable != null ?
- errorResponse(throwable) : Tuple2.of(resp, messageHeaders.getResponseStatusCode()))
- .thenCompose(r -> HandlerUtils.sendResponse(ctx, httpRequest, r.f0, r.f1, responseHeaders));
- }
-
- private Tuple2<ResponseBody, HttpResponseStatus> errorResponse(Throwable throwable) {
- Throwable error = ExceptionUtils.stripCompletionException(throwable);
- if (error instanceof RestHandlerException) {
- final RestHandlerException rhe = (RestHandlerException) error;
- if (log.isDebugEnabled()) {
- log.error("Exception occurred in REST handler.", rhe);
- } else {
- log.error("Exception occurred in REST handler: {}", rhe.getMessage());
- }
- return Tuple2.of(new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus());
- } else {
- log.error("Implementation error: Unhandled exception.", error);
- String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>",
- ExceptionUtils.stringifyException(throwable));
- return Tuple2.of(
- new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)),
- HttpResponseStatus.INTERNAL_SERVER_ERROR);
- }
+ return response.thenAccept(resp -> HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode(), responseHeaders));
}
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index 82ca82c..c2d7a28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -24,12 +24,11 @@ import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
+import org.apache.flink.runtime.rest.NotFoundException;
import org.apache.flink.runtime.rest.handler.AbstractHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
@@ -75,7 +74,6 @@ import java.util.concurrent.TimeUnit;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
@@ -159,23 +157,18 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
fileBlobKeys.invalidate(taskManagerId);
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
- final ErrorResponseBody errorResponseBody;
- final HttpResponseStatus httpResponseStatus;
if (strippedThrowable instanceof UnknownTaskExecutorException) {
- errorResponseBody = new ErrorResponseBody("Unknown TaskExecutor " + taskManagerId + '.');
- httpResponseStatus = HttpResponseStatus.NOT_FOUND;
+ throw new CompletionException(
+ new NotFoundException(
+ String.format("Failed to transfer file from TaskExecutor %s because it was unknown.", taskManagerId),
+ strippedThrowable));
} else {
- errorResponseBody = new ErrorResponseBody("Internal server error: " + throwable.getMessage() + '.');
- httpResponseStatus = INTERNAL_SERVER_ERROR;
+ throw new CompletionException(
+ new FlinkException(
+ String.format("Failed to transfer file from TaskExecutor %s.", taskManagerId),
+ strippedThrowable));
}
-
- HandlerUtils.sendErrorResponse(
- ctx,
- httpRequest,
- errorResponseBody,
- httpResponseStatus,
- responseHeaders);
}
});
}