You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/08/03 20:18:22 UTC
[flink] 02/03: [FLINK-18663][rest] Exit early if shutdown has started
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7619db019181347a16fd37f0e4e04c420f75eee9
Author: Tartarus0zm <zh...@163.com>
AuthorDate: Wed Jul 29 20:08:14 2020 +0800
[FLINK-18663][rest] Exit early if shutdown has started
---
.../runtime/rest/handler/AbstractHandler.java | 10 +++++-
.../runtime/rest/RestServerEndpointITCase.java | 41 ++++++++++++++++------
2 files changed, 40 insertions(+), 11 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index f61cac0..75fd904 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -117,7 +117,15 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
FileUploads uploadedFiles = null;
try {
- inFlightRequestTracker.registerRequest();
+ synchronized (this) {
+ if (terminationFuture != null) {
+ log.debug("The handler instance for {} had already been closed.", untypedResponseMessageHeaders.getTargetRestEndpointURL());
+ ctx.channel().close();
+ return;
+ }
+ inFlightRequestTracker.registerRequest();
+ }
+
if (!(httpRequest instanceof FullHttpRequest)) {
// The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
// FullHttpRequests.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index c941c45..4352dfa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -533,12 +533,11 @@ public class RestServerEndpointITCase extends TestLogger {
/**
* Tests that after calling {@link RestServerEndpoint#closeAsync()}, the handlers are closed
- * first, and we wait for in-flight requests to finish. As long as not all handlers are closed,
- * HTTP requests should be served.
+ * first, and we wait for in-flight requests to finish. Once the shutdown is initiated, no further requests should
+ * be accepted.
*/
@Test
public void testShouldWaitForHandlersWhenClosing() throws Exception {
- testHandler.closeFuture = new CompletableFuture<>();
final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
testHandler.handlerBody = id -> {
// Intentionally schedule the work on a different thread. This is to simulate
@@ -549,16 +548,12 @@ public class RestServerEndpointITCase extends TestLogger {
});
};
- // Initiate closing RestServerEndpoint but the test handler should block.
- final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
- assertThat(closeRestServerEndpointFuture.isDone(), is(false));
-
+ // create an in-flight request
final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1));
handlerBlocker.awaitRequestToArrive();
- // Allow handler to close but there is still one in-flight request which should prevent
- // the RestServerEndpoint from closing.
- testHandler.closeFuture.complete(null);
+ // Initiate closing RestServerEndpoint but the test handler should block due to the in-flight request
+ final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
assertThat(closeRestServerEndpointFuture.isDone(), is(false));
// Finish the in-flight request.
@@ -568,6 +563,32 @@ public class RestServerEndpointITCase extends TestLogger {
closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
}
+ /**
+ * Tests that new requests are ignored after calling {@link RestServerEndpoint#closeAsync()}.
+ */
+ @Test
+ public void testRequestsRejectedAfterShutdownInitiation() throws Exception {
+ testHandler.closeFuture = new CompletableFuture<>();
+ testHandler.handlerBody = id -> CompletableFuture.completedFuture(new TestResponse(id));
+
+ // Initiate closing RestServerEndpoint, but the test handler should block
+ final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
+ assertThat(closeRestServerEndpointFuture.isDone(), is(false));
+
+ // attempt to submit a request
+ final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1));
+ try {
+ request.get(timeout.getSize(), timeout.getUnit());
+ fail("Request should have failed.");
+ } catch (Exception ignored) {
+ // expected
+ } finally {
+ // allow the endpoint to shut down
+ testHandler.closeFuture.complete(null);
+ closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
+ }
+ }
+
@Test
public void testRestServerBindPort() throws Exception {
final int portRangeStart = 52300;