You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2020/08/15 01:40:52 UTC

[flink] branch FLINK-15299 created (now 47e6ef8)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a change to branch FLINK-15299
in repository https://gitbox.apache.org/repos/asf/flink.git.


      at 47e6ef8  [FLINK-15299][test] Move ClientUtils#submitJob & ClientUtils#submitJobAndWaitForResult to test scope

This branch includes the following new commits:

     new 47e6ef8  [FLINK-15299][test] Move ClientUtils#submitJob & ClientUtils#submitJobAndWaitForResult to test scope

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[flink] 01/01: [FLINK-15299][test] Move ClientUtils#submitJob & ClientUtils#submitJobAndWaitForResult to test scope

Posted by ti...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch FLINK-15299
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 47e6ef886af7cb5e2d43dfb6ce3e4f533457172d
Author: tison <wa...@gmail.com>
AuthorDate: Sat Aug 15 08:29:49 2020 +0800

    [FLINK-15299][test] Move ClientUtils#submitJob & ClientUtils#submitJobAndWaitForResult to test scope
---
 .../java/org/apache/flink/client/ClientUtils.java  |  17 ---
 .../apache/flink/client/program/ClientTest.java    |   6 +-
 .../client/program/rest/RestClusterClientTest.java | 170 +--------------------
 .../connectors/kafka/KafkaConsumerTestBase.java    |  12 +-
 .../flink/state/api/SavepointReaderITTestBase.java |   6 +-
 .../flink/state/api/SavepointWriterITCase.java     |  22 ++-
 .../flink/state/api/utils/SavepointTestBase.java   |   6 +-
 .../jobmanager/JMXJobManagerMetricTest.java        |   3 +-
 .../itcases/AbstractQueryableStateTestBase.java    |  44 +++---
 .../runtime/webmonitor/WebFrontendITCase.java      |   5 +-
 .../jobmaster/JobMasterStopWithSavepointIT.java    |   3 +-
 .../jobmaster/JobMasterTriggerSavepointITCase.java |   3 +-
 .../test/accumulators/AccumulatorLiveITCase.java   |   5 +-
 .../flink/test/cancelling/CancelingTestBase.java   |  17 +--
 .../NotifyCheckpointAbortedITCase.java             |   3 +-
 .../test/checkpointing/RegionFailoverITCase.java   |   4 +-
 .../flink/test/checkpointing/RescalingITCase.java  |  18 +--
 .../ResumeCheckpointManuallyITCase.java            |   3 +-
 .../flink/test/checkpointing/SavepointITCase.java  |  20 ++-
 .../StreamFaultToleranceTestBase.java              |  23 +--
 .../ZooKeeperHighAvailabilityITCase.java           |   3 +-
 .../utils/SavepointMigrationTestBase.java          |  17 +--
 .../test/example/client/JobRetrievalITCase.java    |   3 +-
 .../example/failing/JobSubmissionFailsITCase.java  |  10 +-
 .../test/runtime/NetworkStackThroughputITCase.java |  36 ++---
 .../restore/AbstractOperatorRestoreTestBase.java   |   5 +-
 .../runtime/BigUserProgramJobSubmitITCase.java     |   4 +-
 .../java/org/apache/flink/test/util/TestUtils.java |   9 ++
 28 files changed, 139 insertions(+), 338 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index b651438..8275302 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -27,7 +27,6 @@ import org.apache.flink.client.program.StreamContextEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.core.execution.DetachedJobExecutionResult;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
@@ -74,22 +73,6 @@ public enum ClientUtils {
 		return FlinkUserCodeClassLoaders.create(resolveOrder, urls, parent, alwaysParentFirstLoaderPatterns, NOOP_EXCEPTION_HANDLER);
 	}
 
-	public static JobExecutionResult submitJob(
-			ClusterClient<?> client,
-			JobGraph jobGraph) throws ProgramInvocationException {
-		checkNotNull(client);
-		checkNotNull(jobGraph);
-		try {
-			return client
-				.submitJob(jobGraph)
-				.thenApply(DetachedJobExecutionResult::new)
-				.get();
-		} catch (InterruptedException | ExecutionException e) {
-			ExceptionUtils.checkInterrupted(e);
-			throw new ProgramInvocationException("Could not run job in detached mode.", jobGraph.getJobID(), e);
-		}
-	}
-
 	public static JobExecutionResult submitJobAndWaitForResult(
 			ClusterClient<?> client,
 			JobGraph jobGraph,
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 508e3a6..22b78c8 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.client.program;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -227,8 +226,7 @@ public class ClientTest extends TestLogger {
 		jobGraph.addJars(Collections.emptyList());
 		jobGraph.setClasspaths(Collections.emptyList());
 
-		JobSubmissionResult result = ClientUtils.submitJob(clusterClient, jobGraph);
-		assertNotNull(result);
+		assertNotNull(clusterClient.submitJob(jobGraph).get());
 	}
 
 	/**
@@ -430,7 +428,7 @@ public class ClientTest extends TestLogger {
 						jobGraph.addJars(accessor.getJars());
 						jobGraph.setClasspaths(accessor.getClasspaths());
 
-						final JobID jobID = ClientUtils.submitJob(clusterClient, jobGraph).getJobID();
+						final JobID jobID = clusterClient.submitJob(jobGraph).get();
 						return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID));
 					};
 				}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index a55b9e7..2ae278e 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -18,29 +18,22 @@
 
 package org.apache.flink.client.program.rest;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.core.execution.DetachedJobExecutionResult;
 import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -66,7 +59,6 @@ import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
 import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobCancellationHeaders;
 import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters;
-import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
@@ -74,8 +66,6 @@ import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.TriggerId;
 import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
-import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
-import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
@@ -122,7 +112,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -130,10 +119,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -227,22 +214,12 @@ public class RestClusterClientTest extends TestLogger {
 	public void testJobSubmitCancel() throws Exception {
 		TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
 		TestJobCancellationHandler terminationHandler = new TestJobCancellationHandler();
-		TestJobExecutionResultHandler testJobExecutionResultHandler =
-			new TestJobExecutionResultHandler(
-				JobExecutionResultResponseBody.created(new JobResult.Builder()
-					.applicationStatus(ApplicationStatus.SUCCEEDED)
-					.jobId(jobId)
-					.netRuntime(Long.MAX_VALUE)
-					.build()));
 
-		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
-			submitHandler,
-			terminationHandler,
-			testJobExecutionResultHandler)) {
+		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(submitHandler, terminationHandler)) {
 
 			try (RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
 				Assert.assertFalse(submitHandler.jobSubmitted);
-				ClientUtils.submitJobAndWaitForResult(restClusterClient, jobGraph, ClassLoader.getSystemClassLoader());
+				restClusterClient.submitJob(jobGraph).get();
 				Assert.assertTrue(submitHandler.jobSubmitted);
 
 				Assert.assertFalse(terminationHandler.jobCanceled);
@@ -252,32 +229,6 @@ public class RestClusterClientTest extends TestLogger {
 		}
 	}
 
-	/**
-	 * Tests that we can submit a jobGraph in detached mode.
-	 */
-	@Test
-	public void testDetachedJobSubmission() throws Exception {
-
-		final TestJobSubmitHandler testJobSubmitHandler = new TestJobSubmitHandler();
-
-		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
-			testJobSubmitHandler)) {
-			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
-
-			try {
-				final JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(restClusterClient, jobGraph);
-
-				// if the detached mode didn't work, then we would not reach this point because the execution result
-				// retrieval would have failed.
-				assertThat(jobSubmissionResult, is(instanceOf(DetachedJobExecutionResult.class)));
-				assertThat(jobSubmissionResult.getJobID(), is(jobId));
-			} finally {
-				restClusterClient.close();
-			}
-		}
-
-	}
-
 	private class TestJobSubmitHandler extends TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
 		private volatile boolean jobSubmitted = false;
 
@@ -306,94 +257,6 @@ public class RestClusterClientTest extends TestLogger {
 		}
 	}
 
-	private class TestJobExecutionResultHandler
-		extends TestHandler<EmptyRequestBody, JobExecutionResultResponseBody, JobMessageParameters> {
-
-		private final Iterator<Object> jobExecutionResults;
-
-		private Object lastJobExecutionResult;
-
-		private TestJobExecutionResultHandler(
-				final Object... jobExecutionResults) {
-			super(JobExecutionResultHeaders.getInstance());
-			checkArgument(Arrays.stream(jobExecutionResults)
-				.allMatch(object -> object instanceof JobExecutionResultResponseBody
-					|| object instanceof RestHandlerException));
-			this.jobExecutionResults = Arrays.asList(jobExecutionResults).iterator();
-		}
-
-		@Override
-		protected CompletableFuture<JobExecutionResultResponseBody> handleRequest(
-				@Nonnull HandlerRequest<EmptyRequestBody, JobMessageParameters> request,
-				@Nonnull DispatcherGateway gateway) throws RestHandlerException {
-			if (jobExecutionResults.hasNext()) {
-				lastJobExecutionResult = jobExecutionResults.next();
-			}
-			checkState(lastJobExecutionResult != null);
-			if (lastJobExecutionResult instanceof JobExecutionResultResponseBody) {
-				return CompletableFuture.completedFuture((JobExecutionResultResponseBody) lastJobExecutionResult);
-			} else if (lastJobExecutionResult instanceof RestHandlerException) {
-				return FutureUtils.completedExceptionally((RestHandlerException) lastJobExecutionResult);
-			} else {
-				throw new AssertionError();
-			}
-		}
-	}
-
-	@Test
-	public void testSubmitJobAndWaitForExecutionResult() throws Exception {
-		final TestJobExecutionResultHandler testJobExecutionResultHandler =
-			new TestJobExecutionResultHandler(
-				new RestHandlerException("should trigger retry", HttpResponseStatus.SERVICE_UNAVAILABLE),
-				JobExecutionResultResponseBody.inProgress(),
-				JobExecutionResultResponseBody.created(new JobResult.Builder()
-					.applicationStatus(ApplicationStatus.SUCCEEDED)
-					.jobId(jobId)
-					.netRuntime(Long.MAX_VALUE)
-					.accumulatorResults(Collections.singletonMap("testName", new SerializedValue<>(OptionalFailure.of(1.0))))
-					.build()),
-				JobExecutionResultResponseBody.created(new JobResult.Builder()
-					.applicationStatus(ApplicationStatus.FAILED)
-					.jobId(jobId)
-					.netRuntime(Long.MAX_VALUE)
-					.serializedThrowable(new SerializedThrowable(new RuntimeException("expected")))
-					.build()));
-
-		// fail first HTTP polling attempt, which should not be a problem because of the retries
-		final AtomicBoolean firstPollFailed = new AtomicBoolean();
-		failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
-			messageHeaders instanceof JobExecutionResultHeaders && !firstPollFailed.getAndSet(true);
-
-		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
-			testJobExecutionResultHandler,
-			new TestJobSubmitHandler())) {
-			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
-
-			try {
-				JobExecutionResult jobExecutionResult;
-
-				jobExecutionResult = ClientUtils.submitJobAndWaitForResult(restClusterClient, jobGraph, ClassLoader.getSystemClassLoader());
-				assertThat(jobExecutionResult.getJobID(), equalTo(jobId));
-				assertThat(jobExecutionResult.getNetRuntime(), equalTo(Long.MAX_VALUE));
-				assertThat(
-					jobExecutionResult.getAllAccumulatorResults(),
-					equalTo(Collections.singletonMap("testName", 1.0)));
-
-				try {
-					ClientUtils.submitJobAndWaitForResult(restClusterClient, jobGraph, ClassLoader.getSystemClassLoader());
-					fail("Expected exception not thrown.");
-				} catch (final ProgramInvocationException e) {
-					final Optional<RuntimeException> cause = ExceptionUtils.findThrowable(e, RuntimeException.class);
-
-					assertThat(cause.isPresent(), is(true));
-					assertThat(cause.get().getMessage(), equalTo("expected"));
-				}
-			} finally {
-				restClusterClient.close();
-			}
-		}
-	}
-
 	@Test
 	public void testDisposeSavepoint() throws Exception {
 		final String savepointPath = "foobar";
@@ -595,35 +458,6 @@ public class RestClusterClientTest extends TestLogger {
 		}
 	}
 
-	@Test
-	public void testJobSubmissionFailureThrowsProgramInvocationException() throws Exception {
-		try (final TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(new SubmissionFailingHandler())) {
-			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
-
-			try {
-				ClientUtils.submitJobAndWaitForResult(restClusterClient, jobGraph, ClassLoader.getSystemClassLoader());
-			} catch (final ProgramInvocationException expected) {
-				// expected
-			} finally {
-				restClusterClient.close();
-			}
-		}
-	}
-
-	private final class SubmissionFailingHandler extends TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
-
-		private SubmissionFailingHandler() {
-			super(JobSubmitHeaders.getInstance());
-		}
-
-		@Override
-		protected CompletableFuture<JobSubmitResponseBody> handleRequest(
-				@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request,
-				@Nonnull DispatcherGateway gateway) throws RestHandlerException {
-			throw new RestHandlerException("expected", HttpResponseStatus.INTERNAL_SERVER_ERROR);
-		}
-	}
-
 	/**
 	 * Tests that the send operation is not being retried when receiving a NOT_FOUND return code.
 	 */
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 8406f70..7f0f0ed 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -36,7 +36,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
@@ -115,6 +114,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.getRunningJobs;
 import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilJobIsRunning;
 import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilNoJobIsRunning;
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -455,7 +455,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		final AtomicReference<Throwable> error = new AtomicReference<>();
 		Thread consumeThread = new Thread(() -> {
 			try {
-				ClientUtils.submitJobAndWaitForResult(client, jobGraph, KafkaConsumerTestBase.class.getClassLoader());
+				submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 			} catch (Throwable t) {
 				if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
 					error.set(t);
@@ -1001,7 +1001,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 
 		final Runnable jobRunner = () -> {
 			try {
-				ClientUtils.submitJobAndWaitForResult(client, jobGraph, KafkaConsumerTestBase.class.getClassLoader());
+				submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 			} catch (Throwable t) {
 				jobError.set(t);
 			}
@@ -1071,7 +1071,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 
 		final Runnable jobRunner = () -> {
 			try {
-				ClientUtils.submitJobAndWaitForResult(client, jobGraph, KafkaConsumerTestBase.class.getClassLoader());
+				submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 			} catch (Throwable t) {
 				LOG.error("Job Runner failed with exception", t);
 				error.set(t);
@@ -1657,7 +1657,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 
 		Thread jobThread = new Thread(() -> {
 			try {
-				ClientUtils.submitJobAndWaitForResult(client, jobGraph, KafkaConsumerTestBase.class.getClassLoader());
+				submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 			} catch (Throwable t) {
 				if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
 					LOG.warn("Got exception during execution", t);
@@ -2134,7 +2134,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 
 		Thread runner = new Thread(() -> {
 			try {
-				ClientUtils.submitJobAndWaitForResult(client, jobGraph, KafkaConsumerTestBase.class.getClassLoader());
+				submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 				tryExecute(readEnv, "sequence validation");
 			} catch (Throwable t) {
 				if (!ExceptionUtils.findThrowable(t, SuccessException.class).isPresent()) {
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java
index f7e3a87..96414ba 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.state.api;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
@@ -27,7 +26,6 @@ import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -172,7 +170,7 @@ public abstract class SavepointReaderITTestBase extends AbstractTestBase {
 		String dirPath = getTempDirPath(new AbstractID().toHexString());
 
 		try {
-			JobSubmissionResult result = ClientUtils.submitJob(client, jobGraph);
+			JobID jobID = client.submitJob(jobGraph).get();
 
 			boolean finished = false;
 			while (deadline.hasTimeLeft()) {
@@ -193,7 +191,7 @@ public abstract class SavepointReaderITTestBase extends AbstractTestBase {
 				Assert.fail("Failed to initialize state within deadline");
 			}
 
-			CompletableFuture<String> path = client.triggerSavepoint(result.getJobID(), dirPath);
+			CompletableFuture<String> path = client.triggerSavepoint(jobID, dirPath);
 			return path.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 		} finally {
 			client.cancel(jobId).get();
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
index d87a33d..5dd3fcc 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
@@ -28,9 +28,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -51,6 +49,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.SerializedThrowable;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -63,6 +62,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
 
@@ -147,7 +147,7 @@ public class SavepointWriterITCase extends AbstractTestBase {
 		bEnv.execute("Bootstrap");
 	}
 
-	private void validateBootstrap(String savepointPath) throws ProgramInvocationException {
+	private void validateBootstrap(String savepointPath) throws Exception {
 		StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
 		sEnv.setStateBackend(backend);
 
@@ -170,7 +170,12 @@ public class SavepointWriterITCase extends AbstractTestBase {
 		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false));
 
 		ClusterClient<?> client = miniClusterResource.getClusterClient();
-		ClientUtils.submitJobAndWaitForResult(client, jobGraph, SavepointWriterITCase.class.getClassLoader());
+		Optional<SerializedThrowable> serializedThrowable = client
+			.submitJob(jobGraph)
+			.thenCompose(client::requestJobResult)
+			.get()
+			.getSerializedThrowable();
+		Assert.assertFalse(serializedThrowable.isPresent());
 
 		Assert.assertEquals("Unexpected output", 3, CollectSink.accountList.size());
 	}
@@ -193,7 +198,7 @@ public class SavepointWriterITCase extends AbstractTestBase {
 		bEnv.execute("Modifying");
 	}
 
-	private void validateModification(String savepointPath) throws ProgramInvocationException {
+	private void validateModification(String savepointPath) throws Exception {
 		StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
 		sEnv.setStateBackend(backend);
 
@@ -216,7 +221,12 @@ public class SavepointWriterITCase extends AbstractTestBase {
 		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false));
 
 		ClusterClient<?> client = miniClusterResource.getClusterClient();
-		ClientUtils.submitJobAndWaitForResult(client, jobGraph, SavepointWriterITCase.class.getClassLoader());
+		Optional<SerializedThrowable> serializedThrowable = client
+			.submitJob(jobGraph)
+			.thenCompose(client::requestJobResult)
+			.get()
+			.getSerializedThrowable();
+		Assert.assertFalse(serializedThrowable.isPresent());
 
 		Assert.assertEquals("Unexpected output", 3, CollectSink.accountList.size());
 	}
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java
index 0d25682..8e75058 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java
@@ -20,10 +20,8 @@ package org.apache.flink.state.api.utils;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -56,11 +54,11 @@ public abstract class SavepointTestBase extends AbstractTestBase {
 		ClusterClient<?> client = miniClusterResource.getClusterClient();
 
 		try {
-			JobSubmissionResult result = ClientUtils.submitJob(client, jobGraph);
+			JobID jobID = client.submitJob(jobGraph).get();
 
 			return CompletableFuture
 				.runAsync(waitingSource::awaitSource)
-				.thenCompose(ignore -> triggerSavepoint(client, result.getJobID()))
+				.thenCompose(ignore -> triggerSavepoint(client, jobID))
 				.get(5, TimeUnit.MINUTES);
 		} catch (Exception e) {
 			throw new RuntimeException("Failed to take savepoint", e);
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index dacfd5a..cbd0a7a 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmanager;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -108,7 +107,7 @@ public class JMXJobManagerMetricTest extends TestLogger {
 				null));
 
 			ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
-			ClientUtils.submitJob(client, jobGraph);
+			client.submitJob(jobGraph).get();
 
 			FutureUtils.retrySuccessfulWithDelay(
 				() -> client.getJobStatus(jobGraph.getJobID()),
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index d9e3c91..123d3d8 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -45,9 +45,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.queryablestate.client.VoidNamespace;
@@ -57,6 +55,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -187,7 +186,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			final JobID jobId = autoCancellableJob.getJobId();
 			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 
 			final AtomicLongArray counts = new AtomicLongArray(numKeys);
 
@@ -296,16 +295,15 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 		// Submit the job graph
 		final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
-		boolean caughtException = false;
-		try {
-			ClientUtils.submitJobAndWaitForResult(clusterClient, jobGraph, AbstractQueryableStateTestBase.class.getClassLoader());
-		} catch (ProgramInvocationException e) {
-			String failureCause = ExceptionUtils.stringifyException(e);
-			assertThat(failureCause, containsString("KvState with name '" + queryName + "' has already been registered by another operator"));
-			caughtException = true;
-		}
-
-		assertTrue(caughtException);
+		clusterClient.submitJob(jobGraph)
+			.thenCompose(clusterClient::requestJobResult)
+			.thenApply(JobResult::getSerializedThrowable)
+			.thenAccept(serializedThrowable -> {
+				assertTrue(serializedThrowable.isPresent());
+				final Throwable t = serializedThrowable.get().deserializeError(getClass().getClassLoader());
+				final String failureCause = ExceptionUtils.stringifyException(t);
+				assertThat(failureCause, containsString("KvState with name '" + queryName + "' has already been registered by another operator"));
+			}).get();
 	}
 
 	/**
@@ -346,7 +344,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			final JobID jobId = autoCancellableJob.getJobId();
 			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 			executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements);
 		}
 	}
@@ -380,7 +378,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 		try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(deadline, clusterClient, env)) {
 
-			ClientUtils.submitJob(clusterClient, closableJobGraph.getJobGraph());
+			clusterClient.submitJob(closableJobGraph.getJobGraph()).get();
 
 			CompletableFuture<JobStatus> jobStatusFuture =
 				clusterClient.getJobStatus(closableJobGraph.getJobId());
@@ -481,7 +479,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 					BasicTypeInfo.INT_TYPE_INFO,
 					valueState);
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 			executeValueQuery(deadline, client, jobId, "hakuna", valueState, expected);
 		}
 	}
@@ -528,7 +526,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			final JobID jobId = autoCancellableJob.getJobId();
 			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 
 			// Now query
 			int key = 0;
@@ -596,7 +594,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			final JobID jobId = autoCancellableJob.getJobId();
 			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 			executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements);
 		}
 	}
@@ -640,7 +638,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			final JobID jobId = autoCancellableJob.getJobId();
 			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 
 			final String expected = Integer.toString(numElements * (numElements + 1) / 2);
 
@@ -712,7 +710,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			final JobID jobId = autoCancellableJob.getJobId();
 			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 
 			final long expected = numElements * (numElements + 1L) / 2L;
 
@@ -804,7 +802,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			final JobID jobId = autoCancellableJob.getJobId();
 			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 
 			final long expected = numElements * (numElements + 1L) / 2L;
 
@@ -894,7 +892,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			final JobID jobId = autoCancellableJob.getJobId();
 			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 
 			final Map<Integer, Set<Long>> results = new HashMap<>();
 
@@ -979,7 +977,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			final JobID jobId = autoCancellableJob.getJobId();
 			final JobGraph jobGraph = autoCancellableJob.getJobGraph();
 
-			ClientUtils.submitJob(clusterClient, jobGraph);
+			clusterClient.submitJob(jobGraph).get();
 
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index be078a0..5522375 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -260,7 +259,7 @@ public class WebFrontendITCase extends TestLogger {
 		final JobID jid = jobGraph.getJobID();
 
 		ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
-		ClientUtils.submitJob(clusterClient, jobGraph);
+		clusterClient.submitJob(jobGraph).get();
 
 		// wait for job to show up
 		while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
@@ -318,7 +317,7 @@ public class WebFrontendITCase extends TestLogger {
 		final JobID jid = jobGraph.getJobID();
 
 		ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
-		ClientUtils.submitJob(clusterClient, jobGraph);
+		clusterClient.submitJob(jobGraph).get();
 
 		// wait for job to show up
 		while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index 42c889c..e8d223e 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.MiniClusterClient;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
@@ -289,7 +288,7 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
 						0),
 				null));
 
-		ClientUtils.submitJob(clusterClient, jobGraph);
+		clusterClient.submitJob(jobGraph).get();
 		assertTrue(invokeLatch.await(60, TimeUnit.SECONDS));
 		waitForJob();
 	}
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
index 8605623..b861956 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.MiniClusterClient;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -115,7 +114,7 @@ public class JobMasterTriggerSavepointITCase extends AbstractTestBase {
 				0),
 			null));
 
-		ClientUtils.submitJob(clusterClient, jobGraph);
+		clusterClient.submitJob(jobGraph).get();
 		assertTrue(invokeLatch.await(60, TimeUnit.SECONDS));
 		waitForJob();
 	}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 4e94e43..bcf2702 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
@@ -60,6 +59,8 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
+
 /**
  * Tests the availability of accumulator results during runtime.
  */
@@ -145,7 +146,7 @@ public class AccumulatorLiveITCase extends TestLogger {
 		final CheckedThread submissionThread = new CheckedThread() {
 			@Override
 			public void go() throws Exception {
-				ClientUtils.submitJobAndWaitForResult(client, jobGraph, AccumulatorLiveITCase.class.getClassLoader());
+				submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 			}
 		};
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 03bc6eb..2551b9b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -18,10 +18,9 @@
 
 package org.apache.flink.test.cancelling;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
@@ -97,14 +96,14 @@ public abstract class CancelingTestBase extends TestLogger {
 		final long rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration).toMilliseconds();
 
 		ClusterClient<?> client = CLUSTER.getClusterClient();
-		JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(client, jobGraph);
+		JobID jobID = client.submitJob(jobGraph).get();
 
 		Deadline submissionDeadLine = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
 
-		JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
+		JobStatus jobStatus = client.getJobStatus(jobID).get(rpcTimeout, TimeUnit.MILLISECONDS);
 		while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) {
 			Thread.sleep(50);
-			jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
+			jobStatus = client.getJobStatus(jobID).get(rpcTimeout, TimeUnit.MILLISECONDS);
 		}
 		if (jobStatus != JobStatus.RUNNING) {
 			Assert.fail("Job not in state RUNNING.");
@@ -112,17 +111,17 @@ public abstract class CancelingTestBase extends TestLogger {
 
 		Thread.sleep(msecsTillCanceling);
 
-		client.cancel(jobSubmissionResult.getJobID()).get();
+		client.cancel(jobID).get();
 
 		Deadline cancelDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();
 
-		JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
+		JobStatus jobStatusAfterCancel = client.getJobStatus(jobID).get(rpcTimeout, TimeUnit.MILLISECONDS);
 		while (jobStatusAfterCancel != JobStatus.CANCELED && cancelDeadline.hasTimeLeft()) {
 			Thread.sleep(50);
-			jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
+			jobStatusAfterCancel = client.getJobStatus(jobID).get(rpcTimeout, TimeUnit.MILLISECONDS);
 		}
 		if (jobStatusAfterCancel != JobStatus.CANCELED) {
-			Assert.fail("Failed to cancel job with ID " + jobSubmissionResult.getJobID() + '.');
+			Assert.fail("Failed to cancel job with ID " + jobID + '.');
 		}
 	}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
index e7e6e5c..9a6428f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
@@ -174,7 +173,7 @@ public class NotifyCheckpointAbortedITCase extends TestLogger {
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 		JobID jobID = jobGraph.getJobID();
 
-		ClientUtils.submitJob(clusterClient, jobGraph);
+		clusterClient.submitJob(jobGraph).get();
 
 		TestingCompletedCheckpointStore.addCheckpointLatch.await();
 		TestingCompletedCheckpointStore.abortCheckpointLatch.trigger();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
index d180675..3459e54 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -77,6 +76,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.StreamSupport;
 
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -143,7 +143,7 @@ public class RegionFailoverITCase extends TestLogger {
 		try {
 			JobGraph jobGraph = createJobGraph();
 			ClusterClient<?> client = cluster.getClusterClient();
-			ClientUtils.submitJobAndWaitForResult(client, jobGraph, RegionFailoverITCase.class.getClassLoader());
+			submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 			verifyAfterJobExecuted();
 		} catch (Exception e) {
 			e.printStackTrace();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index b011950..be58fd7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
@@ -79,6 +78,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -188,7 +188,7 @@ public class RescalingITCase extends TestLogger {
 
 			final JobID jobID = jobGraph.getJobID();
 
-			ClientUtils.submitJob(client, jobGraph);
+			client.submitJob(jobGraph).get();
 
 			// wait til the sources have emitted numberElements for each key and completed a checkpoint
 			assertTrue(SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
@@ -226,7 +226,7 @@ public class RescalingITCase extends TestLogger {
 
 			scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-			ClientUtils.submitJobAndWaitForResult(client, scaledJobGraph, RescalingITCase.class.getClassLoader());
+			submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
 
 			Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
 
@@ -267,7 +267,7 @@ public class RescalingITCase extends TestLogger {
 
 			final JobID jobID = jobGraph.getJobID();
 
-			ClientUtils.submitJob(client, jobGraph);
+			client.submitJob(jobGraph).get();
 
 			// wait until the operator is started
 			StateSourceBase.workStartedLatch.await();
@@ -287,7 +287,7 @@ public class RescalingITCase extends TestLogger {
 
 			scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-			ClientUtils.submitJobAndWaitForResult(client, scaledJobGraph, RescalingITCase.class.getClassLoader());
+			submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
 		} catch (JobExecutionException exception) {
 			if (exception.getCause() instanceof IllegalStateException) {
 				// we expect a IllegalStateException wrapped
@@ -332,7 +332,7 @@ public class RescalingITCase extends TestLogger {
 
 			final JobID jobID = jobGraph.getJobID();
 
-			ClientUtils.submitJob(client, jobGraph);
+			client.submitJob(jobGraph).get();
 
 			// wait til the sources have emitted numberElements for each key and completed a checkpoint
 			assertTrue(SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
@@ -375,7 +375,7 @@ public class RescalingITCase extends TestLogger {
 
 			scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-			ClientUtils.submitJobAndWaitForResult(client, scaledJobGraph, RescalingITCase.class.getClassLoader());
+			submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
 
 			Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
 
@@ -454,7 +454,7 @@ public class RescalingITCase extends TestLogger {
 
 			final JobID jobID = jobGraph.getJobID();
 
-			ClientUtils.submitJob(client, jobGraph);
+			client.submitJob(jobGraph).get();
 
 			// wait until the operator is started
 			StateSourceBase.workStartedLatch.await();
@@ -479,7 +479,7 @@ public class RescalingITCase extends TestLogger {
 
 			scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-			ClientUtils.submitJobAndWaitForResult(client, scaledJobGraph, RescalingITCase.class.getClassLoader());
+			submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
 
 			int sumExp = 0;
 			int sumAct = 0;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index e39fce7..d9f4bf30 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
@@ -295,7 +294,7 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
 		JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint);
 		NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM);
 
-		ClientUtils.submitJob(client, initialJobGraph);
+		client.submitJob(initialJobGraph).get();
 
 		// wait until all sources have been started
 		NotifyingInfiniteTupleSource.countDownLatch.await();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index abdc188..d70a409 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -30,7 +29,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
@@ -91,6 +89,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -209,7 +208,7 @@ public class SavepointITCase extends TestLogger {
 		ClusterClient<?> client = cluster.getClusterClient();
 
 		try {
-			ClientUtils.submitJob(client, jobGraph);
+			client.submitJob(jobGraph).get();
 
 			StatefulCounter.getProgressLatch().await();
 
@@ -253,7 +252,7 @@ public class SavepointITCase extends TestLogger {
 		ClusterClient<?> client = cluster.getClusterClient();
 
 		try {
-			ClientUtils.submitJob(client, jobGraph);
+			client.submitJob(jobGraph).get();
 
 			// Await state is restored
 			StatefulCounter.getRestoreLatch().await();
@@ -337,7 +336,7 @@ public class SavepointITCase extends TestLogger {
 		final JobGraph graph = new JobGraph(vertex);
 
 		try {
-			ClientUtils.submitJob(client, graph);
+			client.submitJob(graph).get();
 
 			client.triggerSavepoint(graph.getJobID(), null).get();
 
@@ -386,7 +385,7 @@ public class SavepointITCase extends TestLogger {
 			LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
 
 			try {
-				ClientUtils.submitJobAndWaitForResult(client, jobGraph, SavepointITCase.class.getClassLoader());
+				submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 			} catch (Exception e) {
 				Optional<JobExecutionException> expectedJobExecutionException = ExceptionUtils.findThrowable(e, JobExecutionException.class);
 				Optional<FileNotFoundException> expectedFileNotFoundException = ExceptionUtils.findThrowable(e, FileNotFoundException.class);
@@ -452,8 +451,7 @@ public class SavepointITCase extends TestLogger {
 
 			JobGraph originalJobGraph = env.getStreamGraph().getJobGraph();
 
-			JobSubmissionResult submissionResult = ClientUtils.submitJob(client, originalJobGraph);
-			JobID jobID = submissionResult.getJobID();
+			JobID jobID = client.submitJob(originalJobGraph).get();
 
 			// wait for the Tasks to be ready
 			assertTrue(StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
@@ -502,7 +500,7 @@ public class SavepointITCase extends TestLogger {
 					"savepoint path " + savepointPath + " in detached mode.");
 
 			// Submit the job
-			ClientUtils.submitJob(client, modifiedJobGraph);
+			client.submitJob(modifiedJobGraph).get();
 			// Await state is restored
 			assertTrue(StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
 
@@ -696,7 +694,7 @@ public class SavepointITCase extends TestLogger {
 
 		String savepointPath = null;
 		try {
-			ClientUtils.submitJob(client, jobGraph);
+			client.submitJob(jobGraph).get();
 			for (OneShotLatch latch : iterTestSnapshotWait) {
 				latch.await();
 			}
@@ -710,7 +708,7 @@ public class SavepointITCase extends TestLogger {
 			jobGraph = streamGraph.getJobGraph();
 			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-			ClientUtils.submitJob(client, jobGraph);
+			client.submitJob(jobGraph).get();
 			for (OneShotLatch latch : iterTestRestoreWait) {
 				latch.await();
 			}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index f6c0efb..4f500fd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -19,8 +19,6 @@
 package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -28,6 +26,7 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -41,7 +40,7 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 
-import static org.junit.Assert.fail;
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
 
 /**
  * Test base for fault tolerant streaming programs.
@@ -126,21 +125,9 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 
 			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 			try {
-				ClientUtils.submitJobAndWaitForResult(cluster.getClusterClient(), jobGraph, getClass().getClassLoader()).getJobExecutionResult();
-			} catch (ProgramInvocationException root) {
-				Throwable cause = root.getCause();
-
-				// search for nested SuccessExceptions
-				int depth = 0;
-				while (!(cause instanceof SuccessException)) {
-					if (cause == null || depth++ == 20) {
-						root.printStackTrace();
-						fail("Test failed: " + root.getMessage());
-					}
-					else {
-						cause = cause.getCause();
-					}
-				}
+				submitJobAndWaitForResult(cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
+			} catch (Exception e) {
+				Assert.assertTrue(ExceptionUtils.findThrowable(e, SuccessException.class).isPresent());
 			}
 
 			postSubmit();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index 81df28f..4bd4754 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -209,7 +208,7 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 		JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
 
-		ClientUtils.submitJob(clusterClient, jobGraph);
+		clusterClient.submitJob(jobGraph).get();
 
 		// wait until we did some checkpoints
 		waitForCheckpointLatch.await();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 1e7f619..7787a18 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -20,10 +20,8 @@ package org.apache.flink.test.checkpointing.utils;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
@@ -130,14 +128,14 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 		// Submit the job
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
-		JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(client, jobGraph);
+		JobID jobID = client.submitJob(jobGraph).get();
 
-		LOG.info("Submitted job {} and waiting...", jobSubmissionResult.getJobID());
+		LOG.info("Submitted job {} and waiting...", jobID);
 
 		boolean done = false;
 		while (deadLine.hasTimeLeft()) {
 			Thread.sleep(100);
-			Map<String, Object> accumulators = client.getAccumulators(jobSubmissionResult.getJobID()).get();
+			Map<String, Object> accumulators = client.getAccumulators(jobID).get();
 
 			boolean allDone = true;
 			for (Tuple2<String, Integer> acc : expectedAccumulators) {
@@ -165,7 +163,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 
 		LOG.info("Triggering savepoint.");
 
-		CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobSubmissionResult.getJobID(), null);
+		CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobID, null);
 
 		String jobmanagerSavepointPath = savepointPathFuture.get(deadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
@@ -193,17 +191,16 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 
 		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-		JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(client, jobGraph);
+		JobID jobID = client.submitJob(jobGraph).get();
 
 		boolean done = false;
 		while (deadLine.hasTimeLeft()) {
 
 			// try and get a job result, this will fail if the job already failed. Use this
 			// to get out of this loop
-			JobID jobId = jobSubmissionResult.getJobID();
 
 			try {
-				CompletableFuture<JobStatus> jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID());
+				CompletableFuture<JobStatus> jobStatusFuture = client.getJobStatus(jobID);
 
 				JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS);
 
@@ -213,7 +210,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 			}
 
 			Thread.sleep(100);
-			Map<String, Object> accumulators = client.getAccumulators(jobId).get();
+			Map<String, Object> accumulators = client.getAccumulators(jobID).get();
 
 			boolean allDone = true;
 			for (Tuple2<String, Integer> acc : expectedAccumulators) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index 063a9cd..e0611a9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -20,7 +20,6 @@
 package org.apache.flink.test.example.client;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
@@ -95,7 +94,7 @@ public class JobRetrievalITCase extends TestLogger {
 		// has been attached in resumingThread
 		lock.acquire();
 
-		ClientUtils.submitJob(client, jobGraph);
+		client.submitJob(jobGraph).get();
 
 		final CheckedThread resumingThread = new CheckedThread("Flink-Job-Retriever") {
 			@Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
index f19080f..d5f220b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.test.example.failing;
 
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
@@ -45,6 +44,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.function.Predicate;
 
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
 import static org.junit.Assert.fail;
 
 /**
@@ -127,14 +127,14 @@ public class JobSubmissionFailsITCase extends TestLogger {
 		runJobSubmissionTest(jobGraph, e -> ExceptionUtils.findThrowable(e, IOException.class).isPresent());
 	}
 
-	private void runJobSubmissionTest(JobGraph jobGraph, Predicate<Exception> failurePredicate) throws org.apache.flink.client.program.ProgramInvocationException {
+	private void runJobSubmissionTest(JobGraph jobGraph, Predicate<Exception> failurePredicate) throws Exception {
 		ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
 
 		try {
 			if (detached) {
-				ClientUtils.submitJob(client, jobGraph);
+				client.submitJob(jobGraph).get();
 			} else {
-				ClientUtils.submitJobAndWaitForResult(client, jobGraph, JobSubmissionFailsITCase.class.getClassLoader());
+				submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 			}
 			fail("Job submission should have thrown an exception.");
 		} catch (Exception e) {
@@ -143,7 +143,7 @@ public class JobSubmissionFailsITCase extends TestLogger {
 			}
 		}
 
-		ClientUtils.submitJobAndWaitForResult(client, getWorkingJobGraph(), JobSubmissionFailsITCase.class.getClassLoader());
+		submitJobAndWaitForResult(client, getWorkingJobGraph(), getClass().getClassLoader()); // submit the working graph, not the failing one under test
 	}
 
 	@Nonnull
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 6a148f0..479ae57 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.test.runtime;
 
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
@@ -34,10 +32,12 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -266,22 +266,22 @@ public class NetworkStackThroughputITCase extends TestLogger {
 			final boolean isSlowSender,
 			final boolean isSlowReceiver,
 			final int parallelism) throws Exception {
-		ClusterClient<?> client = cluster.getClusterClient();
-
-		JobExecutionResult jer = ClientUtils.submitJobAndWaitForResult(
-			client,
-			createJobGraph(
-				dataVolumeGb,
-				useForwarder,
-				isSlowSender,
-				isSlowReceiver,
-				parallelism),
-			getClass().getClassLoader());
-
-		long dataVolumeMbit = dataVolumeGb * 8192;
-		long runtimeSecs = jer.getNetRuntime(TimeUnit.SECONDS);
-
-		int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);
+		final ClusterClient<?> client = cluster.getClusterClient();
+		final JobGraph jobGraph = createJobGraph(
+			dataVolumeGb,
+			useForwarder,
+			isSlowSender,
+			isSlowReceiver,
+			parallelism);
+		final JobResult jobResult = client.submitJob(jobGraph)
+			.thenCompose(client::requestJobResult)
+			.get();
+
+		Assert.assertFalse(jobResult.getSerializedThrowable().isPresent());
+
+		final long dataVolumeMbit = dataVolumeGb * 8192;
+		final long runtimeSecs = TimeUnit.SECONDS.convert(jobResult.getNetRuntime(), TimeUnit.MILLISECONDS);
+		final int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);
 
 		LOG.info(String.format("Test finished with throughput of %d MBit/s (runtime [secs]: %d, " +
 			"data volume [gb/mbits]: %d/%d)", mbitPerSecond, runtimeSecs, dataVolumeGb, dataVolumeMbit));
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index a9321fa..43bc0dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -120,7 +119,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 
 		assertNotNull(jobToMigrate.getJobID());
 
-		ClientUtils.submitJob(clusterClient, jobToMigrate);
+		clusterClient.submitJob(jobToMigrate).get();
 
 		CompletableFuture<JobStatus> jobRunningFuture = FutureUtils.retrySuccessfulWithDelay(
 			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
@@ -172,7 +171,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 
 		assertNotNull("Job doesn't have a JobID.", jobToRestore.getJobID());
 
-		ClientUtils.submitJob(clusterClient, jobToRestore);
+		clusterClient.submitJob(jobToRestore).get();
 
 		CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccessfulWithDelay(
 			() -> clusterClient.getJobStatus(jobToRestore.getJobID()),
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
index b3842c0..443f968 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
@@ -18,7 +18,6 @@
 package org.apache.flink.test.streaming.runtime;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -39,6 +38,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -92,7 +92,7 @@ public class BigUserProgramJobSubmitITCase extends TestLogger {
 			StandaloneClusterId.getInstance());
 
 		try {
-			ClientUtils.submitJobAndWaitForResult(restClusterClient, jobGraph, BigUserProgramJobSubmitITCase.class.getClassLoader());
+			submitJobAndWaitForResult(restClusterClient, jobGraph, getClass().getClassLoader());
 
 			List<String> expected = Arrays.asList("x 1 0", "x 3 0", "x 5 0");
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
index 1606783..95550e0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
@@ -19,8 +19,10 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import static org.junit.Assert.fail;
@@ -52,4 +54,11 @@ public class TestUtils {
 
 		return null;
 	}
+
+	public static void submitJobAndWaitForResult(ClusterClient<?> client, JobGraph jobGraph, ClassLoader classLoader) throws Exception {
+		client.submitJob(jobGraph) // submit the job; the returned future is chained below
+			.thenCompose(client::requestJobResult) // once submission completes, request the job's result
+			.get() // block the calling thread until the chained future completes
+			.toJobExecutionResult(classLoader); // result is discarded; NOTE(review): presumably kept so a failed job throws here - confirm
+	}
 }