You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/08/03 20:18:22 UTC

[flink] 02/03: [FLINK-18663][rest] Exit early if shutdown has started

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7619db019181347a16fd37f0e4e04c420f75eee9
Author: Tartarus0zm <zh...@163.com>
AuthorDate: Wed Jul 29 20:08:14 2020 +0800

    [FLINK-18663][rest] Exit early if shutdown has started
---
 .../runtime/rest/handler/AbstractHandler.java      | 10 +++++-
 .../runtime/rest/RestServerEndpointITCase.java     | 41 ++++++++++++++++------
 2 files changed, 40 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index f61cac0..75fd904 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -117,7 +117,15 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 
 		FileUploads uploadedFiles = null;
 		try {
-			inFlightRequestTracker.registerRequest();
+			synchronized (this) {
+				if (terminationFuture != null) {
+					log.debug("The handler instance for {} had already been closed.", untypedResponseMessageHeaders.getTargetRestEndpointURL());
+					ctx.channel().close();
+					return;
+				}
+				inFlightRequestTracker.registerRequest();
+			}
+
 			if (!(httpRequest instanceof FullHttpRequest)) {
 				// The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
 				// FullHttpRequests.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index c941c45..4352dfa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -533,12 +533,11 @@ public class RestServerEndpointITCase extends TestLogger {
 
 	/**
 	 * Tests that after calling {@link RestServerEndpoint#closeAsync()}, the handlers are closed
-	 * first, and we wait for in-flight requests to finish. As long as not all handlers are closed,
-	 * HTTP requests should be served.
+	 * first, and we wait for in-flight requests to finish. Once the shutdown is initiated, no further requests should
+	 * be accepted.
 	 */
 	@Test
 	public void testShouldWaitForHandlersWhenClosing() throws Exception {
-		testHandler.closeFuture = new CompletableFuture<>();
 		final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
 		testHandler.handlerBody = id -> {
 			// Intentionally schedule the work on a different thread. This is to simulate
@@ -549,16 +548,12 @@ public class RestServerEndpointITCase extends TestLogger {
 			});
 		};
 
-		// Initiate closing RestServerEndpoint but the test handler should block.
-		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
-		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
-
+		// create an in-flight request
 		final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1));
 		handlerBlocker.awaitRequestToArrive();
 
-		// Allow handler to close but there is still one in-flight request which should prevent
-		// the RestServerEndpoint from closing.
-		testHandler.closeFuture.complete(null);
+		// Initiate closing RestServerEndpoint but the test handler should block due to the in-flight request
+		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
 		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
 
 		// Finish the in-flight request.
@@ -568,6 +563,32 @@ public class RestServerEndpointITCase extends TestLogger {
 		closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
 	}
 
+	/**
+	 * Tests that new requests are ignored after calling {@link RestServerEndpoint#closeAsync()}.
+	 */
+	@Test
+	public void testRequestsRejectedAfterShutdownInitiation() throws Exception {
+		testHandler.closeFuture = new CompletableFuture<>();
+		testHandler.handlerBody = id -> CompletableFuture.completedFuture(new TestResponse(id));
+
+		// Initiate closing RestServerEndpoint, but the test handler should block
+		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
+		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
+
+		// attempt to submit a request
+		final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1));
+		try {
+			request.get(timeout.getSize(), timeout.getUnit());
+			fail("Request should have failed.");
+		} catch (Exception ignored) {
+			// expected
+		} finally {
+			// allow the endpoint to shut down
+			testHandler.closeFuture.complete(null);
+			closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
+		}
+	}
+
 	@Test
 	public void testRestServerBindPort() throws Exception {
 		final int portRangeStart = 52300;