You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/08/03 20:18:23 UTC
[flink] 03/03: [hotfix][rest][tests] Replace HandlerBlocker with BlockerSync
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 91a4c03d3445ddb52e4265faade5167edb6372d2
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Aug 3 14:52:57 2020 +0200
[hotfix][rest][tests] Replace HandlerBlocker with BlockerSync
---
.../runtime/rest/RestServerEndpointITCase.java | 79 +++++-----------------
1 file changed, 17 insertions(+), 62 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 4352dfa..59190ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.testutils.BlockerSync;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.net.SSLUtilsTest;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -94,7 +95,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -102,7 +102,6 @@ import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
-import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -269,24 +268,28 @@ public class RestServerEndpointITCase extends TestLogger {
*/
@Test
public void testRequestInterleaving() throws Exception {
- final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
+ final BlockerSync sync = new BlockerSync();
testHandler.handlerBody = id -> {
if (id == 1) {
- handlerBlocker.arriveAndBlock();
+ try {
+ sync.block();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
return CompletableFuture.completedFuture(new TestResponse(id));
};
// send first request and wait until the handler blocks
final CompletableFuture<TestResponse> response1 = sendRequestToTestHandler(new TestRequest(1));
- handlerBlocker.awaitRequestToArrive();
+ sync.awaitBlocker();
// send second request and verify response
final CompletableFuture<TestResponse> response2 = sendRequestToTestHandler(new TestRequest(2));
assertEquals(2, response2.get().id);
// wake up blocked handler
- handlerBlocker.unblockRequest();
+ sync.releaseBlocker();
// verify response to first request
assertEquals(1, response1.get().id);
@@ -538,26 +541,30 @@ public class RestServerEndpointITCase extends TestLogger {
*/
@Test
public void testShouldWaitForHandlersWhenClosing() throws Exception {
- final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
+ final BlockerSync sync = new BlockerSync();
testHandler.handlerBody = id -> {
// Intentionally schedule the work on a different thread. This is to simulate
// handlers where the CompletableFuture is finished by the RPC framework.
return CompletableFuture.supplyAsync(() -> {
- handlerBlocker.arriveAndBlock();
+ try {
+ sync.block();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
return new TestResponse(id);
});
};
// create an in-flight request
final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1));
- handlerBlocker.awaitRequestToArrive();
+ sync.awaitBlocker();
// Initiate closing RestServerEndpoint but the test handler should block due to the in-flight request
final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
assertThat(closeRestServerEndpointFuture.isDone(), is(false));
// Finish the in-flight request.
- handlerBlocker.unblockRequest();
+ sync.releaseBlocker();
request.get(timeout.getSize(), timeout.getUnit());
closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
@@ -751,58 +758,6 @@ public class RestServerEndpointITCase extends TestLogger {
return parameters;
}
- /**
- * This is a helper class for tests that require to have fine-grained control over HTTP
- * requests so that they are not dispatched immediately.
- */
- private static class HandlerBlocker {
-
- private final Time timeout;
-
- private final CountDownLatch requestArrivedLatch = new CountDownLatch(1);
-
- private final CountDownLatch finishRequestLatch = new CountDownLatch(1);
-
- private HandlerBlocker(final Time timeout) {
- this.timeout = checkNotNull(timeout);
- }
-
- /**
- * Waits until {@link #arriveAndBlock()} is called.
- */
- public void awaitRequestToArrive() {
- try {
- assertTrue(requestArrivedLatch.await(timeout.getSize(), timeout.getUnit()));
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- /**
- * Signals that the request arrived. This method blocks until {@link #unblockRequest()} is
- * called.
- */
- public void arriveAndBlock() {
- markRequestArrived();
- try {
- assertTrue(finishRequestLatch.await(timeout.getSize(), timeout.getUnit()));
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- /**
- * @see #arriveAndBlock()
- */
- public void unblockRequest() {
- finishRequestLatch.countDown();
- }
-
- private void markRequestArrived() {
- requestArrivedLatch.countDown();
- }
- }
-
static class TestRestClient extends RestClient {
TestRestClient(RestClientConfiguration configuration) {