You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/10 12:14:23 UTC

[flink] branch release-1.6 updated (93c13f4 -> 03dc53c)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 93c13f4  [FLINK-11207][build] Bump commons-compress to 1.18
     new b346f8c  [FLINK-11134][rest] Do not log stacktrace for handled exceptions
     new a698eec  [hotfix][rest] Centralize REST error logging
     new 03dc53c  [hotfix][rest] Remove unnecessary instanceof check

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/rest/handler/AbstractHandler.java      | 51 +++++++++++++++-------
 .../runtime/rest/handler/AbstractRestHandler.java  | 24 +---------
 .../AbstractTaskManagerFileHandler.java            | 31 +++++--------
 3 files changed, 47 insertions(+), 59 deletions(-)


[flink] 03/03: [hotfix][rest] Remove unnecessary instanceof check

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 03dc53c1a4a26c31f640a0fe5cc18ebdd8b664d9
Author: zentol <ch...@apache.org>
AuthorDate: Thu Jan 3 17:30:53 2019 +0100

    [hotfix][rest] Remove unnecessary instanceof check
---
 .../rest/handler/taskmanager/AbstractTaskManagerFileHandler.java    | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index 1781fe2..01d818b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -124,11 +124,7 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
 			blobKeyFuture = fileBlobKeys.get(taskManagerId);
 		} catch (ExecutionException e) {
 			final Throwable cause = ExceptionUtils.stripExecutionException(e);
-			if (cause instanceof RestHandlerException) {
-				throw (RestHandlerException) cause;
-			} else {
-				throw new RestHandlerException("Could not retrieve file blob key future.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
-			}
+			throw new RestHandlerException("Could not retrieve file blob key future.", HttpResponseStatus.INTERNAL_SERVER_ERROR, cause);
 		}
 
 		final CompletableFuture<Void> resultFuture = blobKeyFuture.thenAcceptAsync(


[flink] 01/03: [FLINK-11134][rest] Do not log stacktrace for handled exceptions

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b346f8cd5efd16f1609bb0b0ce1917dca3b1ffc3
Author: zentol <ch...@apache.org>
AuthorDate: Thu Dec 20 14:18:46 2018 +0100

    [FLINK-11134][rest] Do not log stacktrace for handled exceptions
---
 .../org/apache/flink/runtime/rest/handler/AbstractHandler.java   | 9 ++++++---
 .../apache/flink/runtime/rest/handler/AbstractRestHandler.java   | 6 +++++-
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index 5a1c371..3ca0bd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -117,15 +117,13 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 				try {
 					request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
 				} catch (JsonParseException | JsonMappingException je) {
-					log.error("Request did not conform to expected format.", je);
-					throw new RestHandlerException("Bad request received.", HttpResponseStatus.BAD_REQUEST, je);
+					throw new RestHandlerException("Bad request received. Request did not conform to expected format.", HttpResponseStatus.BAD_REQUEST, je);
 				}
 			} else {
 				try {
 					ByteBufInputStream in = new ByteBufInputStream(msgContent);
 					request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
 				} catch (JsonParseException | JsonMappingException je) {
-					log.error("Failed to read request.", je);
 					throw new RestHandlerException(
 						String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName()),
 						HttpResponseStatus.BAD_REQUEST,
@@ -165,6 +163,11 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 				});
 		} catch (RestHandlerException rhe) {
 			inFlightRequestTracker.deregisterRequest();
+			if (log.isDebugEnabled()) {
+				log.error("Exception occurred in REST handler.", rhe);
+			} else {
+				log.error("Exception occurred in REST handler: {}", rhe.getMessage());
+			}
 			HandlerUtils.sendErrorResponse(
 				ctx,
 				httpRequest,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 0397cb8..3d74a7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -89,7 +89,11 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 		Throwable error = ExceptionUtils.stripCompletionException(throwable);
 		if (error instanceof RestHandlerException) {
 			final RestHandlerException rhe = (RestHandlerException) error;
-			log.error("Exception occurred in REST handler.", rhe);
+			if (log.isDebugEnabled()) {
+				log.error("Exception occurred in REST handler.", rhe);
+			} else {
+				log.error("Exception occurred in REST handler: {}", rhe.getMessage());
+			}
 			return Tuple2.of(new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus());
 		} else {
 			log.error("Implementation error: Unhandled exception.", error);


[flink] 02/03: [hotfix][rest] Centralize REST error logging

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a698eecf1fbf9c4eb74fcbfddca2e356b4f03203
Author: zentol <ch...@apache.org>
AuthorDate: Thu Jan 3 17:29:56 2019 +0100

    [hotfix][rest] Centralize REST error logging
---
 .../runtime/rest/handler/AbstractHandler.java      | 42 +++++++++++++++-------
 .../runtime/rest/handler/AbstractRestHandler.java  | 28 +--------------
 .../AbstractTaskManagerFileHandler.java            | 25 +++++--------
 3 files changed, 40 insertions(+), 55 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index 3ca0bd3..d46bb13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
@@ -50,6 +51,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -158,33 +160,49 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 			final FileUploads finalUploadedFiles = uploadedFiles;
 			requestProcessingFuture
 				.whenComplete((Void ignored, Throwable throwable) -> {
-					inFlightRequestTracker.deregisterRequest();
-					cleanupFileUploads(finalUploadedFiles);
+					if (throwable != null) {
+						handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest)
+							.whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles));
+					} else {
+						finalizeRequestProcessing(finalUploadedFiles);
+					}
 				});
-		} catch (RestHandlerException rhe) {
-			inFlightRequestTracker.deregisterRequest();
+		} catch (Throwable e) {
+			final FileUploads finalUploadedFiles = uploadedFiles;
+			handleException(e, ctx, httpRequest)
+				.whenComplete((Void ignored, Throwable throwable) -> finalizeRequestProcessing(finalUploadedFiles));
+		}
+	}
+
+	private void finalizeRequestProcessing(FileUploads uploadedFiles) {
+		inFlightRequestTracker.deregisterRequest();
+		cleanupFileUploads(uploadedFiles);
+	}
+
+	private CompletableFuture<Void> handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) {
+		if (throwable instanceof RestHandlerException) {
+			RestHandlerException rhe = (RestHandlerException) throwable;
 			if (log.isDebugEnabled()) {
 				log.error("Exception occurred in REST handler.", rhe);
 			} else {
 				log.error("Exception occurred in REST handler: {}", rhe.getMessage());
 			}
-			HandlerUtils.sendErrorResponse(
+			return HandlerUtils.sendErrorResponse(
 				ctx,
 				httpRequest,
 				new ErrorResponseBody(rhe.getMessage()),
 				rhe.getHttpResponseStatus(),
 				responseHeaders);
-			cleanupFileUploads(uploadedFiles);
-		} catch (Throwable e) {
-			inFlightRequestTracker.deregisterRequest();
-			log.error("Request processing failed.", e);
-			HandlerUtils.sendErrorResponse(
+		} else {
+			log.error("Implementation error: Unhandled exception.", throwable);
+			String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>",
+				ExceptionUtils.stringifyException(throwable));
+			return HandlerUtils.sendErrorResponse(
 				ctx,
 				httpRequest,
-				new ErrorResponseBody("Internal server error."),
+				new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)),
 				HttpResponseStatus.INTERNAL_SERVER_ERROR,
 				responseHeaders);
-			cleanupFileUploads(uploadedFiles);
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 3d74a7b..992e2c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -19,17 +19,14 @@
 package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
-import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
@@ -39,7 +36,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 
 import javax.annotation.Nonnull;
 
-import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -80,29 +76,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 			response = FutureUtils.completedExceptionally(e);
 		}
 
-		return response.handle((resp, throwable) -> throwable != null ?
-			errorResponse(throwable) : Tuple2.of(resp, messageHeaders.getResponseStatusCode()))
-			.thenCompose(r -> HandlerUtils.sendResponse(ctx, httpRequest, r.f0, r.f1, responseHeaders));
-	}
-
-	private Tuple2<ResponseBody, HttpResponseStatus> errorResponse(Throwable throwable) {
-		Throwable error = ExceptionUtils.stripCompletionException(throwable);
-		if (error instanceof RestHandlerException) {
-			final RestHandlerException rhe = (RestHandlerException) error;
-			if (log.isDebugEnabled()) {
-				log.error("Exception occurred in REST handler.", rhe);
-			} else {
-				log.error("Exception occurred in REST handler: {}", rhe.getMessage());
-			}
-			return Tuple2.of(new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus());
-		} else {
-			log.error("Implementation error: Unhandled exception.", error);
-			String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>",
-				ExceptionUtils.stringifyException(throwable));
-			return Tuple2.of(
-				new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)),
-				HttpResponseStatus.INTERNAL_SERVER_ERROR);
-		}
+		return response.thenAccept(resp -> HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode(), responseHeaders));
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index 8a20868..1781fe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -24,12 +24,11 @@ import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
+import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.AbstractHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
@@ -75,7 +74,6 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 
@@ -160,23 +158,18 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
 					fileBlobKeys.invalidate(taskManagerId);
 
 					final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
-					final ErrorResponseBody errorResponseBody;
-					final HttpResponseStatus httpResponseStatus;
 
 					if (strippedThrowable instanceof UnknownTaskExecutorException) {
-						errorResponseBody = new ErrorResponseBody("Unknown TaskExecutor " + taskManagerId + '.');
-						httpResponseStatus = HttpResponseStatus.NOT_FOUND;
+						throw new CompletionException(
+							new NotFoundException(
+								String.format("Failed to transfer file from TaskExecutor %s because it was unknown.", taskManagerId),
+								strippedThrowable));
 					} else {
-						errorResponseBody = new ErrorResponseBody("Internal server error: " + throwable.getMessage() + '.');
-						httpResponseStatus = INTERNAL_SERVER_ERROR;
+						throw new CompletionException(
+							new FlinkException(
+								String.format("Failed to transfer file from TaskExecutor %s.", taskManagerId),
+								strippedThrowable));
 					}
-
-					HandlerUtils.sendErrorResponse(
-						ctx,
-						httpRequest,
-						errorResponseBody,
-						httpResponseStatus,
-						responseHeaders);
 				}
 			});
 	}