You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/10 12:12:14 UTC

[flink] branch master updated (f692d37 -> 4adf23d)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from f692d37  [FLINK-11272][flink-yarn] Support for parsing multiple --yarnship parameters
     new c06e7a5  [FLINK-11134][rest] Do not log stacktrace for handled exceptions
     new 1f47c0a  [hotfix][rest] Centralize REST error logging
     new 4adf23d  [hotfix][rest] Remove unnecessary instanceof check

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/rest/handler/AbstractHandler.java      | 51 +++++++++++++++-------
 .../runtime/rest/handler/AbstractRestHandler.java  | 24 +---------
 .../AbstractTaskManagerFileHandler.java            | 31 +++++--------
 3 files changed, 47 insertions(+), 59 deletions(-)


[flink] 02/03: [hotfix][rest] Centralize REST error logging

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1f47c0ac27ad42d3c4b066914a351e23a6cacbef
Author: zentol <ch...@apache.org>
AuthorDate: Thu Jan 3 17:29:56 2019 +0100

    [hotfix][rest] Centralize REST error logging
---
 .../runtime/rest/handler/AbstractHandler.java      | 42 +++++++++++++++-------
 .../runtime/rest/handler/AbstractRestHandler.java  | 28 +--------------
 .../AbstractTaskManagerFileHandler.java            | 25 +++++--------
 3 files changed, 40 insertions(+), 55 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index a87d3ad..cc9f355 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
@@ -50,6 +51,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -157,33 +159,49 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 			final FileUploads finalUploadedFiles = uploadedFiles;
 			requestProcessingFuture
 				.whenComplete((Void ignored, Throwable throwable) -> {
-					inFlightRequestTracker.deregisterRequest();
-					cleanupFileUploads(finalUploadedFiles);
+					if (throwable != null) {
+						handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest)
+							.whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles));
+					} else {
+						finalizeRequestProcessing(finalUploadedFiles);
+					}
 				});
-		} catch (RestHandlerException rhe) {
-			inFlightRequestTracker.deregisterRequest();
+		} catch (Throwable e) {
+			final FileUploads finalUploadedFiles = uploadedFiles;
+			handleException(e, ctx, httpRequest)
+				.whenComplete((Void ignored, Throwable throwable) -> finalizeRequestProcessing(finalUploadedFiles));
+		}
+	}
+
+	private void finalizeRequestProcessing(FileUploads uploadedFiles) {
+		inFlightRequestTracker.deregisterRequest();
+		cleanupFileUploads(uploadedFiles);
+	}
+
+	private CompletableFuture<Void> handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) {
+		if (throwable instanceof RestHandlerException) {
+			RestHandlerException rhe = (RestHandlerException) throwable;
 			if (log.isDebugEnabled()) {
 				log.error("Exception occurred in REST handler.", rhe);
 			} else {
 				log.error("Exception occurred in REST handler: {}", rhe.getMessage());
 			}
-			HandlerUtils.sendErrorResponse(
+			return HandlerUtils.sendErrorResponse(
 				ctx,
 				httpRequest,
 				new ErrorResponseBody(rhe.getMessage()),
 				rhe.getHttpResponseStatus(),
 				responseHeaders);
-			cleanupFileUploads(uploadedFiles);
-		} catch (Throwable e) {
-			inFlightRequestTracker.deregisterRequest();
-			log.error("Request processing failed.", e);
-			HandlerUtils.sendErrorResponse(
+		} else {
+			log.error("Implementation error: Unhandled exception.", throwable);
+			String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>",
+				ExceptionUtils.stringifyException(throwable));
+			return HandlerUtils.sendErrorResponse(
 				ctx,
 				httpRequest,
-				new ErrorResponseBody("Internal server error."),
+				new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)),
 				HttpResponseStatus.INTERNAL_SERVER_ERROR,
 				responseHeaders);
-			cleanupFileUploads(uploadedFiles);
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index d88abaa..88a0c5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -19,17 +19,14 @@
 package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
-import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
@@ -39,7 +36,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 
 import javax.annotation.Nonnull;
 
-import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -79,29 +75,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 			response = FutureUtils.completedExceptionally(e);
 		}
 
-		return response.handle((resp, throwable) -> throwable != null ?
-			errorResponse(throwable) : Tuple2.of(resp, messageHeaders.getResponseStatusCode()))
-			.thenCompose(r -> HandlerUtils.sendResponse(ctx, httpRequest, r.f0, r.f1, responseHeaders));
-	}
-
-	private Tuple2<ResponseBody, HttpResponseStatus> errorResponse(Throwable throwable) {
-		Throwable error = ExceptionUtils.stripCompletionException(throwable);
-		if (error instanceof RestHandlerException) {
-			final RestHandlerException rhe = (RestHandlerException) error;
-			if (log.isDebugEnabled()) {
-				log.error("Exception occurred in REST handler.", rhe);
-			} else {
-				log.error("Exception occurred in REST handler: {}", rhe.getMessage());
-			}
-			return Tuple2.of(new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus());
-		} else {
-			log.error("Implementation error: Unhandled exception.", error);
-			String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>",
-				ExceptionUtils.stringifyException(throwable));
-			return Tuple2.of(
-				new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)),
-				HttpResponseStatus.INTERNAL_SERVER_ERROR);
-		}
+		return response.thenAccept(resp -> HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode(), responseHeaders));
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index 82ca82c..c2d7a28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -24,12 +24,11 @@ import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
+import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.AbstractHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
@@ -75,7 +74,6 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 
@@ -159,23 +157,18 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
 					fileBlobKeys.invalidate(taskManagerId);
 
 					final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
-					final ErrorResponseBody errorResponseBody;
-					final HttpResponseStatus httpResponseStatus;
 
 					if (strippedThrowable instanceof UnknownTaskExecutorException) {
-						errorResponseBody = new ErrorResponseBody("Unknown TaskExecutor " + taskManagerId + '.');
-						httpResponseStatus = HttpResponseStatus.NOT_FOUND;
+						throw new CompletionException(
+							new NotFoundException(
+								String.format("Failed to transfer file from TaskExecutor %s because it was unknown.", taskManagerId),
+								strippedThrowable));
 					} else {
-						errorResponseBody = new ErrorResponseBody("Internal server error: " + throwable.getMessage() + '.');
-						httpResponseStatus = INTERNAL_SERVER_ERROR;
+						throw new CompletionException(
+							new FlinkException(
+								String.format("Failed to transfer file from TaskExecutor %s.", taskManagerId),
+								strippedThrowable));
 					}
-
-					HandlerUtils.sendErrorResponse(
-						ctx,
-						httpRequest,
-						errorResponseBody,
-						httpResponseStatus,
-						responseHeaders);
 				}
 			});
 	}


[flink] 01/03: [FLINK-11134][rest] Do not log stacktrace for handled exceptions

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c06e7a5427675efb63aff86debd03581343cfdb7
Author: zentol <ch...@apache.org>
AuthorDate: Thu Dec 20 14:18:46 2018 +0100

    [FLINK-11134][rest] Do not log stacktrace for handled exceptions
---
 .../org/apache/flink/runtime/rest/handler/AbstractHandler.java   | 9 ++++++---
 .../apache/flink/runtime/rest/handler/AbstractRestHandler.java   | 6 +++++-
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index 865bdb1..a87d3ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -116,15 +116,13 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 				try {
 					request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
 				} catch (JsonParseException | JsonMappingException je) {
-					log.error("Request did not conform to expected format.", je);
-					throw new RestHandlerException("Bad request received.", HttpResponseStatus.BAD_REQUEST, je);
+					throw new RestHandlerException("Bad request received. Request did not conform to expected format.", HttpResponseStatus.BAD_REQUEST, je);
 				}
 			} else {
 				try {
 					ByteBufInputStream in = new ByteBufInputStream(msgContent);
 					request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
 				} catch (JsonParseException | JsonMappingException je) {
-					log.error("Failed to read request.", je);
 					throw new RestHandlerException(
 						String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName()),
 						HttpResponseStatus.BAD_REQUEST,
@@ -164,6 +162,11 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 				});
 		} catch (RestHandlerException rhe) {
 			inFlightRequestTracker.deregisterRequest();
+			if (log.isDebugEnabled()) {
+				log.error("Exception occurred in REST handler.", rhe);
+			} else {
+				log.error("Exception occurred in REST handler: {}", rhe.getMessage());
+			}
 			HandlerUtils.sendErrorResponse(
 				ctx,
 				httpRequest,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index a0cff90..d88abaa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -88,7 +88,11 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 		Throwable error = ExceptionUtils.stripCompletionException(throwable);
 		if (error instanceof RestHandlerException) {
 			final RestHandlerException rhe = (RestHandlerException) error;
-			log.error("Exception occurred in REST handler.", rhe);
+			if (log.isDebugEnabled()) {
+				log.error("Exception occurred in REST handler.", rhe);
+			} else {
+				log.error("Exception occurred in REST handler: {}", rhe.getMessage());
+			}
 			return Tuple2.of(new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus());
 		} else {
 			log.error("Implementation error: Unhandled exception.", error);


[flink] 03/03: [hotfix][rest] Remove unnecessary instanceof check

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4adf23d3b93c301b53f11f1c05a1aa6d37112d46
Author: zentol <ch...@apache.org>
AuthorDate: Thu Jan 3 17:30:53 2019 +0100

    [hotfix][rest] Remove unnecessary instanceof check
---
 .../rest/handler/taskmanager/AbstractTaskManagerFileHandler.java    | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index c2d7a28..17d1b17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -123,11 +123,7 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
 			blobKeyFuture = fileBlobKeys.get(taskManagerId);
 		} catch (ExecutionException e) {
 			final Throwable cause = ExceptionUtils.stripExecutionException(e);
-			if (cause instanceof RestHandlerException) {
-				throw (RestHandlerException) cause;
-			} else {
-				throw new RestHandlerException("Could not retrieve file blob key future.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
-			}
+			throw new RestHandlerException("Could not retrieve file blob key future.", HttpResponseStatus.INTERNAL_SERVER_ERROR, cause);
 		}
 
 		final CompletableFuture<Void> resultFuture = blobKeyFuture.thenAcceptAsync(