You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/08/03 20:18:20 UTC

[flink] branch master updated (7d39a2c -> 91a4c03)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 7d39a2c  [FLINK-16619][coordination] Log reception of slot reports only once
     new ee4b27f  [FLINK-18663][rest] Improve exception handling
     new 7619db0  [FLINK-18663][rest] Exit early if shutdown has started
     new 91a4c03  [hotfix][rest][tests] Replace HandlerBlocker with BlockerSync

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/rest/handler/AbstractHandler.java      |  27 ++++-
 .../runtime/rest/RestServerEndpointITCase.java     | 120 +++++++++------------
 2 files changed, 70 insertions(+), 77 deletions(-)


[flink] 02/03: [FLINK-18663][rest] Exit early if shutdown has started

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7619db019181347a16fd37f0e4e04c420f75eee9
Author: Tartarus0zm <zh...@163.com>
AuthorDate: Wed Jul 29 20:08:14 2020 +0800

    [FLINK-18663][rest] Exit early if shutdown has started
---
 .../runtime/rest/handler/AbstractHandler.java      | 10 +++++-
 .../runtime/rest/RestServerEndpointITCase.java     | 41 ++++++++++++++++------
 2 files changed, 40 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index f61cac0..75fd904 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -117,7 +117,15 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 
 		FileUploads uploadedFiles = null;
 		try {
-			inFlightRequestTracker.registerRequest();
+			synchronized (this) {
+				if (terminationFuture != null) {
+					log.debug("The handler instance for {} had already been closed.", untypedResponseMessageHeaders.getTargetRestEndpointURL());
+					ctx.channel().close();
+					return;
+				}
+				inFlightRequestTracker.registerRequest();
+			}
+
 			if (!(httpRequest instanceof FullHttpRequest)) {
 				// The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
 				// FullHttpRequests.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index c941c45..4352dfa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -533,12 +533,11 @@ public class RestServerEndpointITCase extends TestLogger {
 
 	/**
 	 * Tests that after calling {@link RestServerEndpoint#closeAsync()}, the handlers are closed
-	 * first, and we wait for in-flight requests to finish. As long as not all handlers are closed,
-	 * HTTP requests should be served.
+	 * first, and we wait for in-flight requests to finish. Once the shutdown is initiated, no further requests should
+	 * be accepted.
 	 */
 	@Test
 	public void testShouldWaitForHandlersWhenClosing() throws Exception {
-		testHandler.closeFuture = new CompletableFuture<>();
 		final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
 		testHandler.handlerBody = id -> {
 			// Intentionally schedule the work on a different thread. This is to simulate
@@ -549,16 +548,12 @@ public class RestServerEndpointITCase extends TestLogger {
 			});
 		};
 
-		// Initiate closing RestServerEndpoint but the test handler should block.
-		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
-		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
-
+		// create an in-flight request
 		final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1));
 		handlerBlocker.awaitRequestToArrive();
 
-		// Allow handler to close but there is still one in-flight request which should prevent
-		// the RestServerEndpoint from closing.
-		testHandler.closeFuture.complete(null);
+		// Initiate closing RestServerEndpoint but the test handler should block due to the in-flight request
+		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
 		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
 
 		// Finish the in-flight request.
@@ -568,6 +563,32 @@ public class RestServerEndpointITCase extends TestLogger {
 		closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
 	}
 
+	/**
+	 * Tests that new requests are ignored after calling {@link RestServerEndpoint#closeAsync()}.
+	 */
+	@Test
+	public void testRequestsRejectedAfterShutdownInitiation() throws Exception {
+		testHandler.closeFuture = new CompletableFuture<>();
+		testHandler.handlerBody = id -> CompletableFuture.completedFuture(new TestResponse(id));
+
+		// Initiate closing RestServerEndpoint, but the test handler should block
+		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
+		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
+
+		// attempt to submit a request
+		final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1));
+		try {
+			request.get(timeout.getSize(), timeout.getUnit());
+			fail("Request should have failed.");
+		} catch (Exception ignored) {
+			// expected
+		} finally {
+			// allow the endpoint to shut down
+			testHandler.closeFuture.complete(null);
+			closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
+		}
+	}
+
 	@Test
 	public void testRestServerBindPort() throws Exception {
 		final int portRangeStart = 52300;


[flink] 01/03: [FLINK-18663][rest] Improve exception handling

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ee4b27f96e8156c6a1c4b825484d238055cecf5f
Author: Tartarus0zm <zh...@163.com>
AuthorDate: Wed Jul 29 18:56:28 2020 +0800

    [FLINK-18663][rest] Improve exception handling
    
    - ensure that request finalization runs even if handleException throws an exception
    - catch NPE in handleException, which occurs if the client closes the connection
---
 .../flink/runtime/rest/handler/AbstractHandler.java     | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index 40094bb..f61cac0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -57,6 +57,7 @@ import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 
 /**
  * Super class for netty-based handlers that work with {@link RequestBody}.
@@ -177,13 +178,17 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 
 			final FileUploads finalUploadedFiles = uploadedFiles;
 			requestProcessingFuture
+				.handle((Void ignored, Throwable throwable) -> {
+					if (throwable != null) {
+						return handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest);
+					}
+					return CompletableFuture.<Void>completedFuture(null);
+				}).thenCompose(Function.identity())
 				.whenComplete((Void ignored, Throwable throwable) -> {
 					if (throwable != null) {
-						handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest)
-							.whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles));
-					} else {
-						finalizeRequestProcessing(finalUploadedFiles);
+						log.warn("An exception occurred while handling another exception.", throwable);
 					}
+					finalizeRequestProcessing(finalUploadedFiles);
 				});
 		} catch (Throwable e) {
 			final FileUploads finalUploadedFiles = uploadedFiles;
@@ -199,6 +204,10 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 
 	private CompletableFuture<Void> handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) {
 		FlinkHttpObjectAggregator flinkHttpObjectAggregator = ctx.pipeline().get(FlinkHttpObjectAggregator.class);
+		if (flinkHttpObjectAggregator == null) {
+			log.warn("The connection was unexpectedly closed by the client.");
+			return CompletableFuture.completedFuture(null);
+		}
 		int maxLength = flinkHttpObjectAggregator.maxContentLength() - OTHER_RESP_PAYLOAD_OVERHEAD;
 		if (throwable instanceof RestHandlerException) {
 			RestHandlerException rhe = (RestHandlerException) throwable;


[flink] 03/03: [hotfix][rest][tests] Replace HandlerBlocker with BlockerSync

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 91a4c03d3445ddb52e4265faade5167edb6372d2
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Aug 3 14:52:57 2020 +0200

    [hotfix][rest][tests] Replace HandlerBlocker with BlockerSync
---
 .../runtime/rest/RestServerEndpointITCase.java     | 79 +++++-----------------
 1 file changed, 17 insertions(+), 62 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 4352dfa..59190ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.testutils.BlockerSync;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.net.SSLUtilsTest;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -94,7 +95,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
@@ -102,7 +102,6 @@ import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -269,24 +268,28 @@ public class RestServerEndpointITCase extends TestLogger {
 	 */
 	@Test
 	public void testRequestInterleaving() throws Exception {
-		final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
+		final BlockerSync sync = new BlockerSync();
 		testHandler.handlerBody = id -> {
 			if (id == 1) {
-				handlerBlocker.arriveAndBlock();
+				try {
+					sync.block();
+				} catch (InterruptedException e) {
+					Thread.currentThread().interrupt();
+				}
 			}
 			return CompletableFuture.completedFuture(new TestResponse(id));
 		};
 
 		// send first request and wait until the handler blocks
 		final CompletableFuture<TestResponse> response1 = sendRequestToTestHandler(new TestRequest(1));
-		handlerBlocker.awaitRequestToArrive();
+		sync.awaitBlocker();
 
 		// send second request and verify response
 		final CompletableFuture<TestResponse> response2 = sendRequestToTestHandler(new TestRequest(2));
 		assertEquals(2, response2.get().id);
 
 		// wake up blocked handler
-		handlerBlocker.unblockRequest();
+		sync.releaseBlocker();
 
 		// verify response to first request
 		assertEquals(1, response1.get().id);
@@ -538,26 +541,30 @@ public class RestServerEndpointITCase extends TestLogger {
 	 */
 	@Test
 	public void testShouldWaitForHandlersWhenClosing() throws Exception {
-		final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
+		final BlockerSync sync = new BlockerSync();
 		testHandler.handlerBody = id -> {
 			// Intentionally schedule the work on a different thread. This is to simulate
 			// handlers where the CompletableFuture is finished by the RPC framework.
 			return CompletableFuture.supplyAsync(() -> {
-				handlerBlocker.arriveAndBlock();
+				try {
+					sync.block();
+				} catch (InterruptedException e) {
+					Thread.currentThread().interrupt();
+				}
 				return new TestResponse(id);
 			});
 		};
 
 		// create an in-flight request
 		final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1));
-		handlerBlocker.awaitRequestToArrive();
+		sync.awaitBlocker();
 
 		// Initiate closing RestServerEndpoint but the test handler should block due to the in-flight request
 		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
 		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
 
 		// Finish the in-flight request.
-		handlerBlocker.unblockRequest();
+		sync.releaseBlocker();
 
 		request.get(timeout.getSize(), timeout.getUnit());
 		closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
@@ -751,58 +758,6 @@ public class RestServerEndpointITCase extends TestLogger {
 		return parameters;
 	}
 
-	/**
-	 * This is a helper class for tests that require to have fine-grained control over HTTP
-	 * requests so that they are not dispatched immediately.
-	 */
-	private static class HandlerBlocker {
-
-		private final Time timeout;
-
-		private final CountDownLatch requestArrivedLatch = new CountDownLatch(1);
-
-		private final CountDownLatch finishRequestLatch = new CountDownLatch(1);
-
-		private HandlerBlocker(final Time timeout) {
-			this.timeout = checkNotNull(timeout);
-		}
-
-		/**
-		 * Waits until {@link #arriveAndBlock()} is called.
-		 */
-		public void awaitRequestToArrive() {
-			try {
-				assertTrue(requestArrivedLatch.await(timeout.getSize(), timeout.getUnit()));
-			} catch (final InterruptedException e) {
-				Thread.currentThread().interrupt();
-			}
-		}
-
-		/**
-		 * Signals that the request arrived. This method blocks until {@link #unblockRequest()} is
-		 * called.
-		 */
-		public void arriveAndBlock() {
-			markRequestArrived();
-			try {
-				assertTrue(finishRequestLatch.await(timeout.getSize(), timeout.getUnit()));
-			} catch (final InterruptedException e) {
-				Thread.currentThread().interrupt();
-			}
-		}
-
-		/**
-		 * @see #arriveAndBlock()
-		 */
-		public void unblockRequest() {
-			finishRequestLatch.countDown();
-		}
-
-		private void markRequestArrived() {
-			requestArrivedLatch.countDown();
-		}
-	}
-
 	static class TestRestClient extends RestClient {
 
 		TestRestClient(RestClientConfiguration configuration) {