You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/10 12:12:16 UTC

[flink] 02/03: [hotfix][rest] Centralize REST error logging

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1f47c0ac27ad42d3c4b066914a351e23a6cacbef
Author: zentol <ch...@apache.org>
AuthorDate: Thu Jan 3 17:29:56 2019 +0100

    [hotfix][rest] Centralize REST error logging
---
 .../runtime/rest/handler/AbstractHandler.java      | 42 +++++++++++++++-------
 .../runtime/rest/handler/AbstractRestHandler.java  | 28 +--------------
 .../AbstractTaskManagerFileHandler.java            | 25 +++++--------
 3 files changed, 40 insertions(+), 55 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index a87d3ad..cc9f355 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
@@ -50,6 +51,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -157,33 +159,49 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 			final FileUploads finalUploadedFiles = uploadedFiles;
 			requestProcessingFuture
 				.whenComplete((Void ignored, Throwable throwable) -> {
-					inFlightRequestTracker.deregisterRequest();
-					cleanupFileUploads(finalUploadedFiles);
+					if (throwable != null) {
+						handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest)
+							.whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles));
+					} else {
+						finalizeRequestProcessing(finalUploadedFiles);
+					}
 				});
-		} catch (RestHandlerException rhe) {
-			inFlightRequestTracker.deregisterRequest();
+		} catch (Throwable e) {
+			final FileUploads finalUploadedFiles = uploadedFiles;
+			handleException(e, ctx, httpRequest)
+				.whenComplete((Void ignored, Throwable throwable) -> finalizeRequestProcessing(finalUploadedFiles));
+		}
+	}
+
+	private void finalizeRequestProcessing(FileUploads uploadedFiles) {
+		inFlightRequestTracker.deregisterRequest();
+		cleanupFileUploads(uploadedFiles);
+	}
+
+	private CompletableFuture<Void> handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) {
+		if (throwable instanceof RestHandlerException) {
+			RestHandlerException rhe = (RestHandlerException) throwable;
 			if (log.isDebugEnabled()) {
 				log.error("Exception occurred in REST handler.", rhe);
 			} else {
 				log.error("Exception occurred in REST handler: {}", rhe.getMessage());
 			}
-			HandlerUtils.sendErrorResponse(
+			return HandlerUtils.sendErrorResponse(
 				ctx,
 				httpRequest,
 				new ErrorResponseBody(rhe.getMessage()),
 				rhe.getHttpResponseStatus(),
 				responseHeaders);
-			cleanupFileUploads(uploadedFiles);
-		} catch (Throwable e) {
-			inFlightRequestTracker.deregisterRequest();
-			log.error("Request processing failed.", e);
-			HandlerUtils.sendErrorResponse(
+		} else {
+			log.error("Implementation error: Unhandled exception.", throwable);
+			String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>",
+				ExceptionUtils.stringifyException(throwable));
+			return HandlerUtils.sendErrorResponse(
 				ctx,
 				httpRequest,
-				new ErrorResponseBody("Internal server error."),
+				new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)),
 				HttpResponseStatus.INTERNAL_SERVER_ERROR,
 				responseHeaders);
-			cleanupFileUploads(uploadedFiles);
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index d88abaa..88a0c5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -19,17 +19,14 @@
 package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
-import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
@@ -39,7 +36,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 
 import javax.annotation.Nonnull;
 
-import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -79,29 +75,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 			response = FutureUtils.completedExceptionally(e);
 		}
 
-		return response.handle((resp, throwable) -> throwable != null ?
-			errorResponse(throwable) : Tuple2.of(resp, messageHeaders.getResponseStatusCode()))
-			.thenCompose(r -> HandlerUtils.sendResponse(ctx, httpRequest, r.f0, r.f1, responseHeaders));
-	}
-
-	private Tuple2<ResponseBody, HttpResponseStatus> errorResponse(Throwable throwable) {
-		Throwable error = ExceptionUtils.stripCompletionException(throwable);
-		if (error instanceof RestHandlerException) {
-			final RestHandlerException rhe = (RestHandlerException) error;
-			if (log.isDebugEnabled()) {
-				log.error("Exception occurred in REST handler.", rhe);
-			} else {
-				log.error("Exception occurred in REST handler: {}", rhe.getMessage());
-			}
-			return Tuple2.of(new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus());
-		} else {
-			log.error("Implementation error: Unhandled exception.", error);
-			String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>",
-				ExceptionUtils.stringifyException(throwable));
-			return Tuple2.of(
-				new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)),
-				HttpResponseStatus.INTERNAL_SERVER_ERROR);
-		}
+		return response.thenAccept(resp -> HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode(), responseHeaders));
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index 82ca82c..c2d7a28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -24,12 +24,11 @@ import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
+import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.AbstractHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
@@ -75,7 +74,6 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 
@@ -159,23 +157,18 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
 					fileBlobKeys.invalidate(taskManagerId);
 
 					final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
-					final ErrorResponseBody errorResponseBody;
-					final HttpResponseStatus httpResponseStatus;
 
 					if (strippedThrowable instanceof UnknownTaskExecutorException) {
-						errorResponseBody = new ErrorResponseBody("Unknown TaskExecutor " + taskManagerId + '.');
-						httpResponseStatus = HttpResponseStatus.NOT_FOUND;
+						throw new CompletionException(
+							new NotFoundException(
+								String.format("Failed to transfer file from TaskExecutor %s because it was unknown.", taskManagerId),
+								strippedThrowable));
 					} else {
-						errorResponseBody = new ErrorResponseBody("Internal server error: " + throwable.getMessage() + '.');
-						httpResponseStatus = INTERNAL_SERVER_ERROR;
+						throw new CompletionException(
+							new FlinkException(
+								String.format("Failed to transfer file from TaskExecutor %s.", taskManagerId),
+								strippedThrowable));
 					}
-
-					HandlerUtils.sendErrorResponse(
-						ctx,
-						httpRequest,
-						errorResponseBody,
-						httpResponseStatus,
-						responseHeaders);
 				}
 			});
 	}