You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/10 12:14:23 UTC
[flink] branch release-1.6 updated (93c13f4 -> 03dc53c)
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a change to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git.
from 93c13f4 [FLINK-11207][build] Bump commons-compress to 1.18
new b346f8c [FLINK-11134][rest] Do not log stacktrace for handled exceptions
new a698eec [hotfix][rest] Centralize REST error logging
new 03dc53c [hotfix][rest] Remove unnecessary instanceof check
The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../runtime/rest/handler/AbstractHandler.java | 51 +++++++++++++++-------
.../runtime/rest/handler/AbstractRestHandler.java | 24 +---------
.../AbstractTaskManagerFileHandler.java | 31 +++++--------
3 files changed, 47 insertions(+), 59 deletions(-)
[flink] 03/03: [hotfix][rest] Remove unnecessary instanceof check
Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 03dc53c1a4a26c31f640a0fe5cc18ebdd8b664d9
Author: zentol <ch...@apache.org>
AuthorDate: Thu Jan 3 17:30:53 2019 +0100
[hotfix][rest] Remove unnecessary instanceof check
---
.../rest/handler/taskmanager/AbstractTaskManagerFileHandler.java | 6 +-----
1 file changed, 1 insertion(+), 5 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index 1781fe2..01d818b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -124,11 +124,7 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
blobKeyFuture = fileBlobKeys.get(taskManagerId);
} catch (ExecutionException e) {
final Throwable cause = ExceptionUtils.stripExecutionException(e);
- if (cause instanceof RestHandlerException) {
- throw (RestHandlerException) cause;
- } else {
- throw new RestHandlerException("Could not retrieve file blob key future.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
- }
+ throw new RestHandlerException("Could not retrieve file blob key future.", HttpResponseStatus.INTERNAL_SERVER_ERROR, cause);
}
final CompletableFuture<Void> resultFuture = blobKeyFuture.thenAcceptAsync(
[flink] 01/03: [FLINK-11134][rest] Do not log stacktrace for handled exceptions
Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b346f8cd5efd16f1609bb0b0ce1917dca3b1ffc3
Author: zentol <ch...@apache.org>
AuthorDate: Thu Dec 20 14:18:46 2018 +0100
[FLINK-11134][rest] Do not log stacktrace for handled exceptions
---
.../org/apache/flink/runtime/rest/handler/AbstractHandler.java | 9 ++++++---
.../apache/flink/runtime/rest/handler/AbstractRestHandler.java | 6 +++++-
2 files changed, 11 insertions(+), 4 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index 5a1c371..3ca0bd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -117,15 +117,13 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
try {
request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
} catch (JsonParseException | JsonMappingException je) {
- log.error("Request did not conform to expected format.", je);
- throw new RestHandlerException("Bad request received.", HttpResponseStatus.BAD_REQUEST, je);
+ throw new RestHandlerException("Bad request received. Request did not conform to expected format.", HttpResponseStatus.BAD_REQUEST, je);
}
} else {
try {
ByteBufInputStream in = new ByteBufInputStream(msgContent);
request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
} catch (JsonParseException | JsonMappingException je) {
- log.error("Failed to read request.", je);
throw new RestHandlerException(
String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName()),
HttpResponseStatus.BAD_REQUEST,
@@ -165,6 +163,11 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
});
} catch (RestHandlerException rhe) {
inFlightRequestTracker.deregisterRequest();
+ if (log.isDebugEnabled()) {
+ log.error("Exception occurred in REST handler.", rhe);
+ } else {
+ log.error("Exception occurred in REST handler: {}", rhe.getMessage());
+ }
HandlerUtils.sendErrorResponse(
ctx,
httpRequest,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 0397cb8..3d74a7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -89,7 +89,11 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
Throwable error = ExceptionUtils.stripCompletionException(throwable);
if (error instanceof RestHandlerException) {
final RestHandlerException rhe = (RestHandlerException) error;
- log.error("Exception occurred in REST handler.", rhe);
+ if (log.isDebugEnabled()) {
+ log.error("Exception occurred in REST handler.", rhe);
+ } else {
+ log.error("Exception occurred in REST handler: {}", rhe.getMessage());
+ }
return Tuple2.of(new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus());
} else {
log.error("Implementation error: Unhandled exception.", error);
[flink] 02/03: [hotfix][rest] Centralize REST error logging
Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a698eecf1fbf9c4eb74fcbfddca2e356b4f03203
Author: zentol <ch...@apache.org>
AuthorDate: Thu Jan 3 17:29:56 2019 +0100
[hotfix][rest] Centralize REST error logging
---
.../runtime/rest/handler/AbstractHandler.java | 42 +++++++++++++++-------
.../runtime/rest/handler/AbstractRestHandler.java | 28 +--------------
.../AbstractTaskManagerFileHandler.java | 25 +++++--------
3 files changed, 40 insertions(+), 55 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index 3ca0bd3..d46bb13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
@@ -50,6 +51,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -158,33 +160,49 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
final FileUploads finalUploadedFiles = uploadedFiles;
requestProcessingFuture
.whenComplete((Void ignored, Throwable throwable) -> {
- inFlightRequestTracker.deregisterRequest();
- cleanupFileUploads(finalUploadedFiles);
+ if (throwable != null) {
+ handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest)
+ .whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles));
+ } else {
+ finalizeRequestProcessing(finalUploadedFiles);
+ }
});
- } catch (RestHandlerException rhe) {
- inFlightRequestTracker.deregisterRequest();
+ } catch (Throwable e) {
+ final FileUploads finalUploadedFiles = uploadedFiles;
+ handleException(e, ctx, httpRequest)
+ .whenComplete((Void ignored, Throwable throwable) -> finalizeRequestProcessing(finalUploadedFiles));
+ }
+ }
+
+ private void finalizeRequestProcessing(FileUploads uploadedFiles) {
+ inFlightRequestTracker.deregisterRequest();
+ cleanupFileUploads(uploadedFiles);
+ }
+
+ private CompletableFuture<Void> handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) {
+ if (throwable instanceof RestHandlerException) {
+ RestHandlerException rhe = (RestHandlerException) throwable;
if (log.isDebugEnabled()) {
log.error("Exception occurred in REST handler.", rhe);
} else {
log.error("Exception occurred in REST handler: {}", rhe.getMessage());
}
- HandlerUtils.sendErrorResponse(
+ return HandlerUtils.sendErrorResponse(
ctx,
httpRequest,
new ErrorResponseBody(rhe.getMessage()),
rhe.getHttpResponseStatus(),
responseHeaders);
- cleanupFileUploads(uploadedFiles);
- } catch (Throwable e) {
- inFlightRequestTracker.deregisterRequest();
- log.error("Request processing failed.", e);
- HandlerUtils.sendErrorResponse(
+ } else {
+ log.error("Implementation error: Unhandled exception.", throwable);
+ String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>",
+ ExceptionUtils.stringifyException(throwable));
+ return HandlerUtils.sendErrorResponse(
ctx,
httpRequest,
- new ErrorResponseBody("Internal server error."),
+ new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)),
HttpResponseStatus.INTERNAL_SERVER_ERROR,
responseHeaders);
- cleanupFileUploads(uploadedFiles);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 3d74a7b..992e2c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -19,17 +19,14 @@
package org.apache.flink.runtime.rest.handler;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
-import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
@@ -39,7 +36,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
import javax.annotation.Nonnull;
-import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -80,29 +76,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
response = FutureUtils.completedExceptionally(e);
}
- return response.handle((resp, throwable) -> throwable != null ?
- errorResponse(throwable) : Tuple2.of(resp, messageHeaders.getResponseStatusCode()))
- .thenCompose(r -> HandlerUtils.sendResponse(ctx, httpRequest, r.f0, r.f1, responseHeaders));
- }
-
- private Tuple2<ResponseBody, HttpResponseStatus> errorResponse(Throwable throwable) {
- Throwable error = ExceptionUtils.stripCompletionException(throwable);
- if (error instanceof RestHandlerException) {
- final RestHandlerException rhe = (RestHandlerException) error;
- if (log.isDebugEnabled()) {
- log.error("Exception occurred in REST handler.", rhe);
- } else {
- log.error("Exception occurred in REST handler: {}", rhe.getMessage());
- }
- return Tuple2.of(new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus());
- } else {
- log.error("Implementation error: Unhandled exception.", error);
- String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>",
- ExceptionUtils.stringifyException(throwable));
- return Tuple2.of(
- new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)),
- HttpResponseStatus.INTERNAL_SERVER_ERROR);
- }
+ return response.thenAccept(resp -> HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode(), responseHeaders));
}
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index 8a20868..1781fe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -24,12 +24,11 @@ import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
+import org.apache.flink.runtime.rest.NotFoundException;
import org.apache.flink.runtime.rest.handler.AbstractHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
@@ -75,7 +74,6 @@ import java.util.concurrent.TimeUnit;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
@@ -160,23 +158,18 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
fileBlobKeys.invalidate(taskManagerId);
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
- final ErrorResponseBody errorResponseBody;
- final HttpResponseStatus httpResponseStatus;
if (strippedThrowable instanceof UnknownTaskExecutorException) {
- errorResponseBody = new ErrorResponseBody("Unknown TaskExecutor " + taskManagerId + '.');
- httpResponseStatus = HttpResponseStatus.NOT_FOUND;
+ throw new CompletionException(
+ new NotFoundException(
+ String.format("Failed to transfer file from TaskExecutor %s because it was unknown.", taskManagerId),
+ strippedThrowable));
} else {
- errorResponseBody = new ErrorResponseBody("Internal server error: " + throwable.getMessage() + '.');
- httpResponseStatus = INTERNAL_SERVER_ERROR;
+ throw new CompletionException(
+ new FlinkException(
+ String.format("Failed to transfer file from TaskExecutor %s.", taskManagerId),
+ strippedThrowable));
}
-
- HandlerUtils.sendErrorResponse(
- ctx,
- httpRequest,
- errorResponseBody,
- httpResponseStatus,
- responseHeaders);
}
});
}