You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/08/13 08:18:05 UTC

[flink] 01/02: [FLINK-18902][rest] Allow request serving while the REST handlers are shutting down

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7e975a49717a7122ab2394397726e5c8fb1c047b
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Aug 12 16:50:52 2020 +0200

    [FLINK-18902][rest] Allow request serving while the REST handlers are shutting down
    
    In order to serve the job execution result when using the per job mode, the REST
    handlers must be able to serve requests while the RestServerEndpoint is being shut
    down. Otherwise, it is not possible to serve the asynchronous operation results
    such as the job execution result.
    
    This commit solves this problem by checking whether the InFlightRequestTracker
    allows to enqueue a new request or not in the AbstractHandler. If it rejects a new
    request, this means that the handler has been shut completely shut down and that we
    should close the connection to the client. If the InFlightRequestTracker still accepts
    requests, then this means that the RestHandler might still want to serve an asynchronous
    result.
    
    This closes #13132.
---
 .../runtime/rest/handler/AbstractHandler.java      | 11 ++---
 .../rest/handler/InFlightRequestTracker.java       |  7 ++-
 .../runtime/rest/RestServerEndpointITCase.java     | 56 +++++++++++++++-------
 .../rest/handler/InFlightRequestTrackerTest.java   |  8 ++++
 4 files changed, 57 insertions(+), 25 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index 75fd904..09ff055 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -117,13 +117,10 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 
 		FileUploads uploadedFiles = null;
 		try {
-			synchronized (this) {
-				if (terminationFuture != null) {
-					log.debug("The handler instance for {} had already been closed.", untypedResponseMessageHeaders.getTargetRestEndpointURL());
-					ctx.channel().close();
-					return;
-				}
-				inFlightRequestTracker.registerRequest();
+			if (!inFlightRequestTracker.registerRequest()) {
+				log.debug("The handler instance for {} had already been closed.", untypedResponseMessageHeaders.getTargetRestEndpointURL());
+				ctx.channel().close();
+				return;
 			}
 
 			if (!(httpRequest instanceof FullHttpRequest)) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/InFlightRequestTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/InFlightRequestTracker.java
index 92478b1..469debd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/InFlightRequestTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/InFlightRequestTracker.java
@@ -43,9 +43,12 @@ class InFlightRequestTracker {
 
 	/**
 	 * Registers an in-flight request.
+	 *
+	 * @return {@code true} if the request could be registered; {@code false} if the tracker has already
+	 * been terminated.
 	 */
-	public void registerRequest() {
-		phaser.register();
+	public boolean registerRequest() {
+		return phaser.register() >= 0;
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 4352dfa..0c6fe66 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.net.SSLUtilsTest;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -533,12 +534,13 @@ public class RestServerEndpointITCase extends TestLogger {
 
 	/**
 	 * Tests that after calling {@link RestServerEndpoint#closeAsync()}, the handlers are closed
-	 * first, and we wait for in-flight requests to finish. Once the shutdown is initiated, no further requests should
-	 * be accepted.
+	 * first, and we wait for in-flight requests to finish. As long as not all handlers are closed,
+	 * HTTP requests should be served.
 	 */
 	@Test
 	public void testShouldWaitForHandlersWhenClosing() throws Exception {
 		final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
+		testHandler.closeFuture = new CompletableFuture<>();
 		testHandler.handlerBody = id -> {
 			// Intentionally schedule the work on a different thread. This is to simulate
 			// handlers where the CompletableFuture is finished by the RPC framework.
@@ -548,12 +550,17 @@ public class RestServerEndpointITCase extends TestLogger {
 			});
 		};
 
+		// Initiate closing RestServerEndpoint but the test handler should block.
+		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
+		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
+
 		// create an in-flight request
 		final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1));
 		handlerBlocker.awaitRequestToArrive();
 
-		// Initiate closing RestServerEndpoint but the test handler should block due to the in-flight request
-		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
+		// Allow handler to close but there is still one in-flight request which should prevent
+		// the RestServerEndpoint from closing.
+		testHandler.closeFuture.complete(null);
 		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
 
 		// Finish the in-flight request.
@@ -564,28 +571,35 @@ public class RestServerEndpointITCase extends TestLogger {
 	}
 
 	/**
-	 * Tests that new requests are ignored after calling {@link RestServerEndpoint#closeAsync()}.
+	 * Tests that new requests are ignored after a handler is shut down.
 	 */
 	@Test
-	public void testRequestsRejectedAfterShutdownInitiation() throws Exception {
-		testHandler.closeFuture = new CompletableFuture<>();
-		testHandler.handlerBody = id -> CompletableFuture.completedFuture(new TestResponse(id));
+	public void testRequestsRejectedAfterShutdownOfHandlerIsCompleted() throws Exception {
+		testHandler.handlerBody = id -> CompletableFuture.completedFuture(new TestResponse(id, "foobar"));
+
+		// let the test upload handler block the shut down of the RestServerEndpoint
+		testUploadHandler.closeFuture = new CompletableFuture<>();
 
-		// Initiate closing RestServerEndpoint, but the test handler should block
 		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
+
 		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
 
-		// attempt to submit a request
+		// wait until the TestHandler is closed
+		testHandler.closeLatch.await();
+
+		// requests to the TestHandler should now get rejected
 		final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1));
+
 		try {
 			request.get(timeout.getSize(), timeout.getUnit());
-			fail("Request should have failed.");
-		} catch (Exception ignored) {
-			// expected
+			fail("Expected a ConnectionClosedException");
+		} catch (ExecutionException ee) {
+			if (!ExceptionUtils.findThrowable(ee, ConnectionClosedException.class).isPresent()) {
+				throw ee;
+			}
 		} finally {
-			// allow the endpoint to shut down
-			testHandler.closeFuture.complete(null);
-			closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
+			testUploadHandler.closeFuture.complete(null);
+			closeRestServerEndpointFuture.get();
 		}
 	}
 
@@ -704,6 +718,8 @@ public class RestServerEndpointITCase extends TestLogger {
 
 	private static class TestHandler extends AbstractRestHandler<RestfulGateway, TestRequest, TestResponse, TestParameters> {
 
+		private final OneShotLatch closeLatch = new OneShotLatch();
+
 		private CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture(null);
 
 		private Function<Integer, CompletableFuture<TestResponse>> handlerBody;
@@ -727,6 +743,7 @@ public class RestServerEndpointITCase extends TestLogger {
 
 		@Override
 		public CompletableFuture<Void> closeHandlerAsync() {
+			closeLatch.trigger();
 			return closeFuture;
 		}
 	}
@@ -975,6 +992,8 @@ public class RestServerEndpointITCase extends TestLogger {
 
 	private static class TestUploadHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 
+		private volatile CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture(null);
+
 		private volatile byte[] lastUploadedFileContents;
 
 		private TestUploadHandler(
@@ -1001,6 +1020,11 @@ public class RestServerEndpointITCase extends TestLogger {
 		public byte[] getLastUploadedFileContents() {
 			return lastUploadedFileContents;
 		}
+
+		@Override
+		protected CompletableFuture<Void> closeHandlerAsync() {
+			return closeFuture;
+		}
 	}
 
 	static class TestVersionHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/InFlightRequestTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/InFlightRequestTrackerTest.java
index c486571..b1a1a54 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/InFlightRequestTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/InFlightRequestTrackerTest.java
@@ -78,4 +78,12 @@ public class InFlightRequestTrackerTest {
 			awaitFuture,
 			inFlightRequestTracker.awaitAsync());
 	}
+
+	@Test
+	public void testShouldNotRegisterNewRequestsAfterTermination() {
+		final CompletableFuture<Void> terminationFuture = inFlightRequestTracker.awaitAsync();
+
+		assertTrue(terminationFuture.isDone());
+		assertFalse(inFlightRequestTracker.registerRequest());
+	}
 }