You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/03 17:01:55 UTC
[flink] 01/02: [FLINK-11163][tests] Use random port in RestClusterClientTest
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 903a9090324f8d14af15621b7ec026f4a8485050
Author: zentol <ch...@apache.org>
AuthorDate: Thu Dec 20 12:59:09 2018 +0100
[FLINK-11163][tests] Use random port in RestClusterClientTest
---
.../client/program/rest/RestClusterClientTest.java | 314 ++++++++++++---------
1 file changed, 185 insertions(+), 129 deletions(-)
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 3e0f2f5..bbacd84 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -89,6 +89,7 @@ import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OptionalFailure;
@@ -158,8 +159,6 @@ public class RestClusterClientTest extends TestLogger {
private RestServerEndpointConfiguration restServerEndpointConfiguration;
- private RestClusterClient<StandaloneClusterId> restClusterClient;
-
private volatile FailHttpRequestPredicate failHttpRequest = FailHttpRequestPredicate.never();
private ExecutorService executor;
@@ -167,28 +166,60 @@ public class RestClusterClientTest extends TestLogger {
private JobGraph jobGraph;
private JobID jobId;
- @Before
- public void setUp() throws Exception {
- MockitoAnnotations.initMocks(this);
+ private static final Configuration restConfig;
+ static {
final Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10);
config.setLong(RestOptions.RETRY_DELAY, 0);
+ config.setInteger(RestOptions.PORT, 0);
+
+ restConfig = config;
+ }
- restServerEndpointConfiguration = RestServerEndpointConfiguration.fromConfiguration(config);
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+
+ restServerEndpointConfiguration = RestServerEndpointConfiguration.fromConfiguration(restConfig);
mockGatewayRetriever = () -> CompletableFuture.completedFuture(mockRestfulGateway);
executor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory(RestClusterClientTest.class.getSimpleName()));
- final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), executor) {
+
+ jobGraph = new JobGraph("testjob");
+ jobId = jobGraph.getJobID();
+ }
+
+ @After
+ public void tearDown() {
+ if (executor != null) {
+ executor.shutdown();
+ }
+ }
+
+ private RestClusterClient<StandaloneClusterId> createRestClusterClient(final int port) throws Exception {
+ final Configuration clientConfig = new Configuration(restConfig);
+ clientConfig.setInteger(RestOptions.PORT, port);
+ return new RestClusterClient<>(
+ clientConfig,
+ createRestClient(),
+ StandaloneClusterId.getInstance(),
+ (attempt) -> 0,
+ null);
+ }
+
+ @Nonnull
+ private RestClient createRestClient() throws ConfigurationException {
+ return new RestClient(RestClientConfiguration.fromConfiguration(restConfig), executor) {
@Override
public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
sendRequest(
- final String targetAddress,
- final int targetPort,
- final M messageHeaders,
- final U messageParameters,
- final R request) throws IOException {
+ final String targetAddress,
+ final int targetPort,
+ final M messageHeaders,
+ final U messageParameters,
+ final R request) throws IOException {
if (failHttpRequest.test(messageHeaders, messageParameters, request)) {
return FutureUtils.completedExceptionally(new IOException("expected"));
} else {
@@ -196,26 +227,6 @@ public class RestClusterClientTest extends TestLogger {
}
}
};
- restClusterClient = new RestClusterClient<>(
- config,
- restClient,
- StandaloneClusterId.getInstance(),
- (attempt) -> 0,
- null);
-
- jobGraph = new JobGraph("testjob");
- jobId = jobGraph.getJobID();
- }
-
- @After
- public void tearDown() throws Exception {
- if (restClusterClient != null) {
- restClusterClient.shutdown();
- }
-
- if (executor != null) {
- executor.shutdown();
- }
}
@Test
@@ -230,22 +241,27 @@ public class RestClusterClientTest extends TestLogger {
.netRuntime(Long.MAX_VALUE)
.build()));
- try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+ try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
submitHandler,
terminationHandler,
testJobExecutionResultHandler)) {
+ RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
- Assert.assertFalse(submitHandler.jobSubmitted);
- restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
- Assert.assertTrue(submitHandler.jobSubmitted);
+ try {
+ Assert.assertFalse(submitHandler.jobSubmitted);
+ restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
+ Assert.assertTrue(submitHandler.jobSubmitted);
- Assert.assertFalse(terminationHandler.jobCanceled);
- restClusterClient.cancel(jobId);
- Assert.assertTrue(terminationHandler.jobCanceled);
+ Assert.assertFalse(terminationHandler.jobCanceled);
+ restClusterClient.cancel(jobId);
+ Assert.assertTrue(terminationHandler.jobCanceled);
- Assert.assertFalse(terminationHandler.jobStopped);
- restClusterClient.stop(jobId);
- Assert.assertTrue(terminationHandler.jobStopped);
+ Assert.assertFalse(terminationHandler.jobStopped);
+ restClusterClient.stop(jobId);
+ Assert.assertTrue(terminationHandler.jobStopped);
+ } finally {
+ restClusterClient.shutdown();
+ }
}
}
@@ -257,16 +273,21 @@ public class RestClusterClientTest extends TestLogger {
final TestJobSubmitHandler testJobSubmitHandler = new TestJobSubmitHandler();
- try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+ try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
testJobSubmitHandler)) {
+ RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
- restClusterClient.setDetached(true);
- final JobSubmissionResult jobSubmissionResult = restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
-
- // if the detached mode didn't work, then we would not reach this point because the execution result
- // retrieval would have failed.
- assertThat(jobSubmissionResult, is(not(instanceOf(JobExecutionResult.class))));
- assertThat(jobSubmissionResult.getJobID(), is(jobId));
+ try {
+ restClusterClient.setDetached(true);
+ final JobSubmissionResult jobSubmissionResult = restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
+
+ // if the detached mode didn't work, then we would not reach this point because the execution result
+ // retrieval would have failed.
+ assertThat(jobSubmissionResult, is(not(instanceOf(JobExecutionResult.class))));
+ assertThat(jobSubmissionResult.getJobID(), is(jobId));
+ } finally {
+ restClusterClient.shutdown();
+ }
}
}
@@ -365,29 +386,34 @@ public class RestClusterClientTest extends TestLogger {
failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
messageHeaders instanceof JobExecutionResultHeaders && !firstPollFailed.getAndSet(true);
- try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+ try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
testJobExecutionResultHandler,
new TestJobSubmitHandler())) {
+ RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
- JobExecutionResult jobExecutionResult;
+ try {
+ JobExecutionResult jobExecutionResult;
- jobExecutionResult = (JobExecutionResult) restClusterClient.submitJob(
- jobGraph,
- ClassLoader.getSystemClassLoader());
- assertThat(jobExecutionResult.getJobID(), equalTo(jobId));
- assertThat(jobExecutionResult.getNetRuntime(), equalTo(Long.MAX_VALUE));
- assertThat(
- jobExecutionResult.getAllAccumulatorResults(),
- equalTo(Collections.singletonMap("testName", 1.0)));
+ jobExecutionResult = (JobExecutionResult) restClusterClient.submitJob(
+ jobGraph,
+ ClassLoader.getSystemClassLoader());
+ assertThat(jobExecutionResult.getJobID(), equalTo(jobId));
+ assertThat(jobExecutionResult.getNetRuntime(), equalTo(Long.MAX_VALUE));
+ assertThat(
+ jobExecutionResult.getAllAccumulatorResults(),
+ equalTo(Collections.singletonMap("testName", 1.0)));
- try {
- restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
- fail("Expected exception not thrown.");
- } catch (final ProgramInvocationException e) {
- final Optional<RuntimeException> cause = ExceptionUtils.findThrowable(e, RuntimeException.class);
+ try {
+ restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
+ fail("Expected exception not thrown.");
+ } catch (final ProgramInvocationException e) {
+ final Optional<RuntimeException> cause = ExceptionUtils.findThrowable(e, RuntimeException.class);
- assertThat(cause.isPresent(), is(true));
- assertThat(cause.get().getMessage(), equalTo("expected"));
+ assertThat(cause.isPresent(), is(true));
+ assertThat(cause.get().getMessage(), equalTo("expected"));
+ }
+ } finally {
+ restClusterClient.shutdown();
}
}
}
@@ -420,43 +446,48 @@ public class RestClusterClientTest extends TestLogger {
failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
messageHeaders instanceof SavepointStatusHeaders && !firstPollFailed.getAndSet(true);
- try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+ try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
triggerHandler,
savepointHandler)) {
+ RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
- JobID id = new JobID();
- {
- CompletableFuture<String> savepointPathFuture = restClusterClient.triggerSavepoint(id, null);
- String savepointPath = savepointPathFuture.get();
- assertEquals(savepointLocationDefaultDir, savepointPath);
- }
+ try {
+ JobID id = new JobID();
+ {
+ CompletableFuture<String> savepointPathFuture = restClusterClient.triggerSavepoint(id, null);
+ String savepointPath = savepointPathFuture.get();
+ assertEquals(savepointLocationDefaultDir, savepointPath);
+ }
- {
- CompletableFuture<String> savepointPathFuture = restClusterClient.triggerSavepoint(id, targetSavepointDirectory);
- String savepointPath = savepointPathFuture.get();
- assertEquals(savepointLocationRequestedDir, savepointPath);
- }
+ {
+ CompletableFuture<String> savepointPathFuture = restClusterClient.triggerSavepoint(id, targetSavepointDirectory);
+ String savepointPath = savepointPathFuture.get();
+ assertEquals(savepointLocationRequestedDir, savepointPath);
+ }
+
+ {
+ try {
+ restClusterClient.triggerSavepoint(id, null).get();
+ fail("Expected exception not thrown.");
+ } catch (ExecutionException e) {
+ final Throwable cause = e.getCause();
+ assertThat(cause, instanceOf(SerializedThrowable.class));
+ assertThat(((SerializedThrowable) cause)
+ .deserializeError(ClassLoader.getSystemClassLoader())
+ .getMessage(), equalTo("expected"));
+ }
+ }
- {
try {
- restClusterClient.triggerSavepoint(id, null).get();
+ restClusterClient.triggerSavepoint(new JobID(), null).get();
fail("Expected exception not thrown.");
- } catch (ExecutionException e) {
- final Throwable cause = e.getCause();
- assertThat(cause, instanceOf(SerializedThrowable.class));
- assertThat(((SerializedThrowable) cause)
- .deserializeError(ClassLoader.getSystemClassLoader())
- .getMessage(), equalTo("expected"));
+ } catch (final ExecutionException e) {
+ assertTrue(
+ "RestClientException not in causal chain",
+ ExceptionUtils.findThrowable(e, RestClientException.class).isPresent());
}
- }
-
- try {
- restClusterClient.triggerSavepoint(new JobID(), null).get();
- fail("Expected exception not thrown.");
- } catch (final ExecutionException e) {
- assertTrue(
- "RestClientException not in causal chain",
- ExceptionUtils.findThrowable(e, RestClientException.class).isPresent());
+ } finally {
+ restClusterClient.shutdown();
}
}
}
@@ -540,32 +571,38 @@ public class RestClusterClientTest extends TestLogger {
OptionalFailure.of(AsynchronousOperationInfo.completeExceptional(new SerializedThrowable(testException))),
OptionalFailure.ofFailure(testException));
- try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+ try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
testSavepointDisposalStatusHandler,
testSavepointDisposalTriggerHandler)) {
- {
- final CompletableFuture<Acknowledge> disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath);
- assertThat(disposeSavepointFuture.get(), is(Acknowledge.get()));
- }
+ RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
+
+ try {
+ {
+ final CompletableFuture<Acknowledge> disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath);
+ assertThat(disposeSavepointFuture.get(), is(Acknowledge.get()));
+ }
- {
- final CompletableFuture<Acknowledge> disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath);
+ {
+ final CompletableFuture<Acknowledge> disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath);
- try {
- disposeSavepointFuture.get();
- fail("Expected an exception");
- } catch (ExecutionException ee) {
- assertThat(ExceptionUtils.findThrowableWithMessage(ee, exceptionMessage).isPresent(), is(true));
+ try {
+ disposeSavepointFuture.get();
+ fail("Expected an exception");
+ } catch (ExecutionException ee) {
+ assertThat(ExceptionUtils.findThrowableWithMessage(ee, exceptionMessage).isPresent(), is(true));
+ }
}
- }
- {
- try {
- restClusterClient.disposeSavepoint(savepointPath).get();
- fail("Expected an exception.");
- } catch (ExecutionException ee) {
- assertThat(ExceptionUtils.findThrowable(ee, RestClientException.class).isPresent(), is(true));
+ {
+ try {
+ restClusterClient.disposeSavepoint(savepointPath).get();
+ fail("Expected an exception.");
+ } catch (ExecutionException ee) {
+ assertThat(ExceptionUtils.findThrowable(ee, RestClientException.class).isPresent(), is(true));
+ }
}
+ } finally {
+ restClusterClient.shutdown();
}
}
}
@@ -626,14 +663,18 @@ public class RestClusterClientTest extends TestLogger {
@Test
public void testListJobs() throws Exception {
- try (TestRestServerEndpoint ignored = createRestServerEndpoint(new TestListJobsHandler())) {
- {
+ try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(new TestListJobsHandler())) {
+ RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
+
+ try {
CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = restClusterClient.listJobs();
Collection<JobStatusMessage> jobDetails = jobDetailsFuture.get();
Iterator<JobStatusMessage> jobDetailsIterator = jobDetails.iterator();
JobStatusMessage job1 = jobDetailsIterator.next();
JobStatusMessage job2 = jobDetailsIterator.next();
Assert.assertNotEquals("The job status should not be equal.", job1.getJobState(), job2.getJobState());
+ } finally {
+ restClusterClient.shutdown();
}
}
}
@@ -642,17 +683,22 @@ public class RestClusterClientTest extends TestLogger {
public void testGetAccumulators() throws Exception {
TestAccumulatorHandler accumulatorHandler = new TestAccumulatorHandler();
- try (TestRestServerEndpoint ignored = createRestServerEndpoint(accumulatorHandler)){
+ try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(accumulatorHandler)){
+ RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
- JobID id = new JobID();
+ try {
+ JobID id = new JobID();
- {
- Map<String, OptionalFailure<Object>> accumulators = restClusterClient.getAccumulators(id);
- assertNotNull(accumulators);
- assertEquals(1, accumulators.size());
+ {
+ Map<String, OptionalFailure<Object>> accumulators = restClusterClient.getAccumulators(id);
+ assertNotNull(accumulators);
+ assertEquals(1, accumulators.size());
- assertEquals(true, accumulators.containsKey("testKey"));
- assertEquals("testValue", accumulators.get("testKey").get().toString());
+ assertEquals(true, accumulators.containsKey("testKey"));
+ assertEquals("testValue", accumulators.get("testKey").get().toString());
+ }
+ } finally {
+ restClusterClient.shutdown();
}
}
}
@@ -698,11 +744,17 @@ public class RestClusterClientTest extends TestLogger {
CompletableFuture.completedFuture(EmptyResponseBody.getInstance()));
try (final TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(pingRestHandler)) {
- final AtomicBoolean firstPollFailed = new AtomicBoolean();
- failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
- messageHeaders instanceof PingRestHandlerHeaders && !firstPollFailed.getAndSet(true);
+ RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
- restClusterClient.sendRequest(PingRestHandlerHeaders.INSTANCE).get();
+ try {
+ final AtomicBoolean firstPollFailed = new AtomicBoolean();
+ failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
+ messageHeaders instanceof PingRestHandlerHeaders && !firstPollFailed.getAndSet(true);
+
+ restClusterClient.sendRequest(PingRestHandlerHeaders.INSTANCE).get();
+ } finally {
+ restClusterClient.shutdown();
+ }
}
}
@@ -716,11 +768,15 @@ public class RestClusterClientTest extends TestLogger {
FutureUtils.completedExceptionally(new RestHandlerException(exceptionMessage, HttpResponseStatus.NOT_FOUND)));
try (final TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(pingRestHandler)) {
+ RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
+
try {
restClusterClient.sendRequest(PingRestHandlerHeaders.INSTANCE).get();
fail("The rest request should have failed.");
} catch (Exception e) {
assertThat(ExceptionUtils.findThrowableWithMessage(e, exceptionMessage).isPresent(), is(true));
+ } finally {
+ restClusterClient.shutdown();
}
}
}