You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/25 21:41:41 UTC

[flink] 01/02: [FLINK-11390][tests] Port testTaskManagerFailure to new codebase.

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c95e9f642288bb2816cf84868709ea2543a90ae5
Author: Gary Yao <ga...@data-artisans.com>
AuthorDate: Fri Jan 18 22:32:22 2019 +0100

    [FLINK-11390][tests] Port testTaskManagerFailure to new codebase.
    
    Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to flip6 codebase:
    * Remove assertions that rely on log messages only
    * Move part where TMs are killed to YARNHighAvailabilityITCase
    * Rename test to a proper name that describes what it does
    * Add Javadoc explaining what this test does
    
    [FLINK-11390][tests] Move comment to right position
    
    [FLINK-11390][tests] Reuse YarnClient from super class
    
    Move waitUntilCondition to YarnTestBase
    
    [FLINK-11390][tests] Extract method parse hostname
    
    Extract method getOnlyApplicationReport
    
    Extract method submitJob
    
    Extract method getNumberOfSlotsPerTaskManager
    
    Extract method getFlinkConfigFromRestApi
    
    Delete useless comment
    
    Rename: runner -> yarnSessionClusterRunner
    
    Delete useless sleep & refactor
    
    Reorder methods and add static keyword where possible
    
    This closes #7546.
---
 .../client/program/rest/RestClusterClient.java     |  21 +-
 .../org/apache/flink/runtime/rest/RestClient.java  |   9 +
 .../job/metrics/JobMetricsMessageParameters.java   |   2 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java     | 232 +++++++++++---
 .../yarn/YARNSessionCapacitySchedulerITCase.java   | 345 +++++++++++----------
 .../java/org/apache/flink/yarn/YarnTestBase.java   |  20 +-
 .../org/apache/flink/yarn/testjob/YarnTestJob.java |  56 +++-
 7 files changed, 467 insertions(+), 218 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index c6dc37e..eea83c6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -272,17 +272,25 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
 		}
 	}
 
-	@Override
-	public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
-		JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
+	/**
+	 * Requests the job details.
+	 *
+	 * @param jobId The job id
+	 * @return Job details
+	 */
+	public CompletableFuture<JobDetailsInfo> getJobDetails(JobID jobId) {
+		final JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
 		final JobMessageParameters  params = new JobMessageParameters();
 		params.jobPathParameter.resolve(jobId);
 
-		CompletableFuture<JobDetailsInfo> responseFuture = sendRequest(
+		return sendRequest(
 			detailsHeaders,
 			params);
+	}
 
-		return responseFuture.thenApply(JobDetailsInfo::getJobStatus);
+	@Override
+	public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+		return getJobDetails(jobId).thenApply(JobDetailsInfo::getJobStatus);
 	}
 
 	/**
@@ -694,7 +702,8 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
 		return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
 	}
 
-	private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
+	@VisibleForTesting
+	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
 			sendRequest(M messageHeaders, U messageParameters, R request) {
 		return sendRetriableRequest(
 			messageHeaders, messageParameters, request, isConnectionProblemOrServiceUnavailable());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index a25b13c..c478cf5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -23,6 +23,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
@@ -185,6 +187,13 @@ public class RestClient implements AutoCloseableAsync {
 		return terminationFuture;
 	}
 
+	public <M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P> sendRequest(
+			String targetAddress,
+			int targetPort,
+			M messageHeaders) throws IOException {
+		return sendRequest(targetAddress, targetPort, messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
+	}
+
 	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
 			String targetAddress,
 			int targetPort,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsMessageParameters.java
index f8bab83..3f6f8af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsMessageParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsMessageParameters.java
@@ -31,7 +31,7 @@ import java.util.Collections;
  */
 public class JobMetricsMessageParameters extends JobMessageParameters {
 
-	private final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter();
+	public final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter();
 
 	@Override
 	public Collection<MessageQueryParameter<?>> getQueryParameters() {
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 1a2eb92..de7e02a 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -25,28 +25,43 @@ import org.apache.flink.client.deployment.ClusterDeploymentException;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
-import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
 import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.function.SupplierWithException;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
 import org.apache.flink.yarn.testjob.YarnTestJob;
 import org.apache.flink.yarn.util.YarnTestUtils;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.junit.AfterClass;
-import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -58,13 +73,21 @@ import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Predicate;
 
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assume.assumeTrue;
 
@@ -78,11 +101,13 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 
 	private static final String LOG_DIR = "flink-yarn-tests-ha";
 	private static final Duration TIMEOUT = Duration.ofSeconds(200L);
-	private static final long RETRY_TIMEOUT = 100L;
 
 	private static TestingServer zkServer;
 	private static String storageDir;
 
+	private YarnTestJob.StopJobSignal stopJobSignal;
+	private JobGraph job;
+
 	@BeforeClass
 	public static void setup() throws Exception {
 		zkServer = new TestingServer();
@@ -104,6 +129,22 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 		}
 	}
 
+	@Before
+	public void setUp() throws Exception {
+		initJobGraph();
+	}
+
+	private void initJobGraph() throws IOException {
+		stopJobSignal = YarnTestJob.StopJobSignal.usingMarkerFile(FOLDER.newFile().toPath());
+		job = YarnTestJob.stoppableJob(stopJobSignal);
+		final File testingJar =
+			YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-yarn-tests"));
+
+		assertThat(testingJar, notNullValue());
+
+		job.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
+	}
+
 	/**
 	 * Tests that Yarn will restart a killed {@link YarnSessionClusterEntrypoint} which will then resume
 	 * a persisted {@link JobGraph}.
@@ -115,33 +156,107 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 			OperatingSystem.isLinux() || OperatingSystem.isMac() || OperatingSystem.isFreeBSD() || OperatingSystem.isSolaris());
 
 		final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
-
 		final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);
 
-		final JobGraph job = createJobGraph();
+		try {
+			final JobID jobId = submitJob(restClusterClient);
+			final ApplicationId id = restClusterClient.getClusterId();
+
+			waitUntilJobIsRunning(restClusterClient, jobId);
 
-		final JobID jobId = submitJob(restClusterClient, job);
+			killApplicationMaster(yarnClusterDescriptor.getYarnSessionClusterEntrypoint());
+			waitForApplicationAttempt(id, 2);
 
-		final ApplicationId id = restClusterClient.getClusterId();
+			waitForJobTermination(restClusterClient, jobId);
 
-		waitUntilJobIsRunning(restClusterClient, jobId, RETRY_TIMEOUT);
+			killApplicationAndWait(id);
+		} finally {
+			restClusterClient.shutdown();
+		}
+	}
+
+	@Test
+	public void testJobRecoversAfterKillingTaskManager() throws Exception {
+		final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
+		final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);
+		try {
+			final JobID jobId = submitJob(restClusterClient);
+			waitUntilJobIsRunning(restClusterClient, jobId);
+
+			stopTaskManagerContainer();
+			waitUntilJobIsRestarted(restClusterClient, jobId, 1);
 
-		killApplicationMaster(yarnClusterDescriptor.getYarnSessionClusterEntrypoint());
+			waitForJobTermination(restClusterClient, jobId);
 
+			killApplicationAndWait(restClusterClient.getClusterId());
+		} finally {
+			restClusterClient.shutdown();
+		}
+	}
+
+	private void waitForApplicationAttempt(final ApplicationId applicationId, final int attemptId) throws Exception {
 		final YarnClient yarnClient = getYarnClient();
-		Assert.assertNotNull(yarnClient);
+		checkState(yarnClient != null, "yarnClient must be initialized");
 
-		while (yarnClient.getApplicationReport(id).getCurrentApplicationAttemptId().getAttemptId() < 2) {
-			Thread.sleep(RETRY_TIMEOUT);
+		waitUntilCondition(() -> {
+			final ApplicationReport applicationReport = yarnClient.getApplicationReport(applicationId);
+			return applicationReport.getCurrentApplicationAttemptId().getAttemptId() >= attemptId;
+		}, Deadline.fromNow(TIMEOUT));
+	}
+
+	/**
+	 * Stops a container running {@link YarnTaskExecutorRunner}.
+	 */
+	private void stopTaskManagerContainer() throws Exception {
+		// find container id of taskManager:
+		ContainerId taskManagerContainer = null;
+		NodeManager nodeManager = null;
+		NMTokenIdentifier nmIdent = null;
+		UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
+
+		for (int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
+			NodeManager nm = yarnCluster.getNodeManager(nmId);
+			ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers();
+			for (Map.Entry<ContainerId, Container> entry : containers.entrySet()) {
+				String command = StringUtils.join(entry.getValue().getLaunchContext().getCommands(), " ");
+				if (command.contains(YarnTaskExecutorRunner.class.getSimpleName())) {
+					taskManagerContainer = entry.getKey();
+					nodeManager = nm;
+					nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "", 0);
+					// allow myself to do stuff with the container
+					// remoteUgi.addCredentials(entry.getValue().getCredentials());
+					remoteUgi.addTokenIdentifier(nmIdent);
+				}
+			}
 		}
 
-		waitUntilJobIsRunning(restClusterClient, jobId, RETRY_TIMEOUT);
+		assertNotNull("Unable to find container with TaskManager", taskManagerContainer);
+		assertNotNull("Illegal state", nodeManager);
+
+		StopContainersRequest scr = StopContainersRequest.newInstance(Collections.singletonList(taskManagerContainer));
+
+		nodeManager.getNMContext().getContainerManager().stopContainers(scr);
+
+		// cleanup auth for the subsequent tests.
+		remoteUgi.getTokenIdentifiers().remove(nmIdent);
+	}
+
+	private void killApplicationAndWait(final ApplicationId id) throws Exception {
+		final YarnClient yarnClient = getYarnClient();
+		checkState(yarnClient != null, "yarnClient must be initialized");
 
 		yarnClient.killApplication(id);
 
-		while (yarnClient.getApplications(EnumSet.of(YarnApplicationState.KILLED, YarnApplicationState.FINISHED)).isEmpty()) {
-			Thread.sleep(RETRY_TIMEOUT);
-		}
+		waitUntilCondition(() -> !yarnClient.getApplications(EnumSet.of(YarnApplicationState.KILLED, YarnApplicationState.FINISHED)).isEmpty(),
+			Deadline.fromNow(TIMEOUT));
+	}
+
+	private void waitForJobTermination(
+			final RestClusterClient<ApplicationId> restClusterClient,
+			final JobID jobId) throws Exception {
+		stopJobSignal.signal();
+		final CompletableFuture<JobResult> jobResult = restClusterClient.requestJobResult(jobId);
+		jobResult.get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
 	}
 
 	@Nonnull
@@ -153,6 +268,9 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 		flinkConfiguration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
 		flinkConfiguration.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 1000);
 
+		flinkConfiguration.setString(ConfigConstants.RESTART_STRATEGY, "fixed-delay");
+		flinkConfiguration.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);
+
 		final int minMemory = 100;
 		flinkConfiguration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, minMemory);
 
@@ -172,8 +290,9 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 		return (RestClusterClient<ApplicationId>) yarnClusterClient;
 	}
 
-	private JobID submitJob(RestClusterClient<ApplicationId> restClusterClient, JobGraph job) throws InterruptedException, java.util.concurrent.ExecutionException {
-		final CompletableFuture<JobSubmissionResult> jobSubmissionResultCompletableFuture = restClusterClient.submitJob(job);
+	private JobID submitJob(RestClusterClient<ApplicationId> restClusterClient) throws InterruptedException, java.util.concurrent.ExecutionException {
+		final CompletableFuture<JobSubmissionResult> jobSubmissionResultCompletableFuture =
+			restClusterClient.submitJob(job);
 
 		final JobSubmissionResult jobSubmissionResult = jobSubmissionResultCompletableFuture.get();
 		return jobSubmissionResult.getJobID();
@@ -184,38 +303,61 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 		assertThat(exec.waitFor(), is(0));
 	}
 
-	@Nonnull
-	private JobGraph createJobGraph() {
-		final JobGraph job = YarnTestJob.createJob();
-		final File testingJar =
-			YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-yarn-tests"));
+	private static void waitUntilJobIsRunning(RestClusterClient<ApplicationId> restClusterClient, JobID jobId) throws Exception {
+		waitUntilCondition(
+			() -> {
+				final JobDetailsInfo jobDetails = restClusterClient.getJobDetails(jobId).get();
+				return jobDetails.getJobVertexInfos()
+					.stream()
+					.map(toExecutionState())
+					.allMatch(isRunning());
+			},
+			Deadline.fromNow(TIMEOUT));
+	}
 
-		assertThat(testingJar, notNullValue());
+	private static Function<JobDetailsInfo.JobVertexDetailsInfo, ExecutionState> toExecutionState() {
+		return JobDetailsInfo.JobVertexDetailsInfo::getExecutionState;
+	}
 
-		job.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
-		return job;
+	private static Predicate<ExecutionState> isRunning() {
+		return executionState -> executionState == ExecutionState.RUNNING;
 	}
 
-	private void waitUntilJobIsRunning(RestClusterClient<ApplicationId> restClusterClient, JobID jobId, long retryTimeout) throws Exception {
+	private static void waitUntilJobIsRestarted(
+		final RestClusterClient<ApplicationId> restClusterClient,
+		final JobID jobId,
+		final int expectedFullRestarts) throws Exception {
 		waitUntilCondition(
-			() -> {
-				final Collection<JobStatusMessage> jobStatusMessages = restClusterClient.listJobs().get();
+			() -> getJobFullRestarts(restClusterClient, jobId) >= expectedFullRestarts,
+			Deadline.fromNow(TIMEOUT));
+	}
 
-				return jobStatusMessages.stream()
-					.filter(jobStatusMessage -> jobStatusMessage.getJobId().equals(jobId))
-					.anyMatch(jobStatusMessage -> jobStatusMessage.getJobState() == JobStatus.RUNNING);
-			},
-			Deadline.fromNow(TIMEOUT),
-			retryTimeout);
+	private static int getJobFullRestarts(
+		final RestClusterClient<ApplicationId> restClusterClient,
+		final JobID jobId) throws Exception {
+
+		return getJobMetric(restClusterClient, jobId, "fullRestarts")
+			.map(Metric::getValue)
+			.map(Integer::parseInt)
+			.orElse(0);
 	}
 
-	private void waitUntilCondition(SupplierWithException<Boolean, Exception> condition, Deadline timeout, long retryTimeout) throws Exception {
-		while (timeout.hasTimeLeft() && !condition.get()) {
-			Thread.sleep(Math.min(retryTimeout, timeout.timeLeft().toMillis()));
-		}
+	private static Optional<Metric> getJobMetric(
+		final RestClusterClient<ApplicationId> restClusterClient,
+		final JobID jobId,
+		final String metricName) throws Exception {
 
-		if (!timeout.hasTimeLeft()) {
-			throw new TimeoutException("Condition was not met in given timeout.");
-		}
+		final JobMetricsMessageParameters messageParameters = new JobMetricsMessageParameters();
+		messageParameters.jobPathParameter.resolve(jobId);
+		messageParameters.metricsFilterParameter.resolveFromString(metricName);
+
+		final Collection<Metric> metrics = restClusterClient.sendRequest(
+			JobMetricsHeaders.getInstance(),
+			messageParameters,
+			EmptyRequestBody.getInstance()).get().getMetrics();
+
+		final Metric metric = Iterables.getOnlyElement(metrics, null);
+		checkState(metric == null || metric.getId().equals(metricName));
+		return Optional.ofNullable(metric);
 	}
 }
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 83abd8a..93b4c9d 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -18,40 +18,43 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.RestClientConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
+import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoEntry;
+import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
+import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.guava18.com.google.common.net.HostAndPort;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.security.NMTokenIdentifier;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -63,23 +66,31 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import static junit.framework.TestCase.assertTrue;
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.flink.yarn.UtilsTest.addTestAppender;
 import static org.apache.flink.yarn.UtilsTest.checkForLogString;
 import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;
-import static org.junit.Assume.assumeTrue;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /**
  * This test starts a MiniYARNCluster with a CapacityScheduler.
@@ -88,14 +99,46 @@ import static org.junit.Assume.assumeTrue;
 public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class);
 
+	/**
+	 * RestClient to query Flink cluster.
+	 */
+	private static RestClient restClient;
+
+	/**
+	 * ExecutorService for {@link RestClient}.
+	 * @see #restClient
+	 */
+	private static ExecutorService restClientExecutor;
+
+	/** Toggles checking for prohibited strings in logs after the test has run. */
+	private boolean checkForProhibitedLogContents = true;
+
 	@BeforeClass
-	public static void setup() {
+	public static void setup() throws Exception {
 		YARN_CONFIGURATION.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
 		YARN_CONFIGURATION.set("yarn.scheduler.capacity.root.queues", "default,qa-team");
 		YARN_CONFIGURATION.setInt("yarn.scheduler.capacity.root.default.capacity", 40);
 		YARN_CONFIGURATION.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60);
 		YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-capacityscheduler");
 		startYARNWithConfig(YARN_CONFIGURATION);
+
+		restClientExecutor = Executors.newSingleThreadExecutor();
+		restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), restClientExecutor);
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		try {
+			YarnTestBase.teardown();
+		} finally {
+			if (restClient != null) {
+				restClient.shutdown(Time.seconds(5));
+			}
+
+			if (restClientExecutor != null) {
+				restClientExecutor.shutdownNow();
+			}
+		}
 	}
 
 	/**
@@ -184,14 +227,23 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	}
 
 	/**
-	 * Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213).
+	 * Starts a session cluster on YARN, and submits a streaming job.
+	 *
+	 * <p>Tests
+	 * <ul>
+	 * <li>if a custom YARN application name can be set from the command line,
+	 * <li>if the number of TaskManager slots can be set from the command line,
+	 * <li>if dynamic properties from the command line are set,
+	 * <li>if the vcores are set correctly (FLINK-2213),
+	 * <li>if jobmanager hostname/port are shown in web interface (FLINK-1902)
+	 * </ul>
+	 *
+	 * <p><b>Hint: </b> If you think it is a good idea to add more assertions to this test, think again!
 	 */
-	@Test(timeout = 100000) // timeout after 100 seconds
-	public void testTaskManagerFailure() throws Exception {
-		assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
-		LOG.info("Starting testTaskManagerFailure()");
-		Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
-				"-n", "1",
+	@Test(timeout = 100_000)
+	public void testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots() throws Exception {
+		checkForProhibitedLogContents = false;
+		final Runner yarnSessionClusterRunner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
 				"-jm", "768m",
 				"-tm", "1024m",
 				"-s", "3", // set the slots 3 to check if the vCores are set properly!
@@ -199,163 +251,134 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 				"-Dfancy-configuration-value=veryFancy",
 				"-Dyarn.maximum-failed-containers=3",
 				"-D" + YarnConfigOptions.VCORES.key() + "=2"},
-			"Number of connected TaskManagers changed to 1. Slots available: 3",
+			"Flink JobManager is now running on ",
 			RunTypes.YARN_SESSION);
 
-		Assert.assertEquals(2, getRunningContainers());
-
-		// ------------------------ Test if JobManager web interface is accessible -------
-
-		final YarnClient yc = YarnClient.createYarnClient();
-		yc.init(YARN_CONFIGURATION);
-		yc.start();
-
-		List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
-		Assert.assertEquals(1, apps.size()); // Only one running
-		ApplicationReport app = apps.get(0);
-		Assert.assertEquals("customName", app.getName());
-		String url = app.getTrackingUrl();
-		if (!url.endsWith("/")) {
-			url += "/";
-		}
-		if (!url.startsWith("http://")) {
-			url = "http://" + url;
-		}
-		LOG.info("Got application URL from YARN {}", url);
-
-		String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/");
-
-		JsonNode parsedTMs = new ObjectMapper().readTree(response);
-		ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers");
-		Assert.assertNotNull(taskManagers);
-		Assert.assertEquals(1, taskManagers.size());
-		Assert.assertEquals(3, taskManagers.get(0).get("slotsNumber").asInt());
-
-		// get the configuration from webinterface & check if the dynamic properties from YARN show up there.
-		String jsonConfig = TestBaseUtils.getFromHTTP(url + "jobmanager/config");
-		Map<String, String> parsedConfig = WebMonitorUtils.fromKeyValueJsonArray(jsonConfig);
-
-		Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value"));
-		Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers"));
-		Assert.assertEquals("2", parsedConfig.get(YarnConfigOptions.VCORES.key()));
+		final String logs = outContent.toString();
+		final HostAndPort hostAndPort = parseJobManagerHostname(logs);
+		final String host = hostAndPort.getHostText();
+		final int port = hostAndPort.getPort();
+		LOG.info("Extracted hostname:port: {}:{}", host, port);
+
+		submitJob("WindowJoin.jar");
+
+		//
+		// Assert that custom YARN application name "customName" is set
+		//
+		final ApplicationReport applicationReport = getOnlyApplicationReport();
+		assertEquals("customName", applicationReport.getName());
+
+		//
+		// Assert the number of TaskManager slots are set
+		//
+		waitForTaskManagerRegistration(host, port, Duration.ofMillis(30_000));
+		assertNumberOfSlotsPerTask(host, port, 3);
+
+		final Map<String, String> flinkConfig = getFlinkConfig(host, port);
+
+		//
+		// Assert dynamic properties
+		//
+		assertThat(flinkConfig, hasEntry("fancy-configuration-value", "veryFancy"));
+		assertThat(flinkConfig, hasEntry("yarn.maximum-failed-containers", "3"));
+
+		//
+		// FLINK-2213: assert that vcores are set
+		//
+		assertThat(flinkConfig, hasEntry(YarnConfigOptions.VCORES.key(), "2"));
+
+		//
+		// FLINK-1902: check if jobmanager hostname is shown in web interface
+		//
+		assertThat(flinkConfig, hasEntry(JobManagerOptions.ADDRESS.key(), host));
+
+		yarnSessionClusterRunner.sendStop();
+		yarnSessionClusterRunner.join();
+	}
 
-		// -------------- FLINK-1902: check if jobmanager hostname/port are shown in web interface
-		// first, get the hostname/port
-		String oC = outContent.toString();
-		Pattern p = Pattern.compile("Flink JobManager is now running on ([a-zA-Z0-9.-]+):([0-9]+)");
-		Matcher matches = p.matcher(oC);
+	/**
+	 * Extracts the JobManager host and port from captured log output.
+	 *
+	 * <p>Looks for lines of the form {@code Flink JobManager is now running on <host>:<port>}.
+	 * If the text contains several such lines, the last one wins. The host name is lower-cased.
+	 *
+	 * @param logs log text to scan
+	 * @return host and port of the last match
+	 */
+	private static HostAndPort parseJobManagerHostname(final String logs) {
+		final Pattern p = Pattern.compile("Flink JobManager is now running on ([a-zA-Z0-9.-]+):([0-9]+)");
+		final Matcher matches = p.matcher(logs);
 		String hostname = null;
 		String port = null;
+
+		// Iterate over all matches so that the values of the last occurrence are kept.
 		while (matches.find()) {
 			hostname = matches.group(1).toLowerCase();
 			port = matches.group(2);
 		}
-		LOG.info("Extracted hostname:port: {} {}", hostname, port);
-
-		Assert.assertEquals("unable to find hostname in " + jsonConfig, hostname,
-			parsedConfig.get(JobManagerOptions.ADDRESS.key()));
-		Assert.assertEquals("unable to find port in " + jsonConfig, port,
-			parsedConfig.get(JobManagerOptions.PORT.key()));
-
-		// test logfile access
-		String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log");
-		Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster"));
-		Assert.assertTrue(logs.contains("Starting JobManager"));
-		Assert.assertTrue(logs.contains("Starting JobManager Web Frontend"));
-
-		// ------------------------ Kill container with TaskManager and check if vcores are set correctly -------
-
-		// find container id of taskManager:
-		ContainerId taskManagerContainer = null;
-		NodeManager nodeManager = null;
-		UserGroupInformation remoteUgi = null;
-		NMTokenIdentifier nmIdent = null;
-		try {
-			remoteUgi = UserGroupInformation.getCurrentUser();
-		} catch (IOException e) {
-			LOG.warn("Unable to get curr user", e);
-			Assert.fail();
-		}
-		for (int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
-			NodeManager nm = yarnCluster.getNodeManager(nmId);
-			ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers();
-			for (Map.Entry<ContainerId, Container> entry : containers.entrySet()) {
-				String command = StringUtils.join(entry.getValue().getLaunchContext().getCommands(), " ");
-				if (command.contains(YarnTaskManager.class.getSimpleName())) {
-					taskManagerContainer = entry.getKey();
-					nodeManager = nm;
-					nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "", 0);
-					// allow myself to do stuff with the container
-					// remoteUgi.addCredentials(entry.getValue().getCredentials());
-					remoteUgi.addTokenIdentifier(nmIdent);
-				}
-			}
-			sleep(500);
-		}
 
-		Assert.assertNotNull("Unable to find container with TaskManager", taskManagerContainer);
-		Assert.assertNotNull("Illegal state", nodeManager);
+		// Both stay null when the pattern never matched; checkState rejects that case
+		// (presumably with an IllegalStateException - confirm against the imported checkState).
+		checkState(hostname != null, "hostname not found in log");
+		checkState(port != null, "port not found in log");
 
-		yc.stop();
+		return HostAndPort.fromParts(hostname, Integer.parseInt(port));
+	}
 
-		List<ContainerId> toStop = new LinkedList<ContainerId>();
-		toStop.add(taskManagerContainer);
-		StopContainersRequest scr = StopContainersRequest.newInstance(toStop);
+	/**
+	 * Returns the report of the one YARN application that is currently in state RUNNING.
+	 *
+	 * <p>Asserts that exactly one application is running, so the test fails here if there
+	 * are none or more than one.
+	 */
+	private ApplicationReport getOnlyApplicationReport() throws IOException, YarnException {
+		final YarnClient yarnClient = getYarnClient();
+		checkState(yarnClient != null);
 
-		try {
-			nodeManager.getNMContext().getContainerManager().stopContainers(scr);
-		} catch (Throwable e) {
-			LOG.warn("Error stopping container", e);
-			Assert.fail("Error stopping container: " + e.getMessage());
-		}
+		final List<ApplicationReport> apps = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
+		assertEquals(1, apps.size()); // Only one running
+		return apps.get(0);
+	}
 
-		// stateful termination check:
-		// wait until we saw a container being killed and AFTERWARDS a new one launched
-		boolean ok = false;
-		do {
-			LOG.debug("Waiting for correct order of events. Output: {}", errContent.toString());
-
-			String o = errContent.toString();
-			int killedOff = o.indexOf("Container killed by the ApplicationMaster");
-			if (killedOff != -1) {
-				o = o.substring(killedOff);
-				ok = o.indexOf("Launching TaskManager") > 0;
-			}
-			sleep(1000);
-		} while(!ok);
+	/**
+	 * Submits a test job in detached mode through the CLI frontend and joins the CLI runner.
+	 *
+	 * @param jobFileName file name of the job jar, resolved via {@code getTestJarPath}
+	 */
+	private void submitJob(final String jobFileName) throws IOException, InterruptedException {
+		final String jobJarPath = getTestJarPath(jobFileName).getAbsolutePath();
+		final String[] cliArguments = {"run", "--detached", jobJarPath};
+
+		startWithArgs(cliArguments, "Job has been submitted with JobID", RunTypes.CLI_FRONTEND).join();
+	}
+
+	/**
+	 * Polls the cluster at {@code host:port} until at least one TaskManager is reported.
+	 *
+	 * @param host REST endpoint host
+	 * @param port REST endpoint port
+	 * @param waitDuration maximum time to keep polling
+	 */
+	private static void waitForTaskManagerRegistration(
+			final String host,
+			final int port,
+			final Duration waitDuration) throws Exception {
+		final Deadline registrationDeadline = Deadline.fromNow(waitDuration);
+		waitUntilCondition(() -> getNumberOfTaskManagers(host, port) >= 1, registrationDeadline);
+	}
 
-		// send "stop" command to command line interface
-		runner.sendStop();
-		// wait for the thread to stop
+	/**
+	 * Waits up to 30 seconds until the slot count reported for a TaskManager equals
+	 * {@code slotsNumber}; otherwise fails the test with the value observed last.
+	 *
+	 * @param host REST endpoint host
+	 * @param port REST endpoint port
+	 * @param slotsNumber expected number of slots per TaskManager
+	 */
+	private static void assertNumberOfSlotsPerTask(
+			final String host,
+			final int port,
+			final int slotsNumber) throws Exception {
 		try {
-			runner.join();
-		} catch (InterruptedException e) {
-			LOG.warn("Interrupted while stopping runner", e);
+			waitUntilCondition(() -> getNumberOfSlotsPerTaskManager(host, port) == slotsNumber, Deadline.fromNow(Duration.ofSeconds(30)));
+		} catch (final TimeoutException e) {
+			// Query once more so that the failure message contains the actual slot count.
+			final int currentNumberOfSlots = getNumberOfSlotsPerTaskManager(host, port);
+			fail(String.format("Expected slots per TM to be %d, was: %d", slotsNumber, currentNumberOfSlots));
 		}
-		LOG.warn("stopped");
-
-		// ----------- Send output to logger
-		System.setOut(ORIGINAL_STDOUT);
-		System.setErr(ORIGINAL_STDERR);
-		oC = outContent.toString();
-		String eC = errContent.toString();
-		LOG.info("Sending stdout content through logger: \n\n{}\n\n", oC);
-		LOG.info("Sending stderr content through logger: \n\n{}\n\n", eC);
+	}
 
-		// ------ Check if everything happened correctly
-		Assert.assertTrue("Expect to see failed container",
-			eC.contains("New messages from the YARN cluster"));
+	/**
+	 * Requests the cluster overview from the REST endpoint at {@code host:port} and returns
+	 * the number of connected TaskManagers it reports.
+	 *
+	 * <p>Waits at most 30 seconds for the REST response.
+	 */
+	private static int getNumberOfTaskManagers(final String host, final int port) throws Exception {
+		final ClusterOverviewWithVersion clusterOverviewWithVersion = restClient.sendRequest(
+			host,
+			port,
+			ClusterOverviewHeaders.getInstance()).get(30_000, TimeUnit.MILLISECONDS);
 
-		Assert.assertTrue("Expect to see failed container",
-			eC.contains("Container killed by the ApplicationMaster"));
+		return clusterOverviewWithVersion.getNumTaskManagersConnected();
+	}
 
-		Assert.assertTrue("Expect to see new container started",
-			eC.contains("Launching TaskManager") && eC.contains("on host"));
+	/**
+	 * Requests the TaskManager list from the REST endpoint at {@code host:port} and returns
+	 * the slot count of the first TaskManager in that list.
+	 *
+	 * @return number of slots of the first listed TaskManager, or 0 if none is listed
+	 */
+	private static int getNumberOfSlotsPerTaskManager(final String host, final int port) throws Exception {
+		// Bound the wait in the same way as getNumberOfTaskManagers, so that an unresponsive
+		// endpoint surfaces as a TimeoutException instead of blocking indefinitely.
+		final TaskManagersInfo taskManagersInfo = restClient.sendRequest(
+			host,
+			port,
+			TaskManagersHeaders.getInstance()).get(30_000, TimeUnit.MILLISECONDS);
+
+		return taskManagersInfo.getTaskManagerInfos()
+			.stream()
+			.map(TaskManagerInfo::getNumberSlots)
+			.findFirst()
+			.orElse(0);
+	}
 
-		// cleanup auth for the subsequent tests.
-		remoteUgi.getTokenIdentifiers().remove(nmIdent);
+	/**
+	 * Requests the cluster configuration from the REST endpoint at {@code host:port} and
+	 * returns it as a key/value map.
+	 *
+	 * <p>Waits at most 30 seconds for the REST response, like the other REST helpers of this test.
+	 */
+	private static Map<String, String> getFlinkConfig(final String host, final int port) throws Exception {
+		final ClusterConfigurationInfo clusterConfigurationInfoEntries = restClient.sendRequest(
+			host,
+			port,
+			ClusterConfigurationInfoHeaders.getInstance()).get(30_000, TimeUnit.MILLISECONDS);
 
-		LOG.info("Finished testTaskManagerFailure()");
+		return clusterConfigurationInfoEntries.stream().collect(Collectors.toMap(
+			ClusterConfigurationInfoEntry::getKey,
+			ClusterConfigurationInfoEntry::getValue));
 	}
 
 	/**
@@ -620,11 +643,13 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 		@SuppressWarnings("unchecked")
 		Set<String> applicationTags = (Set<String>) applicationTagsMethod.invoke(report);
 
-		Assert.assertEquals(Collections.singleton("test-tag"), applicationTags);
+		assertEquals(Collections.singleton("test-tag"), applicationTags);
 	}
 
+	/**
+	 * Scans the log files for prohibited strings after a test, unless the test has set
+	 * the {@code checkForProhibitedLogContents} flag to {@code false}.
+	 */
 	@After
 	public void checkForProhibitedLogContents() {
-		ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
+		if (checkForProhibitedLogContents) {
+			ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
+		}
 	}
 }
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index b7b08ae..242588f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 
 import org.apache.commons.io.FileUtils;
@@ -81,6 +82,7 @@ import java.util.Scanner;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -104,6 +106,8 @@ public abstract class YarnTestBase extends TestLogger {
 
 	protected static final int NUM_NODEMANAGERS = 2;
 
+	private static final long RETRY_TIMEOUT = 100L;
+
 	/** The tests are scanning for these strings in the final output. */
 	protected static final String[] PROHIBITED_STRINGS = {
 			"Exception", // we don't want any exceptions to happen
@@ -122,7 +126,11 @@ public abstract class YarnTestBase extends TestLogger {
 		"java.io.IOException: Connection reset by peer",
 
 		// this can happen in Akka 2.4 on shutdown.
-		"java.util.concurrent.RejectedExecutionException: Worker has already been shutdown"
+		"java.util.concurrent.RejectedExecutionException: Worker has already been shutdown",
+
+		"org.apache.flink.util.FlinkException: Stopping JobMaster",
+		"org.apache.flink.util.FlinkException: JobManager is shutting down.",
+		"lost the leadership."
 	};
 
 	// Temp directory which is deleted after the unit test.
@@ -271,6 +279,16 @@ public abstract class YarnTestBase extends TestLogger {
 		return null;
 	}
 
+	/**
+	 * Repeatedly evaluates {@code condition} until it yields {@code true} or the deadline expires.
+	 *
+	 * <p>The condition is evaluated at least once, and a {@code true} result is always honoured,
+	 * even if the deadline elapsed while the condition was being evaluated.
+	 *
+	 * @param condition check to poll; any exception it throws is propagated to the caller
+	 * @param timeout deadline after which polling gives up
+	 * @throws TimeoutException if the deadline passes before the condition holds
+	 */
+	protected static void waitUntilCondition(SupplierWithException<Boolean, Exception> condition, Deadline timeout) throws Exception {
+		while (!condition.get()) {
+			if (!timeout.hasTimeLeft()) {
+				throw new TimeoutException("Condition was not met in given timeout.");
+			}
+
+			// The remaining time may already be negative here (the deadline can pass between the
+			// check above and this call); Thread.sleep rejects negative values, so clamp to zero.
+			final long remainingMillis = Math.max(0L, timeout.timeLeft().toMillis());
+			Thread.sleep(Math.min(RETRY_TIMEOUT, remainingMillis));
+		}
+	}
+
 	@Nonnull
 	YarnClusterDescriptor createYarnClusterDescriptor(org.apache.flink.configuration.Configuration flinkConfiguration) {
 		final YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestJob.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestJob.java
index f86d6e4..59bad7e 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestJob.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestJob.java
@@ -23,6 +23,12 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
 /**
  * Testing job for {@link org.apache.flink.runtime.jobmaster.JobMaster} failover.
  * Covering stream case that have a infinite source and a sink, scheduling by
@@ -30,10 +36,10 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
  */
 public class YarnTestJob {
 
-	public static JobGraph createJob() {
+	public static JobGraph stoppableJob(final StopJobSignal stopJobSignal) {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		env.addSource(new InfiniteSourceFunction())
+		env.addSource(new InfiniteSourceFunction(stopJobSignal))
 			.setParallelism(2)
 			.shuffle()
 			.addSink(new DiscardingSink<>())
@@ -42,21 +48,61 @@ public class YarnTestJob {
 		return env.getStreamGraph().getJobGraph();
 	}
 
+	/**
+	 * Helper class to signal between multiple processes that a job should stop.
+	 *
+	 * <p>The signal is backed by a marker file: while the file exists the job is not
+	 * signaled; once the file is gone, {@link #isSignaled()} returns {@code true}.
+	 * The marker location is kept as a {@code String} so that instances can be serialized.
+	 */
+	public static class StopJobSignal implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		private final String stopJobMarkerFile;
+
+		/**
+		 * Creates a signal backed by the given marker file.
+		 *
+		 * @param stopJobMarkerFile path of the marker file whose absence means "stop"
+		 */
+		public static StopJobSignal usingMarkerFile(final Path stopJobMarkerFile) {
+			return new StopJobSignal(stopJobMarkerFile.toString());
+		}
+
+		private StopJobSignal(final String stopJobMarkerFile) {
+			this.stopJobMarkerFile = stopJobMarkerFile;
+		}
+
+		/**
+		 * Signals that the job should stop by removing the marker file.
+		 *
+		 * <p>Calling this method again after the file is gone is a no-op.
+		 */
+		public void signal() {
+			try {
+				// deleteIfExists keeps repeated signalling from failing on the missing file.
+				Files.deleteIfExists(Paths.get(stopJobMarkerFile));
+			} catch (final IOException e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+		/**
+		 * True if job should stop, i.e. the marker file does not exist.
+		 */
+		public boolean isSignaled() {
+			return !Files.exists(Paths.get(stopJobMarkerFile));
+		}
+
+	}
+
 	// *************************************************************************
 	//     USER FUNCTIONS
 	// *************************************************************************
 
 	private static final class InfiniteSourceFunction extends RichParallelSourceFunction<Integer> {
+
 		private static final long serialVersionUID = -8758033916372648233L;
+
 		private boolean running;
 
-		InfiniteSourceFunction() {
-			running = true;
+		private final StopJobSignal stopJobSignal;
+
+		InfiniteSourceFunction(final StopJobSignal stopJobSignal) {
+			this.running = true;
+			this.stopJobSignal = stopJobSignal;
 		}
 
 		@Override
 		public void run(SourceContext<Integer> ctx) throws Exception {
-			while (running) {
+			while (running && !stopJobSignal.isSignaled()) {
 				synchronized (ctx.getCheckpointLock()) {
 					ctx.collect(0);
 				}