You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/08/03 20:23:30 UTC

[flink] branch release-1.10 updated (7627f54 -> 52a77e6)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 7627f54  [FLINK-16619][coordination] Log reception of slot reports only once
     new bda13836 [FLINK-18663][rest] Improve exception handling
     new 52a77e6  [FLINK-18663][rest] Exit early if shutdown has started

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/rest/handler/AbstractHandler.java      | 27 +++++++++++---
 .../runtime/rest/RestServerEndpointITCase.java     | 41 ++++++++++++++++------
 2 files changed, 53 insertions(+), 15 deletions(-)


[flink] 02/02: [FLINK-18663][rest] Exit early if shutdown has started

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 52a77e695ea9dea79f67f6b6a7281ceb1079f65e
Author: Tartarus0zm <zh...@163.com>
AuthorDate: Wed Jul 29 20:08:14 2020 +0800

    [FLINK-18663][rest] Exit early if shutdown has started
---
 .../runtime/rest/handler/AbstractHandler.java      | 10 +++++-
 .../runtime/rest/RestServerEndpointITCase.java     | 41 ++++++++++++++++------
 2 files changed, 40 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index f61cac0..75fd904 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -117,7 +117,15 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 
 		FileUploads uploadedFiles = null;
 		try {
-			inFlightRequestTracker.registerRequest();
+			synchronized (this) {
+				if (terminationFuture != null) {
+					log.debug("The handler instance for {} had already been closed.", untypedResponseMessageHeaders.getTargetRestEndpointURL());
+					ctx.channel().close();
+					return;
+				}
+				inFlightRequestTracker.registerRequest();
+			}
+
 			if (!(httpRequest instanceof FullHttpRequest)) {
 				// The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
 				// FullHttpRequests.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 0a95a3c..2e70a5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -533,12 +533,11 @@ public class RestServerEndpointITCase extends TestLogger {
 
 	/**
 	 * Tests that after calling {@link RestServerEndpoint#closeAsync()}, the handlers are closed
-	 * first, and we wait for in-flight requests to finish. As long as not all handlers are closed,
-	 * HTTP requests should be served.
+	 * first, and we wait for in-flight requests to finish. Once the shutdown is initiated, no further requests should
+	 * be accepted.
 	 */
 	@Test
 	public void testShouldWaitForHandlersWhenClosing() throws Exception {
-		testHandler.closeFuture = new CompletableFuture<>();
 		final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
 		testHandler.handlerBody = id -> {
 			// Intentionally schedule the work on a different thread. This is to simulate
@@ -549,16 +548,12 @@ public class RestServerEndpointITCase extends TestLogger {
 			});
 		};
 
-		// Initiate closing RestServerEndpoint but the test handler should block.
-		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
-		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
-
+		// create an in-flight request
 		final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1));
 		handlerBlocker.awaitRequestToArrive();
 
-		// Allow handler to close but there is still one in-flight request which should prevent
-		// the RestServerEndpoint from closing.
-		testHandler.closeFuture.complete(null);
+		// Initiate closing RestServerEndpoint but the test handler should block due to the in-flight request
+		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
 		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
 
 		// Finish the in-flight request.
@@ -568,6 +563,32 @@ public class RestServerEndpointITCase extends TestLogger {
 		closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
 	}
 
+	/**
+	 * Tests that new requests are ignored after calling {@link RestServerEndpoint#closeAsync()}.
+	 */
+	@Test
+	public void testRequestsRejectedAfterShutdownInitiation() throws Exception {
+		testHandler.closeFuture = new CompletableFuture<>();
+		testHandler.handlerBody = id -> CompletableFuture.completedFuture(new TestResponse(id));
+
+		// Initiate closing RestServerEndpoint, but the test handler should block
+		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
+		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
+
+		// attempt to submit a request
+		final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1));
+		try {
+			request.get(timeout.getSize(), timeout.getUnit());
+			fail("Request should have failed.");
+		} catch (Exception ignored) {
+			// expected
+		} finally {
+			// allow the endpoint to shut down
+			testHandler.closeFuture.complete(null);
+			closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
+		}
+	}
+
 	@Test
 	public void testRestServerBindPort() throws Exception {
 		final int portRangeStart = 52300;


[flink] 01/02: [FLINK-18663][rest] Improve exception handling

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bda138361e55437c84ea000fe38a6bce69cf7520
Author: Tartarus0zm <zh...@163.com>
AuthorDate: Wed Jul 29 18:56:28 2020 +0800

    [FLINK-18663][rest] Improve exception handling
    
    - ensure that request finalization runs even if handleException throws an exception
    - catch NPE in handleException, which occurs if the client closes the connection
---
 .../flink/runtime/rest/handler/AbstractHandler.java     | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index 40094bb..f61cac0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -57,6 +57,7 @@ import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 
 /**
  * Super class for netty-based handlers that work with {@link RequestBody}.
@@ -177,13 +178,17 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 
 			final FileUploads finalUploadedFiles = uploadedFiles;
 			requestProcessingFuture
+				.handle((Void ignored, Throwable throwable) -> {
+					if (throwable != null) {
+						return handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest);
+					}
+					return CompletableFuture.<Void>completedFuture(null);
+				}).thenCompose(Function.identity())
 				.whenComplete((Void ignored, Throwable throwable) -> {
 					if (throwable != null) {
-						handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest)
-							.whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles));
-					} else {
-						finalizeRequestProcessing(finalUploadedFiles);
+						log.warn("An exception occurred while handling another exception.", throwable);
 					}
+					finalizeRequestProcessing(finalUploadedFiles);
 				});
 		} catch (Throwable e) {
 			final FileUploads finalUploadedFiles = uploadedFiles;
@@ -199,6 +204,10 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 
 	private CompletableFuture<Void> handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) {
 		FlinkHttpObjectAggregator flinkHttpObjectAggregator = ctx.pipeline().get(FlinkHttpObjectAggregator.class);
+		if (flinkHttpObjectAggregator == null) {
+			log.warn("The connection was unexpectedly closed by the client.");
+			return CompletableFuture.completedFuture(null);
+		}
 		int maxLength = flinkHttpObjectAggregator.maxContentLength() - OTHER_RESP_PAYLOAD_OVERHEAD;
 		if (throwable instanceof RestHandlerException) {
 			RestHandlerException rhe = (RestHandlerException) throwable;