You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/08/03 20:20:02 UTC

[flink] branch release-1.11 updated (79e33e6 -> 862ac0f)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 79e33e6  [FLINK-16619][coordination] Log reception of slot reports only once
     new 4c2f7c4  [FLINK-18663][rest] Improve exception handling
     new 862ac0f  [FLINK-18663][rest] Exit early if shutdown has started

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/rest/handler/AbstractHandler.java      | 27 +++++++++++---
 .../runtime/rest/RestServerEndpointITCase.java     | 41 ++++++++++++++++------
 2 files changed, 53 insertions(+), 15 deletions(-)


[flink] 01/02: [FLINK-18663][rest] Improve exception handling

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4c2f7c44fc7085edfd3f910135fbf7f54d5eadf1
Author: Tartarus0zm <zh...@163.com>
AuthorDate: Wed Jul 29 18:56:28 2020 +0800

    [FLINK-18663][rest] Improve exception handling
    
    - ensure that request finalization runs even if handleException throws an exception
    - catch NPE in handleException, which occurs if the client closes the connection
---
 .../flink/runtime/rest/handler/AbstractHandler.java     | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index 40094bb..f61cac0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -57,6 +57,7 @@ import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 
 /**
  * Super class for netty-based handlers that work with {@link RequestBody}.
@@ -177,13 +178,17 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 
 			final FileUploads finalUploadedFiles = uploadedFiles;
 			requestProcessingFuture
+				.handle((Void ignored, Throwable throwable) -> {
+					if (throwable != null) {
+						return handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest);
+					}
+					return CompletableFuture.<Void>completedFuture(null);
+				}).thenCompose(Function.identity())
 				.whenComplete((Void ignored, Throwable throwable) -> {
 					if (throwable != null) {
-						handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest)
-							.whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles));
-					} else {
-						finalizeRequestProcessing(finalUploadedFiles);
+						log.warn("An exception occurred while handling another exception.", throwable);
 					}
+					finalizeRequestProcessing(finalUploadedFiles);
 				});
 		} catch (Throwable e) {
 			final FileUploads finalUploadedFiles = uploadedFiles;
@@ -199,6 +204,10 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 
 	private CompletableFuture<Void> handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) {
 		FlinkHttpObjectAggregator flinkHttpObjectAggregator = ctx.pipeline().get(FlinkHttpObjectAggregator.class);
+		if (flinkHttpObjectAggregator == null) {
+			log.warn("The connection was unexpectedly closed by the client.");
+			return CompletableFuture.completedFuture(null);
+		}
 		int maxLength = flinkHttpObjectAggregator.maxContentLength() - OTHER_RESP_PAYLOAD_OVERHEAD;
 		if (throwable instanceof RestHandlerException) {
 			RestHandlerException rhe = (RestHandlerException) throwable;


[flink] 02/02: [FLINK-18663][rest] Exit early if shutdown has started

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 862ac0f6921a7cf0a1f7fabfd3cff11292817fb0
Author: Tartarus0zm <zh...@163.com>
AuthorDate: Wed Jul 29 20:08:14 2020 +0800

    [FLINK-18663][rest] Exit early if shutdown has started
---
 .../runtime/rest/handler/AbstractHandler.java      | 10 +++++-
 .../runtime/rest/RestServerEndpointITCase.java     | 41 ++++++++++++++++------
 2 files changed, 40 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index f61cac0..75fd904 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -117,7 +117,15 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 
 		FileUploads uploadedFiles = null;
 		try {
-			inFlightRequestTracker.registerRequest();
+			synchronized (this) {
+				if (terminationFuture != null) {
+					log.debug("The handler instance for {} had already been closed.", untypedResponseMessageHeaders.getTargetRestEndpointURL());
+					ctx.channel().close();
+					return;
+				}
+				inFlightRequestTracker.registerRequest();
+			}
+
 			if (!(httpRequest instanceof FullHttpRequest)) {
 				// The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
 				// FullHttpRequests.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index c941c45..4352dfa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -533,12 +533,11 @@ public class RestServerEndpointITCase extends TestLogger {
 
 	/**
 	 * Tests that after calling {@link RestServerEndpoint#closeAsync()}, the handlers are closed
-	 * first, and we wait for in-flight requests to finish. As long as not all handlers are closed,
-	 * HTTP requests should be served.
+	 * first, and we wait for in-flight requests to finish. Once the shutdown is initiated, no further requests should
+	 * be accepted.
 	 */
 	@Test
 	public void testShouldWaitForHandlersWhenClosing() throws Exception {
-		testHandler.closeFuture = new CompletableFuture<>();
 		final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
 		testHandler.handlerBody = id -> {
 			// Intentionally schedule the work on a different thread. This is to simulate
@@ -549,16 +548,12 @@ public class RestServerEndpointITCase extends TestLogger {
 			});
 		};
 
-		// Initiate closing RestServerEndpoint but the test handler should block.
-		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
-		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
-
+		// create an in-flight request
 		final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1));
 		handlerBlocker.awaitRequestToArrive();
 
-		// Allow handler to close but there is still one in-flight request which should prevent
-		// the RestServerEndpoint from closing.
-		testHandler.closeFuture.complete(null);
+		// Initiate closing RestServerEndpoint but the test handler should block due to the in-flight request
+		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
 		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
 
 		// Finish the in-flight request.
@@ -568,6 +563,32 @@ public class RestServerEndpointITCase extends TestLogger {
 		closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
 	}
 
+	/**
+	 * Tests that new requests are ignored after calling {@link RestServerEndpoint#closeAsync()}.
+	 */
+	@Test
+	public void testRequestsRejectedAfterShutdownInitiation() throws Exception {
+		testHandler.closeFuture = new CompletableFuture<>();
+		testHandler.handlerBody = id -> CompletableFuture.completedFuture(new TestResponse(id));
+
+		// Initiate closing RestServerEndpoint, but the test handler should block
+		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
+		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
+
+		// attempt to submit a request
+		final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1));
+		try {
+			request.get(timeout.getSize(), timeout.getUnit());
+			fail("Request should have failed.");
+		} catch (Exception ignored) {
+			// expected
+		} finally {
+			// allow the endpoint to shut down
+			testHandler.closeFuture.complete(null);
+			closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
+		}
+	}
+
 	@Test
 	public void testRestServerBindPort() throws Exception {
 		final int portRangeStart = 52300;