You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/01/03 17:01:47 UTC

[GitHub] zentol closed pull request #7345: [FLINK-11163][tests] Use random port in RestClusterClientTest

zentol closed pull request #7345: [FLINK-11163][tests] Use random port in RestClusterClientTest 
URL: https://github.com/apache/flink/pull/7345
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it once the pull request is merged):

diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 3e0f2f525a0..bc0432ea4ee 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -32,7 +32,6 @@
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -88,7 +87,9 @@
 import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.OptionalFailure;
@@ -105,8 +106,6 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
 
 import javax.annotation.Nonnull;
 
@@ -150,16 +149,12 @@
  */
 public class RestClusterClientTest extends TestLogger {
 
-	@Mock
-	private Dispatcher mockRestfulGateway;
+	private final DispatcherGateway mockRestfulGateway = new TestingDispatcherGateway.Builder().build();
 
-	@Mock
 	private GatewayRetriever<DispatcherGateway> mockGatewayRetriever;
 
 	private RestServerEndpointConfiguration restServerEndpointConfiguration;
 
-	private RestClusterClient<StandaloneClusterId> restClusterClient;
-
 	private volatile FailHttpRequestPredicate failHttpRequest = FailHttpRequestPredicate.never();
 
 	private ExecutorService executor;
@@ -167,28 +162,58 @@
 	private JobGraph jobGraph;
 	private JobID jobId;
 
-	@Before
-	public void setUp() throws Exception {
-		MockitoAnnotations.initMocks(this);
+	private static final Configuration restConfig;
 
+	static {
 		final Configuration config = new Configuration();
 		config.setString(JobManagerOptions.ADDRESS, "localhost");
 		config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10);
 		config.setLong(RestOptions.RETRY_DELAY, 0);
+		config.setInteger(RestOptions.PORT, 0);
+
+		restConfig = config;
+	}
 
-		restServerEndpointConfiguration = RestServerEndpointConfiguration.fromConfiguration(config);
+	@Before
+	public void setUp() throws Exception {
+		restServerEndpointConfiguration = RestServerEndpointConfiguration.fromConfiguration(restConfig);
 		mockGatewayRetriever = () -> CompletableFuture.completedFuture(mockRestfulGateway);
 
 		executor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory(RestClusterClientTest.class.getSimpleName()));
-		final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), executor) {
+
+		jobGraph = new JobGraph("testjob");
+		jobId = jobGraph.getJobID();
+	}
+
+	@After
+	public void tearDown() {
+		if (executor != null) {
+			executor.shutdown();
+		}
+	}
+
+	private RestClusterClient<StandaloneClusterId> createRestClusterClient(final int port) throws Exception {
+		final Configuration clientConfig = new Configuration(restConfig);
+		clientConfig.setInteger(RestOptions.PORT, port);
+		return new RestClusterClient<>(
+			clientConfig,
+			createRestClient(),
+			StandaloneClusterId.getInstance(),
+			(attempt) -> 0,
+			null);
+	}
+
+	@Nonnull
+	private RestClient createRestClient() throws ConfigurationException {
+		return new RestClient(RestClientConfiguration.fromConfiguration(restConfig), executor) {
 			@Override
 			public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
 			sendRequest(
-					final String targetAddress,
-					final int targetPort,
-					final M messageHeaders,
-					final U messageParameters,
-					final R request) throws IOException {
+				final String targetAddress,
+				final int targetPort,
+				final M messageHeaders,
+				final U messageParameters,
+				final R request) throws IOException {
 				if (failHttpRequest.test(messageHeaders, messageParameters, request)) {
 					return FutureUtils.completedExceptionally(new IOException("expected"));
 				} else {
@@ -196,26 +221,6 @@ public void setUp() throws Exception {
 				}
 			}
 		};
-		restClusterClient = new RestClusterClient<>(
-			config,
-			restClient,
-			StandaloneClusterId.getInstance(),
-			(attempt) -> 0,
-			null);
-
-		jobGraph = new JobGraph("testjob");
-		jobId = jobGraph.getJobID();
-	}
-
-	@After
-	public void tearDown() throws Exception {
-		if (restClusterClient != null) {
-			restClusterClient.shutdown();
-		}
-
-		if (executor != null) {
-			executor.shutdown();
-		}
 	}
 
 	@Test
@@ -230,22 +235,27 @@ public void testJobSubmitCancelStop() throws Exception {
 					.netRuntime(Long.MAX_VALUE)
 					.build()));
 
-		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
 			submitHandler,
 			terminationHandler,
 			testJobExecutionResultHandler)) {
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
 
-			Assert.assertFalse(submitHandler.jobSubmitted);
-			restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
-			Assert.assertTrue(submitHandler.jobSubmitted);
+			try {
+				Assert.assertFalse(submitHandler.jobSubmitted);
+				restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
+				Assert.assertTrue(submitHandler.jobSubmitted);
 
-			Assert.assertFalse(terminationHandler.jobCanceled);
-			restClusterClient.cancel(jobId);
-			Assert.assertTrue(terminationHandler.jobCanceled);
+				Assert.assertFalse(terminationHandler.jobCanceled);
+				restClusterClient.cancel(jobId);
+				Assert.assertTrue(terminationHandler.jobCanceled);
 
-			Assert.assertFalse(terminationHandler.jobStopped);
-			restClusterClient.stop(jobId);
-			Assert.assertTrue(terminationHandler.jobStopped);
+				Assert.assertFalse(terminationHandler.jobStopped);
+				restClusterClient.stop(jobId);
+				Assert.assertTrue(terminationHandler.jobStopped);
+			} finally {
+				restClusterClient.shutdown();
+			}
 		}
 	}
 
@@ -257,16 +267,21 @@ public void testDetachedJobSubmission() throws Exception {
 
 		final TestJobSubmitHandler testJobSubmitHandler = new TestJobSubmitHandler();
 
-		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
 			testJobSubmitHandler)) {
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
 
-			restClusterClient.setDetached(true);
-			final JobSubmissionResult jobSubmissionResult = restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
-
-			// if the detached mode didn't work, then we would not reach this point because the execution result
-			// retrieval would have failed.
-			assertThat(jobSubmissionResult, is(not(instanceOf(JobExecutionResult.class))));
-			assertThat(jobSubmissionResult.getJobID(), is(jobId));
+			try {
+				restClusterClient.setDetached(true);
+				final JobSubmissionResult jobSubmissionResult = restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
+
+				// if the detached mode didn't work, then we would not reach this point because the execution result
+				// retrieval would have failed.
+				assertThat(jobSubmissionResult, is(not(instanceOf(JobExecutionResult.class))));
+				assertThat(jobSubmissionResult.getJobID(), is(jobId));
+			} finally {
+				restClusterClient.shutdown();
+			}
 		}
 
 	}
@@ -365,29 +380,34 @@ public void testSubmitJobAndWaitForExecutionResult() throws Exception {
 		failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
 			messageHeaders instanceof JobExecutionResultHeaders && !firstPollFailed.getAndSet(true);
 
-		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
 			testJobExecutionResultHandler,
 			new TestJobSubmitHandler())) {
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
 
-			JobExecutionResult jobExecutionResult;
+			try {
+				JobExecutionResult jobExecutionResult;
 
-			jobExecutionResult = (JobExecutionResult) restClusterClient.submitJob(
-				jobGraph,
-				ClassLoader.getSystemClassLoader());
-			assertThat(jobExecutionResult.getJobID(), equalTo(jobId));
-			assertThat(jobExecutionResult.getNetRuntime(), equalTo(Long.MAX_VALUE));
-			assertThat(
-				jobExecutionResult.getAllAccumulatorResults(),
-				equalTo(Collections.singletonMap("testName", 1.0)));
+				jobExecutionResult = (JobExecutionResult) restClusterClient.submitJob(
+					jobGraph,
+					ClassLoader.getSystemClassLoader());
+				assertThat(jobExecutionResult.getJobID(), equalTo(jobId));
+				assertThat(jobExecutionResult.getNetRuntime(), equalTo(Long.MAX_VALUE));
+				assertThat(
+					jobExecutionResult.getAllAccumulatorResults(),
+					equalTo(Collections.singletonMap("testName", 1.0)));
 
-			try {
-				restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
-				fail("Expected exception not thrown.");
-			} catch (final ProgramInvocationException e) {
-				final Optional<RuntimeException> cause = ExceptionUtils.findThrowable(e, RuntimeException.class);
+				try {
+					restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
+					fail("Expected exception not thrown.");
+				} catch (final ProgramInvocationException e) {
+					final Optional<RuntimeException> cause = ExceptionUtils.findThrowable(e, RuntimeException.class);
 
-				assertThat(cause.isPresent(), is(true));
-				assertThat(cause.get().getMessage(), equalTo("expected"));
+					assertThat(cause.isPresent(), is(true));
+					assertThat(cause.get().getMessage(), equalTo("expected"));
+				}
+			} finally {
+				restClusterClient.shutdown();
 			}
 		}
 	}
@@ -420,43 +440,48 @@ public void testTriggerSavepoint() throws Exception {
 		failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
 			messageHeaders instanceof SavepointStatusHeaders && !firstPollFailed.getAndSet(true);
 
-		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
 			triggerHandler,
 			savepointHandler)) {
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
 
-			JobID id = new JobID();
-			{
-				CompletableFuture<String> savepointPathFuture = restClusterClient.triggerSavepoint(id, null);
-				String savepointPath = savepointPathFuture.get();
-				assertEquals(savepointLocationDefaultDir, savepointPath);
-			}
+			try {
+				JobID id = new JobID();
+				{
+					CompletableFuture<String> savepointPathFuture = restClusterClient.triggerSavepoint(id, null);
+					String savepointPath = savepointPathFuture.get();
+					assertEquals(savepointLocationDefaultDir, savepointPath);
+				}
 
-			{
-				CompletableFuture<String> savepointPathFuture = restClusterClient.triggerSavepoint(id, targetSavepointDirectory);
-				String savepointPath = savepointPathFuture.get();
-				assertEquals(savepointLocationRequestedDir, savepointPath);
-			}
+				{
+					CompletableFuture<String> savepointPathFuture = restClusterClient.triggerSavepoint(id, targetSavepointDirectory);
+					String savepointPath = savepointPathFuture.get();
+					assertEquals(savepointLocationRequestedDir, savepointPath);
+				}
+
+				{
+					try {
+						restClusterClient.triggerSavepoint(id, null).get();
+						fail("Expected exception not thrown.");
+					} catch (ExecutionException e) {
+						final Throwable cause = e.getCause();
+						assertThat(cause, instanceOf(SerializedThrowable.class));
+						assertThat(((SerializedThrowable) cause)
+							.deserializeError(ClassLoader.getSystemClassLoader())
+							.getMessage(), equalTo("expected"));
+					}
+				}
 
-			{
 				try {
-					restClusterClient.triggerSavepoint(id, null).get();
+					restClusterClient.triggerSavepoint(new JobID(), null).get();
 					fail("Expected exception not thrown.");
-				} catch (ExecutionException e) {
-					final Throwable cause = e.getCause();
-					assertThat(cause, instanceOf(SerializedThrowable.class));
-					assertThat(((SerializedThrowable) cause)
-						.deserializeError(ClassLoader.getSystemClassLoader())
-						.getMessage(), equalTo("expected"));
+				} catch (final ExecutionException e) {
+					assertTrue(
+						"RestClientException not in causal chain",
+						ExceptionUtils.findThrowable(e, RestClientException.class).isPresent());
 				}
-			}
-
-			try {
-				restClusterClient.triggerSavepoint(new JobID(), null).get();
-				fail("Expected exception not thrown.");
-			} catch (final ExecutionException e) {
-				assertTrue(
-					"RestClientException not in causal chain",
-					ExceptionUtils.findThrowable(e, RestClientException.class).isPresent());
+			} finally {
+				restClusterClient.shutdown();
 			}
 		}
 	}
@@ -540,32 +565,38 @@ public void testDisposeSavepoint() throws Exception {
 			OptionalFailure.of(AsynchronousOperationInfo.completeExceptional(new SerializedThrowable(testException))),
 			OptionalFailure.ofFailure(testException));
 
-		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
 			testSavepointDisposalStatusHandler,
 			testSavepointDisposalTriggerHandler)) {
-			{
-				final CompletableFuture<Acknowledge> disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath);
-				assertThat(disposeSavepointFuture.get(), is(Acknowledge.get()));
-			}
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
 
-			{
-				final CompletableFuture<Acknowledge> disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath);
+			try {
+				{
+					final CompletableFuture<Acknowledge> disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath);
+					assertThat(disposeSavepointFuture.get(), is(Acknowledge.get()));
+				}
 
-				try {
-					disposeSavepointFuture.get();
-					fail("Expected an exception");
-				} catch (ExecutionException ee) {
-					assertThat(ExceptionUtils.findThrowableWithMessage(ee, exceptionMessage).isPresent(), is(true));
+				{
+					final CompletableFuture<Acknowledge> disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath);
+
+					try {
+						disposeSavepointFuture.get();
+						fail("Expected an exception");
+					} catch (ExecutionException ee) {
+						assertThat(ExceptionUtils.findThrowableWithMessage(ee, exceptionMessage).isPresent(), is(true));
+					}
 				}
-			}
 
-			{
-				try {
-					restClusterClient.disposeSavepoint(savepointPath).get();
-					fail("Expected an exception.");
-				} catch (ExecutionException ee) {
-					assertThat(ExceptionUtils.findThrowable(ee, RestClientException.class).isPresent(), is(true));
+				{
+					try {
+						restClusterClient.disposeSavepoint(savepointPath).get();
+						fail("Expected an exception.");
+					} catch (ExecutionException ee) {
+						assertThat(ExceptionUtils.findThrowable(ee, RestClientException.class).isPresent(), is(true));
+					}
 				}
+			} finally {
+				restClusterClient.shutdown();
 			}
 		}
 	}
@@ -626,14 +657,18 @@ private TestSavepointDisposalStatusHandler(OptionalFailure<AsynchronousOperation
 
 	@Test
 	public void testListJobs() throws Exception {
-		try (TestRestServerEndpoint ignored = createRestServerEndpoint(new TestListJobsHandler())) {
-			{
+		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(new TestListJobsHandler())) {
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
+
+			try {
 				CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = restClusterClient.listJobs();
 				Collection<JobStatusMessage> jobDetails = jobDetailsFuture.get();
 				Iterator<JobStatusMessage> jobDetailsIterator = jobDetails.iterator();
 				JobStatusMessage job1 = jobDetailsIterator.next();
 				JobStatusMessage job2 = jobDetailsIterator.next();
 				Assert.assertNotEquals("The job status should not be equal.", job1.getJobState(), job2.getJobState());
+			} finally {
+				restClusterClient.shutdown();
 			}
 		}
 	}
@@ -642,17 +677,22 @@ public void testListJobs() throws Exception {
 	public void testGetAccumulators() throws Exception {
 		TestAccumulatorHandler accumulatorHandler = new TestAccumulatorHandler();
 
-		try (TestRestServerEndpoint ignored = createRestServerEndpoint(accumulatorHandler)){
+		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(accumulatorHandler)){
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
 
-			JobID id = new JobID();
+			try {
+				JobID id = new JobID();
 
-			{
-				Map<String, OptionalFailure<Object>> accumulators = restClusterClient.getAccumulators(id);
-				assertNotNull(accumulators);
-				assertEquals(1, accumulators.size());
+				{
+					Map<String, OptionalFailure<Object>> accumulators = restClusterClient.getAccumulators(id);
+					assertNotNull(accumulators);
+					assertEquals(1, accumulators.size());
 
-				assertEquals(true, accumulators.containsKey("testKey"));
-				assertEquals("testValue", accumulators.get("testKey").get().toString());
+					assertEquals(true, accumulators.containsKey("testKey"));
+					assertEquals("testValue", accumulators.get("testKey").get().toString());
+				}
+			} finally {
+				restClusterClient.shutdown();
 			}
 		}
 	}
@@ -698,11 +738,17 @@ public void testRetriableSendOperationIfConnectionErrorOrServiceUnavailable() th
 			CompletableFuture.completedFuture(EmptyResponseBody.getInstance()));
 
 		try (final TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(pingRestHandler)) {
-			final AtomicBoolean firstPollFailed = new AtomicBoolean();
-			failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
-				messageHeaders instanceof PingRestHandlerHeaders && !firstPollFailed.getAndSet(true);
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
 
-			restClusterClient.sendRequest(PingRestHandlerHeaders.INSTANCE).get();
+			try {
+				final AtomicBoolean firstPollFailed = new AtomicBoolean();
+				failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
+					messageHeaders instanceof PingRestHandlerHeaders && !firstPollFailed.getAndSet(true);
+
+				restClusterClient.sendRequest(PingRestHandlerHeaders.INSTANCE).get();
+			} finally {
+				restClusterClient.shutdown();
+			}
 		}
 	}
 
@@ -716,11 +762,15 @@ public void testSendIsNotRetriableIfHttpNotFound() throws Exception {
 			FutureUtils.completedExceptionally(new RestHandlerException(exceptionMessage, HttpResponseStatus.NOT_FOUND)));
 
 		try (final TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(pingRestHandler)) {
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
+
 			try {
 				restClusterClient.sendRequest(PingRestHandlerHeaders.INSTANCE).get();
 				fail("The rest request should have failed.");
 			}  catch (Exception e) {
 				assertThat(ExceptionUtils.findThrowableWithMessage(e, exceptionMessage).isPresent(), is(true));
+			} finally {
+				restClusterClient.shutdown();
 			}
 		}
 	}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services