You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/08/03 20:18:23 UTC

[flink] 03/03: [hotfix][rest][tests] Replace HandlerBlocker with BlockerSync

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 91a4c03d3445ddb52e4265faade5167edb6372d2
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Aug 3 14:52:57 2020 +0200

    [hotfix][rest][tests] Replace HandlerBlocker with BlockerSync
---
 .../runtime/rest/RestServerEndpointITCase.java     | 79 +++++-----------------
 1 file changed, 17 insertions(+), 62 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 4352dfa..59190ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.testutils.BlockerSync;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.net.SSLUtilsTest;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -94,7 +95,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
@@ -102,7 +102,6 @@ import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -269,24 +268,28 @@ public class RestServerEndpointITCase extends TestLogger {
 	 */
 	@Test
 	public void testRequestInterleaving() throws Exception {
-		final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
+		final BlockerSync sync = new BlockerSync();
 		testHandler.handlerBody = id -> {
 			if (id == 1) {
-				handlerBlocker.arriveAndBlock();
+				try {
+					sync.block();
+				} catch (InterruptedException e) {
+					Thread.currentThread().interrupt();
+				}
 			}
 			return CompletableFuture.completedFuture(new TestResponse(id));
 		};
 
 		// send first request and wait until the handler blocks
 		final CompletableFuture<TestResponse> response1 = sendRequestToTestHandler(new TestRequest(1));
-		handlerBlocker.awaitRequestToArrive();
+		sync.awaitBlocker();
 
 		// send second request and verify response
 		final CompletableFuture<TestResponse> response2 = sendRequestToTestHandler(new TestRequest(2));
 		assertEquals(2, response2.get().id);
 
 		// wake up blocked handler
-		handlerBlocker.unblockRequest();
+		sync.releaseBlocker();
 
 		// verify response to first request
 		assertEquals(1, response1.get().id);
@@ -538,26 +541,30 @@ public class RestServerEndpointITCase extends TestLogger {
 	 */
 	@Test
 	public void testShouldWaitForHandlersWhenClosing() throws Exception {
-		final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
+		final BlockerSync sync = new BlockerSync();
 		testHandler.handlerBody = id -> {
 			// Intentionally schedule the work on a different thread. This is to simulate
 			// handlers where the CompletableFuture is finished by the RPC framework.
 			return CompletableFuture.supplyAsync(() -> {
-				handlerBlocker.arriveAndBlock();
+				try {
+					sync.block();
+				} catch (InterruptedException e) {
+					Thread.currentThread().interrupt();
+				}
 				return new TestResponse(id);
 			});
 		};
 
 		// create an in-flight request
 		final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1));
-		handlerBlocker.awaitRequestToArrive();
+		sync.awaitBlocker();
 
 		// Initiate closing RestServerEndpoint but the test handler should block due to the in-flight request
 		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
 		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
 
 		// Finish the in-flight request.
-		handlerBlocker.unblockRequest();
+		sync.releaseBlocker();
 
 		request.get(timeout.getSize(), timeout.getUnit());
 		closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
@@ -751,58 +758,6 @@ public class RestServerEndpointITCase extends TestLogger {
 		return parameters;
 	}
 
-	/**
-	 * This is a helper class for tests that require to have fine-grained control over HTTP
-	 * requests so that they are not dispatched immediately.
-	 */
-	private static class HandlerBlocker {
-
-		private final Time timeout;
-
-		private final CountDownLatch requestArrivedLatch = new CountDownLatch(1);
-
-		private final CountDownLatch finishRequestLatch = new CountDownLatch(1);
-
-		private HandlerBlocker(final Time timeout) {
-			this.timeout = checkNotNull(timeout);
-		}
-
-		/**
-		 * Waits until {@link #arriveAndBlock()} is called.
-		 */
-		public void awaitRequestToArrive() {
-			try {
-				assertTrue(requestArrivedLatch.await(timeout.getSize(), timeout.getUnit()));
-			} catch (final InterruptedException e) {
-				Thread.currentThread().interrupt();
-			}
-		}
-
-		/**
-		 * Signals that the request arrived. This method blocks until {@link #unblockRequest()} is
-		 * called.
-		 */
-		public void arriveAndBlock() {
-			markRequestArrived();
-			try {
-				assertTrue(finishRequestLatch.await(timeout.getSize(), timeout.getUnit()));
-			} catch (final InterruptedException e) {
-				Thread.currentThread().interrupt();
-			}
-		}
-
-		/**
-		 * @see #arriveAndBlock()
-		 */
-		public void unblockRequest() {
-			finishRequestLatch.countDown();
-		}
-
-		private void markRequestArrived() {
-			requestArrivedLatch.countDown();
-		}
-	}
-
 	static class TestRestClient extends RestClient {
 
 		TestRestClient(RestClientConfiguration configuration) {