You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2020/08/19 23:32:43 UTC

[flink] branch master updated: [FLINK-15299][test] Move ClientUtils#submitJob & ClientUtils#submitJobAndWaitForResult to test scope

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dfb8a3b  [FLINK-15299][test] Move ClientUtils#submitJob & ClientUtils#submitJobAndWaitForResult to test scope
dfb8a3b is described below

commit dfb8a3be7f0d113032a28cf6a1b296725e5562f5
Author: tison <wa...@gmail.com>
AuthorDate: Sat Aug 15 08:29:49 2020 +0800

    [FLINK-15299][test] Move ClientUtils#submitJob & ClientUtils#submitJobAndWaitForResult to test scope
    
    This closes #11469 .
---
 .../java/org/apache/flink/client/ClientUtils.java  |  52 -----
 .../apache/flink/client/program/ClientTest.java    |   6 +-
 .../client/program/rest/RestClusterClientTest.java | 243 +++++++++------------
 .../connectors/kafka/KafkaConsumerTestBase.java    |  12 +-
 .../flink/state/api/SavepointReaderITTestBase.java |   6 +-
 .../flink/state/api/SavepointWriterITCase.java     |  22 +-
 .../flink/state/api/utils/SavepointTestBase.java   |   6 +-
 .../jobmanager/JMXJobManagerMetricTest.java        |   3 +-
 .../itcases/AbstractQueryableStateTestBase.java    |  44 ++--
 .../runtime/webmonitor/WebFrontendITCase.java      |   5 +-
 .../jobmaster/JobMasterStopWithSavepointIT.java    |   3 +-
 .../jobmaster/JobMasterTriggerSavepointITCase.java |   3 +-
 .../test/accumulators/AccumulatorLiveITCase.java   |   5 +-
 .../flink/test/cancelling/CancelingTestBase.java   |  17 +-
 .../NotifyCheckpointAbortedITCase.java             |   3 +-
 .../test/checkpointing/RegionFailoverITCase.java   |   4 +-
 .../flink/test/checkpointing/RescalingITCase.java  |  18 +-
 .../ResumeCheckpointManuallyITCase.java            |   3 +-
 .../flink/test/checkpointing/SavepointITCase.java  |  20 +-
 .../StreamFaultToleranceTestBase.java              |  23 +-
 .../ZooKeeperHighAvailabilityITCase.java           |   3 +-
 .../utils/SavepointMigrationTestBase.java          |  17 +-
 .../test/example/client/JobRetrievalITCase.java    |   3 +-
 .../example/failing/JobSubmissionFailsITCase.java  |  10 +-
 .../test/runtime/NetworkStackThroughputITCase.java |  36 +--
 .../restore/AbstractOperatorRestoreTestBase.java   |   5 +-
 .../runtime/BigUserProgramJobSubmitITCase.java     |   4 +-
 .../java/org/apache/flink/test/util/TestUtils.java |   9 +
 28 files changed, 238 insertions(+), 347 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index c94dbc2..d7b069f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.client;
 
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -27,22 +25,15 @@ import org.apache.flink.client.program.StreamContextEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.core.execution.DetachedJobExecutionResult;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -82,49 +73,6 @@ public enum ClientUtils {
 			checkClassloaderLeak);
 	}
 
-	public static JobExecutionResult submitJob(
-			ClusterClient<?> client,
-			JobGraph jobGraph) throws ProgramInvocationException {
-		checkNotNull(client);
-		checkNotNull(jobGraph);
-		try {
-			return client
-				.submitJob(jobGraph)
-				.thenApply(DetachedJobExecutionResult::new)
-				.get();
-		} catch (InterruptedException | ExecutionException e) {
-			ExceptionUtils.checkInterrupted(e);
-			throw new ProgramInvocationException("Could not run job in detached mode.", jobGraph.getJobID(), e);
-		}
-	}
-
-	public static JobExecutionResult submitJobAndWaitForResult(
-			ClusterClient<?> client,
-			JobGraph jobGraph,
-			ClassLoader classLoader) throws ProgramInvocationException {
-		checkNotNull(client);
-		checkNotNull(jobGraph);
-		checkNotNull(classLoader);
-
-		JobResult jobResult;
-
-		try {
-			jobResult = client
-				.submitJob(jobGraph)
-				.thenCompose(client::requestJobResult)
-				.get();
-		} catch (InterruptedException | ExecutionException e) {
-			ExceptionUtils.checkInterrupted(e);
-			throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e);
-		}
-
-		try {
-			return jobResult.toJobExecutionResult(classLoader);
-		} catch (JobExecutionException | IOException | ClassNotFoundException e) {
-			throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
-		}
-	}
-
 	public static void executeProgram(
 			PipelineExecutorServiceLoader executorServiceLoader,
 			Configuration configuration,
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 508e3a6..22b78c8 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.client.program;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -227,8 +226,7 @@ public class ClientTest extends TestLogger {
 		jobGraph.addJars(Collections.emptyList());
 		jobGraph.setClasspaths(Collections.emptyList());
 
-		JobSubmissionResult result = ClientUtils.submitJob(clusterClient, jobGraph);
-		assertNotNull(result);
+		assertNotNull(clusterClient.submitJob(jobGraph).get());
 	}
 
 	/**
@@ -430,7 +428,7 @@ public class ClientTest extends TestLogger {
 						jobGraph.addJars(accessor.getJars());
 						jobGraph.setClasspaths(accessor.getClasspaths());
 
-						final JobID jobID = ClientUtils.submitJob(clusterClient, jobGraph).getJobID();
+						final JobID jobID = clusterClient.submitJob(jobGraph).get();
 						return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID));
 					};
 				}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 81e845e..fbd8b2c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -21,19 +21,15 @@ package org.apache.flink.client.program.rest;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.core.execution.DetachedJobExecutionResult;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -134,7 +130,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -228,22 +223,12 @@ public class RestClusterClientTest extends TestLogger {
 	public void testJobSubmitCancel() throws Exception {
 		TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
 		TestJobCancellationHandler terminationHandler = new TestJobCancellationHandler();
-		TestJobExecutionResultHandler testJobExecutionResultHandler =
-			new TestJobExecutionResultHandler(
-				JobExecutionResultResponseBody.created(new JobResult.Builder()
-					.applicationStatus(ApplicationStatus.SUCCEEDED)
-					.jobId(jobId)
-					.netRuntime(Long.MAX_VALUE)
-					.build()));
 
-		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
-			submitHandler,
-			terminationHandler,
-			testJobExecutionResultHandler)) {
+		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(submitHandler, terminationHandler)) {
 
 			try (RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
 				Assert.assertFalse(submitHandler.jobSubmitted);
-				ClientUtils.submitJobAndWaitForResult(restClusterClient, jobGraph, ClassLoader.getSystemClassLoader());
+				restClusterClient.submitJob(jobGraph).get();
 				Assert.assertTrue(submitHandler.jobSubmitted);
 
 				Assert.assertFalse(terminationHandler.jobCanceled);
@@ -253,32 +238,6 @@ public class RestClusterClientTest extends TestLogger {
 		}
 	}
 
-	/**
-	 * Tests that we can submit a jobGraph in detached mode.
-	 */
-	@Test
-	public void testDetachedJobSubmission() throws Exception {
-
-		final TestJobSubmitHandler testJobSubmitHandler = new TestJobSubmitHandler();
-
-		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
-			testJobSubmitHandler)) {
-			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
-
-			try {
-				final JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(restClusterClient, jobGraph);
-
-				// if the detached mode didn't work, then we would not reach this point because the execution result
-				// retrieval would have failed.
-				assertThat(jobSubmissionResult, is(instanceOf(DetachedJobExecutionResult.class)));
-				assertThat(jobSubmissionResult.getJobID(), is(jobId));
-			} finally {
-				restClusterClient.close();
-			}
-		}
-
-	}
-
 	private class TestJobSubmitHandler extends TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
 		private volatile boolean jobSubmitted = false;
 
@@ -307,94 +266,6 @@ public class RestClusterClientTest extends TestLogger {
 		}
 	}
 
-	private class TestJobExecutionResultHandler
-		extends TestHandler<EmptyRequestBody, JobExecutionResultResponseBody, JobMessageParameters> {
-
-		private final Iterator<Object> jobExecutionResults;
-
-		private Object lastJobExecutionResult;
-
-		private TestJobExecutionResultHandler(
-				final Object... jobExecutionResults) {
-			super(JobExecutionResultHeaders.getInstance());
-			checkArgument(Arrays.stream(jobExecutionResults)
-				.allMatch(object -> object instanceof JobExecutionResultResponseBody
-					|| object instanceof RestHandlerException));
-			this.jobExecutionResults = Arrays.asList(jobExecutionResults).iterator();
-		}
-
-		@Override
-		protected CompletableFuture<JobExecutionResultResponseBody> handleRequest(
-				@Nonnull HandlerRequest<EmptyRequestBody, JobMessageParameters> request,
-				@Nonnull DispatcherGateway gateway) throws RestHandlerException {
-			if (jobExecutionResults.hasNext()) {
-				lastJobExecutionResult = jobExecutionResults.next();
-			}
-			checkState(lastJobExecutionResult != null);
-			if (lastJobExecutionResult instanceof JobExecutionResultResponseBody) {
-				return CompletableFuture.completedFuture((JobExecutionResultResponseBody) lastJobExecutionResult);
-			} else if (lastJobExecutionResult instanceof RestHandlerException) {
-				return FutureUtils.completedExceptionally((RestHandlerException) lastJobExecutionResult);
-			} else {
-				throw new AssertionError();
-			}
-		}
-	}
-
-	@Test
-	public void testSubmitJobAndWaitForExecutionResult() throws Exception {
-		final TestJobExecutionResultHandler testJobExecutionResultHandler =
-			new TestJobExecutionResultHandler(
-				new RestHandlerException("should trigger retry", HttpResponseStatus.SERVICE_UNAVAILABLE),
-				JobExecutionResultResponseBody.inProgress(),
-				JobExecutionResultResponseBody.created(new JobResult.Builder()
-					.applicationStatus(ApplicationStatus.SUCCEEDED)
-					.jobId(jobId)
-					.netRuntime(Long.MAX_VALUE)
-					.accumulatorResults(Collections.singletonMap("testName", new SerializedValue<>(OptionalFailure.of(1.0))))
-					.build()),
-				JobExecutionResultResponseBody.created(new JobResult.Builder()
-					.applicationStatus(ApplicationStatus.FAILED)
-					.jobId(jobId)
-					.netRuntime(Long.MAX_VALUE)
-					.serializedThrowable(new SerializedThrowable(new RuntimeException("expected")))
-					.build()));
-
-		// fail first HTTP polling attempt, which should not be a problem because of the retries
-		final AtomicBoolean firstPollFailed = new AtomicBoolean();
-		failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
-			messageHeaders instanceof JobExecutionResultHeaders && !firstPollFailed.getAndSet(true);
-
-		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
-			testJobExecutionResultHandler,
-			new TestJobSubmitHandler())) {
-			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
-
-			try {
-				JobExecutionResult jobExecutionResult;
-
-				jobExecutionResult = ClientUtils.submitJobAndWaitForResult(restClusterClient, jobGraph, ClassLoader.getSystemClassLoader());
-				assertThat(jobExecutionResult.getJobID(), equalTo(jobId));
-				assertThat(jobExecutionResult.getNetRuntime(), equalTo(Long.MAX_VALUE));
-				assertThat(
-					jobExecutionResult.getAllAccumulatorResults(),
-					equalTo(Collections.singletonMap("testName", 1.0)));
-
-				try {
-					ClientUtils.submitJobAndWaitForResult(restClusterClient, jobGraph, ClassLoader.getSystemClassLoader());
-					fail("Expected exception not thrown.");
-				} catch (final ProgramInvocationException e) {
-					final Optional<RuntimeException> cause = ExceptionUtils.findThrowable(e, RuntimeException.class);
-
-					assertThat(cause.isPresent(), is(true));
-					assertThat(cause.get().getMessage(), equalTo("expected"));
-				}
-			} finally {
-				restClusterClient.close();
-			}
-		}
-	}
-
 	@Test
 	public void testDisposeSavepoint() throws Exception {
 		final String savepointPath = "foobar";
@@ -596,21 +467,109 @@ public class RestClusterClientTest extends TestLogger {
 		}
 	}
 
+	private class TestJobExecutionResultHandler extends TestHandler<EmptyRequestBody, JobExecutionResultResponseBody, JobMessageParameters> {
+
+		private final Iterator<Object> jobExecutionResults;
+
+		private Object lastJobExecutionResult;
+
+		private TestJobExecutionResultHandler(
+			final Object... jobExecutionResults) {
+			super(JobExecutionResultHeaders.getInstance());
+			checkArgument(Arrays.stream(jobExecutionResults)
+				.allMatch(object -> object instanceof JobExecutionResultResponseBody
+					|| object instanceof RestHandlerException));
+			this.jobExecutionResults = Arrays.asList(jobExecutionResults).iterator();
+		}
+
+		@Override
+		protected CompletableFuture<JobExecutionResultResponseBody> handleRequest(
+			@Nonnull HandlerRequest<EmptyRequestBody, JobMessageParameters> request,
+			@Nonnull DispatcherGateway gateway) {
+			if (jobExecutionResults.hasNext()) {
+				lastJobExecutionResult = jobExecutionResults.next();
+			}
+			checkState(lastJobExecutionResult != null);
+			if (lastJobExecutionResult instanceof JobExecutionResultResponseBody) {
+				return CompletableFuture.completedFuture((JobExecutionResultResponseBody) lastJobExecutionResult);
+			} else if (lastJobExecutionResult instanceof RestHandlerException) {
+				return FutureUtils.completedExceptionally((RestHandlerException) lastJobExecutionResult);
+			} else {
+				throw new AssertionError();
+			}
+		}
+	}
+
 	@Test
-	public void testJobSubmissionFailureThrowsProgramInvocationException() throws Exception {
-		try (final TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(new SubmissionFailingHandler())) {
-			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
+	public void testSubmitJobAndWaitForExecutionResult() throws Exception {
+		final TestJobExecutionResultHandler testJobExecutionResultHandler =
+			new TestJobExecutionResultHandler(
+				new RestHandlerException("should trigger retry", HttpResponseStatus.SERVICE_UNAVAILABLE),
+				JobExecutionResultResponseBody.inProgress(),
+				JobExecutionResultResponseBody.created(new JobResult.Builder()
+					.applicationStatus(ApplicationStatus.SUCCEEDED)
+					.jobId(jobId)
+					.netRuntime(Long.MAX_VALUE)
+					.accumulatorResults(Collections.singletonMap("testName", new SerializedValue<>(OptionalFailure.of(1.0))))
+					.build()),
+				JobExecutionResultResponseBody.created(new JobResult.Builder()
+					.applicationStatus(ApplicationStatus.FAILED)
+					.jobId(jobId)
+					.netRuntime(Long.MAX_VALUE)
+					.serializedThrowable(new SerializedThrowable(new RuntimeException("expected")))
+					.build()));
 
-			try {
-				ClientUtils.submitJobAndWaitForResult(restClusterClient, jobGraph, ClassLoader.getSystemClassLoader());
-			} catch (final ProgramInvocationException expected) {
-				// expected
-			} finally {
-				restClusterClient.close();
+		// fail first HTTP polling attempt, which should not be a problem because of the retries
+		final AtomicBoolean firstPollFailed = new AtomicBoolean();
+		failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
+			messageHeaders instanceof JobExecutionResultHeaders && !firstPollFailed.getAndSet(true);
+
+		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
+			testJobExecutionResultHandler,
+			new TestJobSubmitHandler())) {
+
+			try (RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
+				final JobExecutionResult jobExecutionResult = restClusterClient.submitJob(jobGraph)
+					.thenCompose(restClusterClient::requestJobResult)
+					.get()
+					.toJobExecutionResult(ClassLoader.getSystemClassLoader());
+				assertThat(jobExecutionResult.getJobID(), equalTo(jobId));
+				assertThat(jobExecutionResult.getNetRuntime(), equalTo(Long.MAX_VALUE));
+				assertThat(
+					jobExecutionResult.getAllAccumulatorResults(),
+					equalTo(Collections.singletonMap("testName", 1.0)));
+
+				try {
+					restClusterClient.submitJob(jobGraph)
+						.thenCompose(restClusterClient::requestJobResult)
+						.get()
+						.toJobExecutionResult(ClassLoader.getSystemClassLoader());
+					fail("Expected exception not thrown.");
+				} catch (final Exception e) {
+					final Optional<RuntimeException> cause = ExceptionUtils.findThrowable(e, RuntimeException.class);
+					assertThat(cause.isPresent(), is(true));
+					assertThat(cause.get().getMessage(), equalTo("expected"));
+				}
 			}
 		}
 	}
 
+	@Test
+	public void testJobSubmissionFailureCauseForwardedToClient() throws Exception {
+		try (final TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(new SubmissionFailingHandler())) {
+			try (RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
+				restClusterClient.submitJob(jobGraph)
+					.thenCompose(restClusterClient::requestJobResult)
+					.get()
+					.toJobExecutionResult(ClassLoader.getSystemClassLoader());
+			} catch (final Exception e) {
+				assertTrue(ExceptionUtils.findThrowableWithMessage(e, "RestHandlerException: expected").isPresent());
+				return;
+			}
+			fail("Should failed with exception");
+		}
+	}
+
 	private final class SubmissionFailingHandler extends TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
 
 		private SubmissionFailingHandler() {
@@ -619,8 +578,8 @@ public class RestClusterClientTest extends TestLogger {
 
 		@Override
 		protected CompletableFuture<JobSubmitResponseBody> handleRequest(
-				@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request,
-				@Nonnull DispatcherGateway gateway) throws RestHandlerException {
+			@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request,
+			@Nonnull DispatcherGateway gateway) throws RestHandlerException {
 			throw new RestHandlerException("expected", HttpResponseStatus.INTERNAL_SERVER_ERROR);
 		}
 	}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 8406f70..7f0f0ed 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -36,7 +36,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
@@ -115,6 +114,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.getRunningJobs;
 import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilJobIsRunning;
 import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilNoJobIsRunning;
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -455,7 +455,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		final AtomicReference<Throwable> error = new AtomicReference<>();
 		Thread consumeThread = new Thread(() -> {
 			try {
-				ClientUtils.submitJobAndWaitForResult(client, jobGraph, KafkaConsumerTestBase.class.getClassLoader());
+				submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 			} catch (Throwable t) {
 				if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
 					error.set(t);
@@ -1001,7 +1001,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 
 		final Runnable jobRunner = () -> {
 			try {
-				ClientUtils.submitJobAndWaitForResult(client, jobGraph, KafkaConsumerTestBase.class.getClassLoader());
+				submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 			} catch (Throwable t) {
 				jobError.set(t);
 			}
@@ -1071,7 +1071,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 
 		final Runnable jobRunner = () -> {
 			try {
-				ClientUtils.submitJobAndWaitForResult(client, jobGraph, KafkaConsumerTestBase.class.getClassLoader());
+				submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 			} catch (Throwable t) {
 				LOG.error("Job Runner failed with exception", t);
 				error.set(t);
@@ -1657,7 +1657,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 
 		Thread jobThread = new Thread(() -> {
 			try {
-				ClientUtils.submitJobAndWaitForResult(client, jobGraph, KafkaConsumerTestBase.class.getClassLoader());
+				submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 			} catch (Throwable t) {
 				if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
 					LOG.warn("Got exception during execution", t);
@@ -2134,7 +2134,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 
 		Thread runner = new Thread(() -> {
 			try {
-				ClientUtils.submitJobAndWaitForResult(client, jobGraph, KafkaConsumerTestBase.class.getClassLoader());
+				submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 				tryExecute(readEnv, "sequence validation");
 			} catch (Throwable t) {
 				if (!ExceptionUtils.findThrowable(t, SuccessException.class).isPresent()) {
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java
index f7e3a87..96414ba 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.state.api;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
@@ -27,7 +26,6 @@ import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -172,7 +170,7 @@ public abstract class SavepointReaderITTestBase extends AbstractTestBase {
 		String dirPath = getTempDirPath(new AbstractID().toHexString());
 
 		try {
-			JobSubmissionResult result = ClientUtils.submitJob(client, jobGraph);
+			JobID jobID = client.submitJob(jobGraph).get();
 
 			boolean finished = false;
 			while (deadline.hasTimeLeft()) {
@@ -193,7 +191,7 @@ public abstract class SavepointReaderITTestBase extends AbstractTestBase {
 				Assert.fail("Failed to initialize state within deadline");
 			}
 
-			CompletableFuture<String> path = client.triggerSavepoint(result.getJobID(), dirPath);
+			CompletableFuture<String> path = client.triggerSavepoint(jobID, dirPath);
 			return path.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 		} finally {
 			client.cancel(jobId).get();
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
index d87a33d..5dd3fcc 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
@@ -28,9 +28,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -51,6 +49,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.SerializedThrowable;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -63,6 +62,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
 
@@ -147,7 +147,7 @@ public class SavepointWriterITCase extends AbstractTestBase {
 		bEnv.execute("Bootstrap");
 	}
 
-	private void validateBootstrap(String savepointPath) throws ProgramInvocationException {
+	private void validateBootstrap(String savepointPath) throws Exception {
 		StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
 		sEnv.setStateBackend(backend);
 
@@ -170,7 +170,12 @@ public class SavepointWriterITCase extends AbstractTestBase {
 		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false));
 
 		ClusterClient<?> client = miniClusterResource.getClusterClient();
-		ClientUtils.submitJobAndWaitForResult(client, jobGraph, SavepointWriterITCase.class.getClassLoader());
+		Optional<SerializedThrowable> serializedThrowable = client
+			.submitJob(jobGraph)
+			.thenCompose(client::requestJobResult)
+			.get()
+			.getSerializedThrowable();
+		Assert.assertFalse(serializedThrowable.isPresent());
 
 		Assert.assertEquals("Unexpected output", 3, CollectSink.accountList.size());
 	}
@@ -193,7 +198,7 @@ public class SavepointWriterITCase extends AbstractTestBase {
 		bEnv.execute("Modifying");
 	}
 
-	private void validateModification(String savepointPath) throws ProgramInvocationException {
+	private void validateModification(String savepointPath) throws Exception {
 		StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
 		sEnv.setStateBackend(backend);
 
@@ -216,7 +221,12 @@ public class SavepointWriterITCase extends AbstractTestBase {
 		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false));
 
 		ClusterClient<?> client = miniClusterResource.getClusterClient();
-		ClientUtils.submitJobAndWaitForResult(client, jobGraph, SavepointWriterITCase.class.getClassLoader());
+		Optional<SerializedThrowable> serializedThrowable = client
+			.submitJob(jobGraph)
+			.thenCompose(client::requestJobResult)
+			.get()
+			.getSerializedThrowable();
+		Assert.assertFalse(serializedThrowable.isPresent());
 
 		Assert.assertEquals("Unexpected output", 3, CollectSink.accountList.size());
 	}
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java
index 0d25682..8e75058 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java
@@ -20,10 +20,8 @@ package org.apache.flink.state.api.utils;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -56,11 +54,11 @@ public abstract class SavepointTestBase extends AbstractTestBase {
 		ClusterClient<?> client = miniClusterResource.getClusterClient();
 
 		try {
-			JobSubmissionResult result = ClientUtils.submitJob(client, jobGraph);
+			JobID jobID = client.submitJob(jobGraph).get();
 
 			return CompletableFuture
 				.runAsync(waitingSource::awaitSource)
-				.thenCompose(ignore -> triggerSavepoint(client, result.getJobID()))
+				.thenCompose(ignore -> triggerSavepoint(client, jobID))
 				.get(5, TimeUnit.MINUTES);
 		} catch (Exception e) {
 			throw new RuntimeException("Failed to take savepoint", e);
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index dacfd5a..cbd0a7a 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmanager;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -108,7 +107,7 @@ public class JMXJobManagerMetricTest extends TestLogger {
 				null));
 
 			ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
-			ClientUtils.submitJob(client, jobGraph);
+			client.submitJob(jobGraph).get();
 
 			FutureUtils.retrySuccessfulWithDelay(
 				() -> client.getJobStatus(jobGraph.getJobID()),
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index d9e3c91..123d3d8 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -45,9 +45,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.queryablestate.client.VoidNamespace;
@@ -57,6 +55,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -187,7 +186,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			final JobID jobId = autoCancellableJob.getJobId();
 			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 
 			final AtomicLongArray counts = new AtomicLongArray(numKeys);
 
@@ -296,16 +295,15 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 		// Submit the job graph
 		final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
-		boolean caughtException = false;
-		try {
-			ClientUtils.submitJobAndWaitForResult(clusterClient, jobGraph, AbstractQueryableStateTestBase.class.getClassLoader());
-		} catch (ProgramInvocationException e) {
-			String failureCause = ExceptionUtils.stringifyException(e);
-			assertThat(failureCause, containsString("KvState with name '" + queryName + "' has already been registered by another operator"));
-			caughtException = true;
-		}
-
-		assertTrue(caughtException);
+		clusterClient.submitJob(jobGraph)
+			.thenCompose(clusterClient::requestJobResult)
+			.thenApply(JobResult::getSerializedThrowable)
+			.thenAccept(serializedThrowable -> {
+				assertTrue(serializedThrowable.isPresent());
+				final Throwable t = serializedThrowable.get().deserializeError(getClass().getClassLoader());
+				final String failureCause = ExceptionUtils.stringifyException(t);
+				assertThat(failureCause, containsString("KvState with name '" + queryName + "' has already been registered by another operator"));
+			}).get();
 	}
 
 	/**
@@ -346,7 +344,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			final JobID jobId = autoCancellableJob.getJobId();
 			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 			executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements);
 		}
 	}
@@ -380,7 +378,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 		try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(deadline, clusterClient, env)) {
 
-			ClientUtils.submitJob(clusterClient, closableJobGraph.getJobGraph());
+			clusterClient.submitJob(closableJobGraph.getJobGraph()).get();
 
 			CompletableFuture<JobStatus> jobStatusFuture =
 				clusterClient.getJobStatus(closableJobGraph.getJobId());
@@ -481,7 +479,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 					BasicTypeInfo.INT_TYPE_INFO,
 					valueState);
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 			executeValueQuery(deadline, client, jobId, "hakuna", valueState, expected);
 		}
 	}
@@ -528,7 +526,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			final JobID jobId = autoCancellableJob.getJobId();
 			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 
 			// Now query
 			int key = 0;
@@ -596,7 +594,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			final JobID jobId = autoCancellableJob.getJobId();
 			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 			executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements);
 		}
 	}
@@ -640,7 +638,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			final JobID jobId = autoCancellableJob.getJobId();
 			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 
 			final String expected = Integer.toString(numElements * (numElements + 1) / 2);
 
@@ -712,7 +710,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			final JobID jobId = autoCancellableJob.getJobId();
 			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 
 			final long expected = numElements * (numElements + 1L) / 2L;
 
@@ -804,7 +802,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			final JobID jobId = autoCancellableJob.getJobId();
 			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 
 			final long expected = numElements * (numElements + 1L) / 2L;
 
@@ -894,7 +892,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			final JobID jobId = autoCancellableJob.getJobId();
 			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 
 			final Map<Integer, Set<Long>> results = new HashMap<>();
 
@@ -979,7 +977,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			final JobID jobId = autoCancellableJob.getJobId();
 			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index be078a0..5522375 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -260,7 +259,7 @@ public class WebFrontendITCase extends TestLogger {
 		final JobID jid = jobGraph.getJobID();
 
 		ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
-		ClientUtils.submitJob(clusterClient, jobGraph);
+		clusterClient.submitJob(jobGraph).get();
 
 		// wait for job to show up
 		while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
@@ -318,7 +317,7 @@ public class WebFrontendITCase extends TestLogger {
 		final JobID jid = jobGraph.getJobID();
 
 		ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
-		ClientUtils.submitJob(clusterClient, jobGraph);
+		clusterClient.submitJob(jobGraph).get();
 
 		// wait for job to show up
 		while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index 42c889c..e8d223e 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.MiniClusterClient;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
@@ -289,7 +288,7 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
 						0),
 				null));
 
-		ClientUtils.submitJob(clusterClient, jobGraph);
+		clusterClient.submitJob(jobGraph).get();
 		assertTrue(invokeLatch.await(60, TimeUnit.SECONDS));
 		waitForJob();
 	}
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
index 8605623..b861956 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.MiniClusterClient;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -115,7 +114,7 @@ public class JobMasterTriggerSavepointITCase extends AbstractTestBase {
 				0),
 			null));
 
-		ClientUtils.submitJob(clusterClient, jobGraph);
+		clusterClient.submitJob(jobGraph).get();
 		assertTrue(invokeLatch.await(60, TimeUnit.SECONDS));
 		waitForJob();
 	}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 4e94e43..bcf2702 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
@@ -60,6 +59,8 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
+
 /**
  * Tests the availability of accumulator results during runtime.
  */
@@ -145,7 +146,7 @@ public class AccumulatorLiveITCase extends TestLogger {
 		final CheckedThread submissionThread = new CheckedThread() {
 			@Override
 			public void go() throws Exception {
-				ClientUtils.submitJobAndWaitForResult(client, jobGraph, AccumulatorLiveITCase.class.getClassLoader());
+				submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 			}
 		};
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 03bc6eb..2551b9b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -18,10 +18,9 @@
 
 package org.apache.flink.test.cancelling;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
@@ -97,14 +96,14 @@ public abstract class CancelingTestBase extends TestLogger {
 		final long rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration).toMilliseconds();
 
 		ClusterClient<?> client = CLUSTER.getClusterClient();
-		JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(client, jobGraph);
+		JobID jobID = client.submitJob(jobGraph).get();
 
 		Deadline submissionDeadLine = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
 
-		JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
+		JobStatus jobStatus = client.getJobStatus(jobID).get(rpcTimeout, TimeUnit.MILLISECONDS);
 		while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) {
 			Thread.sleep(50);
-			jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
+			jobStatus = client.getJobStatus(jobID).get(rpcTimeout, TimeUnit.MILLISECONDS);
 		}
 		if (jobStatus != JobStatus.RUNNING) {
 			Assert.fail("Job not in state RUNNING.");
@@ -112,17 +111,17 @@ public abstract class CancelingTestBase extends TestLogger {
 
 		Thread.sleep(msecsTillCanceling);
 
-		client.cancel(jobSubmissionResult.getJobID()).get();
+		client.cancel(jobID).get();
 
 		Deadline cancelDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();
 
-		JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
+		JobStatus jobStatusAfterCancel = client.getJobStatus(jobID).get(rpcTimeout, TimeUnit.MILLISECONDS);
 		while (jobStatusAfterCancel != JobStatus.CANCELED && cancelDeadline.hasTimeLeft()) {
 			Thread.sleep(50);
-			jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
+			jobStatusAfterCancel = client.getJobStatus(jobID).get(rpcTimeout, TimeUnit.MILLISECONDS);
 		}
 		if (jobStatusAfterCancel != JobStatus.CANCELED) {
-			Assert.fail("Failed to cancel job with ID " + jobSubmissionResult.getJobID() + '.');
+			Assert.fail("Failed to cancel job with ID " + jobID + '.');
 		}
 	}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
index e7e6e5c..9a6428f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
@@ -174,7 +173,7 @@ public class NotifyCheckpointAbortedITCase extends TestLogger {
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 		JobID jobID = jobGraph.getJobID();
 
-		ClientUtils.submitJob(clusterClient, jobGraph);
+		clusterClient.submitJob(jobGraph).get();
 
 		TestingCompletedCheckpointStore.addCheckpointLatch.await();
 		TestingCompletedCheckpointStore.abortCheckpointLatch.trigger();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
index d180675..3459e54 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -77,6 +76,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.StreamSupport;
 
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -143,7 +143,7 @@ public class RegionFailoverITCase extends TestLogger {
 		try {
 			JobGraph jobGraph = createJobGraph();
 			ClusterClient<?> client = cluster.getClusterClient();
-			ClientUtils.submitJobAndWaitForResult(client, jobGraph, RegionFailoverITCase.class.getClassLoader());
+			submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 			verifyAfterJobExecuted();
 		} catch (Exception e) {
 			e.printStackTrace();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index b011950..be58fd7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
@@ -79,6 +78,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -188,7 +188,7 @@ public class RescalingITCase extends TestLogger {
 
 			final JobID jobID = jobGraph.getJobID();
 
-			ClientUtils.submitJob(client, jobGraph);
+			client.submitJob(jobGraph).get();
 
 			// wait til the sources have emitted numberElements for each key and completed a checkpoint
 			assertTrue(SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
@@ -226,7 +226,7 @@ public class RescalingITCase extends TestLogger {
 
 			scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-			ClientUtils.submitJobAndWaitForResult(client, scaledJobGraph, RescalingITCase.class.getClassLoader());
+			submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
 
 			Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
 
@@ -267,7 +267,7 @@ public class RescalingITCase extends TestLogger {
 
 			final JobID jobID = jobGraph.getJobID();
 
-			ClientUtils.submitJob(client, jobGraph);
+			client.submitJob(jobGraph).get();
 
 			// wait until the operator is started
 			StateSourceBase.workStartedLatch.await();
@@ -287,7 +287,7 @@ public class RescalingITCase extends TestLogger {
 
 			scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-			ClientUtils.submitJobAndWaitForResult(client, scaledJobGraph, RescalingITCase.class.getClassLoader());
+			submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
 		} catch (JobExecutionException exception) {
 			if (exception.getCause() instanceof IllegalStateException) {
 				// we expect a IllegalStateException wrapped
@@ -332,7 +332,7 @@ public class RescalingITCase extends TestLogger {
 
 			final JobID jobID = jobGraph.getJobID();
 
-			ClientUtils.submitJob(client, jobGraph);
+			client.submitJob(jobGraph).get();
 
 			// wait til the sources have emitted numberElements for each key and completed a checkpoint
 			assertTrue(SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
@@ -375,7 +375,7 @@ public class RescalingITCase extends TestLogger {
 
 			scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-			ClientUtils.submitJobAndWaitForResult(client, scaledJobGraph, RescalingITCase.class.getClassLoader());
+			submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
 
 			Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
 
@@ -454,7 +454,7 @@ public class RescalingITCase extends TestLogger {
 
 			final JobID jobID = jobGraph.getJobID();
 
-			ClientUtils.submitJob(client, jobGraph);
+			client.submitJob(jobGraph).get();
 
 			// wait until the operator is started
 			StateSourceBase.workStartedLatch.await();
@@ -479,7 +479,7 @@ public class RescalingITCase extends TestLogger {
 
 			scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-			ClientUtils.submitJobAndWaitForResult(client, scaledJobGraph, RescalingITCase.class.getClassLoader());
+			submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
 
 			int sumExp = 0;
 			int sumAct = 0;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index e39fce7..d9f4bf30 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
@@ -295,7 +294,7 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
 		JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint);
 		NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM);
 
-		ClientUtils.submitJob(client, initialJobGraph);
+		client.submitJob(initialJobGraph).get();
 
 		// wait until all sources have been started
 		NotifyingInfiniteTupleSource.countDownLatch.await();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index abdc188..d70a409 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -30,7 +29,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
@@ -91,6 +89,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -209,7 +208,7 @@ public class SavepointITCase extends TestLogger {
 		ClusterClient<?> client = cluster.getClusterClient();
 
 		try {
-			ClientUtils.submitJob(client, jobGraph);
+			client.submitJob(jobGraph).get();
 
 			StatefulCounter.getProgressLatch().await();
 
@@ -253,7 +252,7 @@ public class SavepointITCase extends TestLogger {
 		ClusterClient<?> client = cluster.getClusterClient();
 
 		try {
-			ClientUtils.submitJob(client, jobGraph);
+			client.submitJob(jobGraph).get();
 
 			// Await state is restored
 			StatefulCounter.getRestoreLatch().await();
@@ -337,7 +336,7 @@ public class SavepointITCase extends TestLogger {
 		final JobGraph graph = new JobGraph(vertex);
 
 		try {
-			ClientUtils.submitJob(client, graph);
+			client.submitJob(graph).get();
 
 			client.triggerSavepoint(graph.getJobID(), null).get();
 
@@ -386,7 +385,7 @@ public class SavepointITCase extends TestLogger {
 			LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
 
 			try {
-				ClientUtils.submitJobAndWaitForResult(client, jobGraph, SavepointITCase.class.getClassLoader());
+				submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 			} catch (Exception e) {
 				Optional<JobExecutionException> expectedJobExecutionException = ExceptionUtils.findThrowable(e, JobExecutionException.class);
 				Optional<FileNotFoundException> expectedFileNotFoundException = ExceptionUtils.findThrowable(e, FileNotFoundException.class);
@@ -452,8 +451,7 @@ public class SavepointITCase extends TestLogger {
 
 			JobGraph originalJobGraph = env.getStreamGraph().getJobGraph();
 
-			JobSubmissionResult submissionResult = ClientUtils.submitJob(client, originalJobGraph);
-			JobID jobID = submissionResult.getJobID();
+			JobID jobID = client.submitJob(originalJobGraph).get();
 
 			// wait for the Tasks to be ready
 			assertTrue(StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
@@ -502,7 +500,7 @@ public class SavepointITCase extends TestLogger {
 					"savepoint path " + savepointPath + " in detached mode.");
 
 			// Submit the job
-			ClientUtils.submitJob(client, modifiedJobGraph);
+			client.submitJob(modifiedJobGraph).get();
 			// Await state is restored
 			assertTrue(StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
 
@@ -696,7 +694,7 @@ public class SavepointITCase extends TestLogger {
 
 		String savepointPath = null;
 		try {
-			ClientUtils.submitJob(client, jobGraph);
+			client.submitJob(jobGraph).get();
 			for (OneShotLatch latch : iterTestSnapshotWait) {
 				latch.await();
 			}
@@ -710,7 +708,7 @@ public class SavepointITCase extends TestLogger {
 			jobGraph = streamGraph.getJobGraph();
 			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-			ClientUtils.submitJob(client, jobGraph);
+			client.submitJob(jobGraph).get();
 			for (OneShotLatch latch : iterTestRestoreWait) {
 				latch.await();
 			}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index f6c0efb..4f500fd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -19,8 +19,6 @@
 package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -28,6 +26,7 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -41,7 +40,7 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 
-import static org.junit.Assert.fail;
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
 
 /**
  * Test base for fault tolerant streaming programs.
@@ -126,21 +125,9 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 
 			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 			try {
-				ClientUtils.submitJobAndWaitForResult(cluster.getClusterClient(), jobGraph, getClass().getClassLoader()).getJobExecutionResult();
-			} catch (ProgramInvocationException root) {
-				Throwable cause = root.getCause();
-
-				// search for nested SuccessExceptions
-				int depth = 0;
-				while (!(cause instanceof SuccessException)) {
-					if (cause == null || depth++ == 20) {
-						root.printStackTrace();
-						fail("Test failed: " + root.getMessage());
-					}
-					else {
-						cause = cause.getCause();
-					}
-				}
+				submitJobAndWaitForResult(cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
+			} catch (Exception e) {
+				Assert.assertTrue(ExceptionUtils.findThrowable(e, SuccessException.class).isPresent());
 			}
 
 			postSubmit();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index 81df28f..4bd4754 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -209,7 +208,7 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 		JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
 
-		ClientUtils.submitJob(clusterClient, jobGraph);
+		clusterClient.submitJob(jobGraph).get();
 
 		// wait until we did some checkpoints
 		waitForCheckpointLatch.await();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 1e7f619..7787a18 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -20,10 +20,8 @@ package org.apache.flink.test.checkpointing.utils;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
@@ -130,14 +128,14 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 		// Submit the job
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
-		JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(client, jobGraph);
+		JobID jobID = client.submitJob(jobGraph).get();
 
-		LOG.info("Submitted job {} and waiting...", jobSubmissionResult.getJobID());
+		LOG.info("Submitted job {} and waiting...", jobID);
 
 		boolean done = false;
 		while (deadLine.hasTimeLeft()) {
 			Thread.sleep(100);
-			Map<String, Object> accumulators = client.getAccumulators(jobSubmissionResult.getJobID()).get();
+			Map<String, Object> accumulators = client.getAccumulators(jobID).get();
 
 			boolean allDone = true;
 			for (Tuple2<String, Integer> acc : expectedAccumulators) {
@@ -165,7 +163,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 
 		LOG.info("Triggering savepoint.");
 
-		CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobSubmissionResult.getJobID(), null);
+		CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobID, null);
 
 		String jobmanagerSavepointPath = savepointPathFuture.get(deadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
@@ -193,17 +191,16 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 
 		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-		JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(client, jobGraph);
+		JobID jobID = client.submitJob(jobGraph).get();
 
 		boolean done = false;
 		while (deadLine.hasTimeLeft()) {
 
 			// try and get a job result, this will fail if the job already failed. Use this
 			// to get out of this loop
-			JobID jobId = jobSubmissionResult.getJobID();
 
 			try {
-				CompletableFuture<JobStatus> jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID());
+				CompletableFuture<JobStatus> jobStatusFuture = client.getJobStatus(jobID);
 
 				JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS);
 
@@ -213,7 +210,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 			}
 
 			Thread.sleep(100);
-			Map<String, Object> accumulators = client.getAccumulators(jobId).get();
+			Map<String, Object> accumulators = client.getAccumulators(jobID).get();
 
 			boolean allDone = true;
 			for (Tuple2<String, Integer> acc : expectedAccumulators) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index 063a9cd..e0611a9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -20,7 +20,6 @@
 package org.apache.flink.test.example.client;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
@@ -95,7 +94,7 @@ public class JobRetrievalITCase extends TestLogger {
 		// has been attached in resumingThread
 		lock.acquire();
 
-		ClientUtils.submitJob(client, jobGraph);
+		client.submitJob(jobGraph).get();
 
 		final CheckedThread resumingThread = new CheckedThread("Flink-Job-Retriever") {
 			@Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
index f19080f..5723978 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.test.example.failing;
 
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
@@ -45,6 +44,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.function.Predicate;
 
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
 import static org.junit.Assert.fail;
 
 /**
@@ -127,14 +127,14 @@ public class JobSubmissionFailsITCase extends TestLogger {
 		runJobSubmissionTest(jobGraph, e -> ExceptionUtils.findThrowable(e, IOException.class).isPresent());
 	}
 
-	private void runJobSubmissionTest(JobGraph jobGraph, Predicate<Exception> failurePredicate) throws org.apache.flink.client.program.ProgramInvocationException {
+	private void runJobSubmissionTest(JobGraph jobGraph, Predicate<Exception> failurePredicate) throws Exception {
 		ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
 
 		try {
 			if (detached) {
-				ClientUtils.submitJob(client, jobGraph);
+				client.submitJob(jobGraph).get();
 			} else {
-				ClientUtils.submitJobAndWaitForResult(client, jobGraph, JobSubmissionFailsITCase.class.getClassLoader());
+				submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 			}
 			fail("Job submission should have thrown an exception.");
 		} catch (Exception e) {
@@ -143,7 +143,7 @@ public class JobSubmissionFailsITCase extends TestLogger {
 			}
 		}
 
-		ClientUtils.submitJobAndWaitForResult(client, getWorkingJobGraph(), JobSubmissionFailsITCase.class.getClassLoader());
+		submitJobAndWaitForResult(client, getWorkingJobGraph(), getClass().getClassLoader());
 	}
 
 	@Nonnull
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 6a148f0..479ae57 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.test.runtime;
 
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
@@ -34,10 +32,12 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -266,22 +266,22 @@ public class NetworkStackThroughputITCase extends TestLogger {
 			final boolean isSlowSender,
 			final boolean isSlowReceiver,
 			final int parallelism) throws Exception {
-		ClusterClient<?> client = cluster.getClusterClient();
-
-		JobExecutionResult jer = ClientUtils.submitJobAndWaitForResult(
-			client,
-			createJobGraph(
-				dataVolumeGb,
-				useForwarder,
-				isSlowSender,
-				isSlowReceiver,
-				parallelism),
-			getClass().getClassLoader());
-
-		long dataVolumeMbit = dataVolumeGb * 8192;
-		long runtimeSecs = jer.getNetRuntime(TimeUnit.SECONDS);
-
-		int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);
+		final ClusterClient<?> client = cluster.getClusterClient();
+		final JobGraph jobGraph = createJobGraph(
+			dataVolumeGb,
+			useForwarder,
+			isSlowSender,
+			isSlowReceiver,
+			parallelism);
+		final JobResult jobResult = client.submitJob(jobGraph)
+			.thenCompose(client::requestJobResult)
+			.get();
+
+		Assert.assertFalse(jobResult.getSerializedThrowable().isPresent());
+
+		final long dataVolumeMbit = dataVolumeGb * 8192;
+		final long runtimeSecs = TimeUnit.SECONDS.convert(jobResult.getNetRuntime(), TimeUnit.MILLISECONDS);
+		final int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);
 
 		LOG.info(String.format("Test finished with throughput of %d MBit/s (runtime [secs]: %d, " +
 			"data volume [gb/mbits]: %d/%d)", mbitPerSecond, runtimeSecs, dataVolumeGb, dataVolumeMbit));
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index a9321fa..43bc0dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -120,7 +119,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 
 		assertNotNull(jobToMigrate.getJobID());
 
-		ClientUtils.submitJob(clusterClient, jobToMigrate);
+		clusterClient.submitJob(jobToMigrate).get();
 
 		CompletableFuture<JobStatus> jobRunningFuture = FutureUtils.retrySuccessfulWithDelay(
 			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
@@ -172,7 +171,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 
 		assertNotNull("Job doesn't have a JobID.", jobToRestore.getJobID());
 
-		ClientUtils.submitJob(clusterClient, jobToRestore);
+		clusterClient.submitJob(jobToRestore).get();
 
 		CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccessfulWithDelay(
 			() -> clusterClient.getJobStatus(jobToRestore.getJobID()),
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
index b3842c0..443f968 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
@@ -18,7 +18,6 @@
 package org.apache.flink.test.streaming.runtime;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -39,6 +38,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -92,7 +92,7 @@ public class BigUserProgramJobSubmitITCase extends TestLogger {
 			StandaloneClusterId.getInstance());
 
 		try {
-			ClientUtils.submitJobAndWaitForResult(restClusterClient, jobGraph, BigUserProgramJobSubmitITCase.class.getClassLoader());
+			submitJobAndWaitForResult(restClusterClient, jobGraph, getClass().getClassLoader());
 
 			List<String> expected = Arrays.asList("x 1 0", "x 3 0", "x 5 0");
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
index 1606783..95550e0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
@@ -19,8 +19,10 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import static org.junit.Assert.fail;
@@ -52,4 +54,11 @@ public class TestUtils {
 
 		return null;
 	}
+
+	public static void submitJobAndWaitForResult(ClusterClient<?> client, JobGraph jobGraph, ClassLoader classLoader) throws Exception {
+		client.submitJob(jobGraph)
+			.thenCompose(client::requestJobResult)
+			.get()
+			.toJobExecutionResult(classLoader);
+	}
 }