You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text version.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/03 17:01:55 UTC

[flink] 01/02: [FLINK-11163][tests] Use random port in RestClusterClientTest

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 903a9090324f8d14af15621b7ec026f4a8485050
Author: zentol <ch...@apache.org>
AuthorDate: Thu Dec 20 12:59:09 2018 +0100

    [FLINK-11163][tests] Use random port in RestClusterClientTest
---
 .../client/program/rest/RestClusterClientTest.java | 314 ++++++++++++---------
 1 file changed, 185 insertions(+), 129 deletions(-)

diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 3e0f2f5..bbacd84 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -89,6 +89,7 @@ import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.OptionalFailure;
@@ -158,8 +159,6 @@ public class RestClusterClientTest extends TestLogger {
 
 	private RestServerEndpointConfiguration restServerEndpointConfiguration;
 
-	private RestClusterClient<StandaloneClusterId> restClusterClient;
-
 	private volatile FailHttpRequestPredicate failHttpRequest = FailHttpRequestPredicate.never();
 
 	private ExecutorService executor;
@@ -167,28 +166,60 @@ public class RestClusterClientTest extends TestLogger {
 	private JobGraph jobGraph;
 	private JobID jobId;
 
-	@Before
-	public void setUp() throws Exception {
-		MockitoAnnotations.initMocks(this);
+	private static final Configuration restConfig;
 
+	static {
 		final Configuration config = new Configuration();
 		config.setString(JobManagerOptions.ADDRESS, "localhost");
 		config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10);
 		config.setLong(RestOptions.RETRY_DELAY, 0);
+		config.setInteger(RestOptions.PORT, 0);
+
+		restConfig = config;
+	}
 
-		restServerEndpointConfiguration = RestServerEndpointConfiguration.fromConfiguration(config);
+	@Before
+	public void setUp() throws Exception {
+		MockitoAnnotations.initMocks(this);
+
+		restServerEndpointConfiguration = RestServerEndpointConfiguration.fromConfiguration(restConfig);
 		mockGatewayRetriever = () -> CompletableFuture.completedFuture(mockRestfulGateway);
 
 		executor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory(RestClusterClientTest.class.getSimpleName()));
-		final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), executor) {
+
+		jobGraph = new JobGraph("testjob");
+		jobId = jobGraph.getJobID();
+	}
+
+	@After
+	public void tearDown() {
+		if (executor != null) {
+			executor.shutdown();
+		}
+	}
+
+	private RestClusterClient<StandaloneClusterId> createRestClusterClient(final int port) throws Exception {
+		final Configuration clientConfig = new Configuration(restConfig);
+		clientConfig.setInteger(RestOptions.PORT, port);
+		return new RestClusterClient<>(
+			clientConfig,
+			createRestClient(),
+			StandaloneClusterId.getInstance(),
+			(attempt) -> 0,
+			null);
+	}
+
+	@Nonnull
+	private RestClient createRestClient() throws ConfigurationException {
+		return new RestClient(RestClientConfiguration.fromConfiguration(restConfig), executor) {
 			@Override
 			public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
 			sendRequest(
-					final String targetAddress,
-					final int targetPort,
-					final M messageHeaders,
-					final U messageParameters,
-					final R request) throws IOException {
+				final String targetAddress,
+				final int targetPort,
+				final M messageHeaders,
+				final U messageParameters,
+				final R request) throws IOException {
 				if (failHttpRequest.test(messageHeaders, messageParameters, request)) {
 					return FutureUtils.completedExceptionally(new IOException("expected"));
 				} else {
@@ -196,26 +227,6 @@ public class RestClusterClientTest extends TestLogger {
 				}
 			}
 		};
-		restClusterClient = new RestClusterClient<>(
-			config,
-			restClient,
-			StandaloneClusterId.getInstance(),
-			(attempt) -> 0,
-			null);
-
-		jobGraph = new JobGraph("testjob");
-		jobId = jobGraph.getJobID();
-	}
-
-	@After
-	public void tearDown() throws Exception {
-		if (restClusterClient != null) {
-			restClusterClient.shutdown();
-		}
-
-		if (executor != null) {
-			executor.shutdown();
-		}
 	}
 
 	@Test
@@ -230,22 +241,27 @@ public class RestClusterClientTest extends TestLogger {
 					.netRuntime(Long.MAX_VALUE)
 					.build()));
 
-		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
 			submitHandler,
 			terminationHandler,
 			testJobExecutionResultHandler)) {
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
 
-			Assert.assertFalse(submitHandler.jobSubmitted);
-			restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
-			Assert.assertTrue(submitHandler.jobSubmitted);
+			try {
+				Assert.assertFalse(submitHandler.jobSubmitted);
+				restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
+				Assert.assertTrue(submitHandler.jobSubmitted);
 
-			Assert.assertFalse(terminationHandler.jobCanceled);
-			restClusterClient.cancel(jobId);
-			Assert.assertTrue(terminationHandler.jobCanceled);
+				Assert.assertFalse(terminationHandler.jobCanceled);
+				restClusterClient.cancel(jobId);
+				Assert.assertTrue(terminationHandler.jobCanceled);
 
-			Assert.assertFalse(terminationHandler.jobStopped);
-			restClusterClient.stop(jobId);
-			Assert.assertTrue(terminationHandler.jobStopped);
+				Assert.assertFalse(terminationHandler.jobStopped);
+				restClusterClient.stop(jobId);
+				Assert.assertTrue(terminationHandler.jobStopped);
+			} finally {
+				restClusterClient.shutdown();
+			}
 		}
 	}
 
@@ -257,16 +273,21 @@ public class RestClusterClientTest extends TestLogger {
 
 		final TestJobSubmitHandler testJobSubmitHandler = new TestJobSubmitHandler();
 
-		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
 			testJobSubmitHandler)) {
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
 
-			restClusterClient.setDetached(true);
-			final JobSubmissionResult jobSubmissionResult = restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
-
-			// if the detached mode didn't work, then we would not reach this point because the execution result
-			// retrieval would have failed.
-			assertThat(jobSubmissionResult, is(not(instanceOf(JobExecutionResult.class))));
-			assertThat(jobSubmissionResult.getJobID(), is(jobId));
+			try {
+				restClusterClient.setDetached(true);
+				final JobSubmissionResult jobSubmissionResult = restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
+
+				// if the detached mode didn't work, then we would not reach this point because the execution result
+				// retrieval would have failed.
+				assertThat(jobSubmissionResult, is(not(instanceOf(JobExecutionResult.class))));
+				assertThat(jobSubmissionResult.getJobID(), is(jobId));
+			} finally {
+				restClusterClient.shutdown();
+			}
 		}
 
 	}
@@ -365,29 +386,34 @@ public class RestClusterClientTest extends TestLogger {
 		failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
 			messageHeaders instanceof JobExecutionResultHeaders && !firstPollFailed.getAndSet(true);
 
-		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
 			testJobExecutionResultHandler,
 			new TestJobSubmitHandler())) {
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
 
-			JobExecutionResult jobExecutionResult;
+			try {
+				JobExecutionResult jobExecutionResult;
 
-			jobExecutionResult = (JobExecutionResult) restClusterClient.submitJob(
-				jobGraph,
-				ClassLoader.getSystemClassLoader());
-			assertThat(jobExecutionResult.getJobID(), equalTo(jobId));
-			assertThat(jobExecutionResult.getNetRuntime(), equalTo(Long.MAX_VALUE));
-			assertThat(
-				jobExecutionResult.getAllAccumulatorResults(),
-				equalTo(Collections.singletonMap("testName", 1.0)));
+				jobExecutionResult = (JobExecutionResult) restClusterClient.submitJob(
+					jobGraph,
+					ClassLoader.getSystemClassLoader());
+				assertThat(jobExecutionResult.getJobID(), equalTo(jobId));
+				assertThat(jobExecutionResult.getNetRuntime(), equalTo(Long.MAX_VALUE));
+				assertThat(
+					jobExecutionResult.getAllAccumulatorResults(),
+					equalTo(Collections.singletonMap("testName", 1.0)));
 
-			try {
-				restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
-				fail("Expected exception not thrown.");
-			} catch (final ProgramInvocationException e) {
-				final Optional<RuntimeException> cause = ExceptionUtils.findThrowable(e, RuntimeException.class);
+				try {
+					restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
+					fail("Expected exception not thrown.");
+				} catch (final ProgramInvocationException e) {
+					final Optional<RuntimeException> cause = ExceptionUtils.findThrowable(e, RuntimeException.class);
 
-				assertThat(cause.isPresent(), is(true));
-				assertThat(cause.get().getMessage(), equalTo("expected"));
+					assertThat(cause.isPresent(), is(true));
+					assertThat(cause.get().getMessage(), equalTo("expected"));
+				}
+			} finally {
+				restClusterClient.shutdown();
 			}
 		}
 	}
@@ -420,43 +446,48 @@ public class RestClusterClientTest extends TestLogger {
 		failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
 			messageHeaders instanceof SavepointStatusHeaders && !firstPollFailed.getAndSet(true);
 
-		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
 			triggerHandler,
 			savepointHandler)) {
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
 
-			JobID id = new JobID();
-			{
-				CompletableFuture<String> savepointPathFuture = restClusterClient.triggerSavepoint(id, null);
-				String savepointPath = savepointPathFuture.get();
-				assertEquals(savepointLocationDefaultDir, savepointPath);
-			}
+			try {
+				JobID id = new JobID();
+				{
+					CompletableFuture<String> savepointPathFuture = restClusterClient.triggerSavepoint(id, null);
+					String savepointPath = savepointPathFuture.get();
+					assertEquals(savepointLocationDefaultDir, savepointPath);
+				}
 
-			{
-				CompletableFuture<String> savepointPathFuture = restClusterClient.triggerSavepoint(id, targetSavepointDirectory);
-				String savepointPath = savepointPathFuture.get();
-				assertEquals(savepointLocationRequestedDir, savepointPath);
-			}
+				{
+					CompletableFuture<String> savepointPathFuture = restClusterClient.triggerSavepoint(id, targetSavepointDirectory);
+					String savepointPath = savepointPathFuture.get();
+					assertEquals(savepointLocationRequestedDir, savepointPath);
+				}
+
+				{
+					try {
+						restClusterClient.triggerSavepoint(id, null).get();
+						fail("Expected exception not thrown.");
+					} catch (ExecutionException e) {
+						final Throwable cause = e.getCause();
+						assertThat(cause, instanceOf(SerializedThrowable.class));
+						assertThat(((SerializedThrowable) cause)
+							.deserializeError(ClassLoader.getSystemClassLoader())
+							.getMessage(), equalTo("expected"));
+					}
+				}
 
-			{
 				try {
-					restClusterClient.triggerSavepoint(id, null).get();
+					restClusterClient.triggerSavepoint(new JobID(), null).get();
 					fail("Expected exception not thrown.");
-				} catch (ExecutionException e) {
-					final Throwable cause = e.getCause();
-					assertThat(cause, instanceOf(SerializedThrowable.class));
-					assertThat(((SerializedThrowable) cause)
-						.deserializeError(ClassLoader.getSystemClassLoader())
-						.getMessage(), equalTo("expected"));
+				} catch (final ExecutionException e) {
+					assertTrue(
+						"RestClientException not in causal chain",
+						ExceptionUtils.findThrowable(e, RestClientException.class).isPresent());
 				}
-			}
-
-			try {
-				restClusterClient.triggerSavepoint(new JobID(), null).get();
-				fail("Expected exception not thrown.");
-			} catch (final ExecutionException e) {
-				assertTrue(
-					"RestClientException not in causal chain",
-					ExceptionUtils.findThrowable(e, RestClientException.class).isPresent());
+			} finally {
+				restClusterClient.shutdown();
 			}
 		}
 	}
@@ -540,32 +571,38 @@ public class RestClusterClientTest extends TestLogger {
 			OptionalFailure.of(AsynchronousOperationInfo.completeExceptional(new SerializedThrowable(testException))),
 			OptionalFailure.ofFailure(testException));
 
-		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
 			testSavepointDisposalStatusHandler,
 			testSavepointDisposalTriggerHandler)) {
-			{
-				final CompletableFuture<Acknowledge> disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath);
-				assertThat(disposeSavepointFuture.get(), is(Acknowledge.get()));
-			}
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
+
+			try {
+				{
+					final CompletableFuture<Acknowledge> disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath);
+					assertThat(disposeSavepointFuture.get(), is(Acknowledge.get()));
+				}
 
-			{
-				final CompletableFuture<Acknowledge> disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath);
+				{
+					final CompletableFuture<Acknowledge> disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath);
 
-				try {
-					disposeSavepointFuture.get();
-					fail("Expected an exception");
-				} catch (ExecutionException ee) {
-					assertThat(ExceptionUtils.findThrowableWithMessage(ee, exceptionMessage).isPresent(), is(true));
+					try {
+						disposeSavepointFuture.get();
+						fail("Expected an exception");
+					} catch (ExecutionException ee) {
+						assertThat(ExceptionUtils.findThrowableWithMessage(ee, exceptionMessage).isPresent(), is(true));
+					}
 				}
-			}
 
-			{
-				try {
-					restClusterClient.disposeSavepoint(savepointPath).get();
-					fail("Expected an exception.");
-				} catch (ExecutionException ee) {
-					assertThat(ExceptionUtils.findThrowable(ee, RestClientException.class).isPresent(), is(true));
+				{
+					try {
+						restClusterClient.disposeSavepoint(savepointPath).get();
+						fail("Expected an exception.");
+					} catch (ExecutionException ee) {
+						assertThat(ExceptionUtils.findThrowable(ee, RestClientException.class).isPresent(), is(true));
+					}
 				}
+			} finally {
+				restClusterClient.shutdown();
 			}
 		}
 	}
@@ -626,14 +663,18 @@ public class RestClusterClientTest extends TestLogger {
 
 	@Test
 	public void testListJobs() throws Exception {
-		try (TestRestServerEndpoint ignored = createRestServerEndpoint(new TestListJobsHandler())) {
-			{
+		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(new TestListJobsHandler())) {
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
+
+			try {
 				CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = restClusterClient.listJobs();
 				Collection<JobStatusMessage> jobDetails = jobDetailsFuture.get();
 				Iterator<JobStatusMessage> jobDetailsIterator = jobDetails.iterator();
 				JobStatusMessage job1 = jobDetailsIterator.next();
 				JobStatusMessage job2 = jobDetailsIterator.next();
 				Assert.assertNotEquals("The job status should not be equal.", job1.getJobState(), job2.getJobState());
+			} finally {
+				restClusterClient.shutdown();
 			}
 		}
 	}
@@ -642,17 +683,22 @@ public class RestClusterClientTest extends TestLogger {
 	public void testGetAccumulators() throws Exception {
 		TestAccumulatorHandler accumulatorHandler = new TestAccumulatorHandler();
 
-		try (TestRestServerEndpoint ignored = createRestServerEndpoint(accumulatorHandler)){
+		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(accumulatorHandler)){
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
 
-			JobID id = new JobID();
+			try {
+				JobID id = new JobID();
 
-			{
-				Map<String, OptionalFailure<Object>> accumulators = restClusterClient.getAccumulators(id);
-				assertNotNull(accumulators);
-				assertEquals(1, accumulators.size());
+				{
+					Map<String, OptionalFailure<Object>> accumulators = restClusterClient.getAccumulators(id);
+					assertNotNull(accumulators);
+					assertEquals(1, accumulators.size());
 
-				assertEquals(true, accumulators.containsKey("testKey"));
-				assertEquals("testValue", accumulators.get("testKey").get().toString());
+					assertEquals(true, accumulators.containsKey("testKey"));
+					assertEquals("testValue", accumulators.get("testKey").get().toString());
+				}
+			} finally {
+				restClusterClient.shutdown();
 			}
 		}
 	}
@@ -698,11 +744,17 @@ public class RestClusterClientTest extends TestLogger {
 			CompletableFuture.completedFuture(EmptyResponseBody.getInstance()));
 
 		try (final TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(pingRestHandler)) {
-			final AtomicBoolean firstPollFailed = new AtomicBoolean();
-			failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
-				messageHeaders instanceof PingRestHandlerHeaders && !firstPollFailed.getAndSet(true);
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
 
-			restClusterClient.sendRequest(PingRestHandlerHeaders.INSTANCE).get();
+			try {
+				final AtomicBoolean firstPollFailed = new AtomicBoolean();
+				failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
+					messageHeaders instanceof PingRestHandlerHeaders && !firstPollFailed.getAndSet(true);
+
+				restClusterClient.sendRequest(PingRestHandlerHeaders.INSTANCE).get();
+			} finally {
+				restClusterClient.shutdown();
+			}
 		}
 	}
 
@@ -716,11 +768,15 @@ public class RestClusterClientTest extends TestLogger {
 			FutureUtils.completedExceptionally(new RestHandlerException(exceptionMessage, HttpResponseStatus.NOT_FOUND)));
 
 		try (final TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(pingRestHandler)) {
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
+
 			try {
 				restClusterClient.sendRequest(PingRestHandlerHeaders.INSTANCE).get();
 				fail("The rest request should have failed.");
 			}  catch (Exception e) {
 				assertThat(ExceptionUtils.findThrowableWithMessage(e, exceptionMessage).isPresent(), is(true));
+			} finally {
+				restClusterClient.shutdown();
 			}
 		}
 	}