You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2020/08/19 23:32:43 UTC
[flink] branch master updated: [FLINK-15299][test] Move ClientUtils#submitJob & ClientUtils#submitJobAndWaitForResult to test scope
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new dfb8a3b [FLINK-15299][test] Move ClientUtils#submitJob & ClientUtils#submitJobAndWaitForResult to test scope
dfb8a3b is described below
commit dfb8a3be7f0d113032a28cf6a1b296725e5562f5
Author: tison <wa...@gmail.com>
AuthorDate: Sat Aug 15 08:29:49 2020 +0800
[FLINK-15299][test] Move ClientUtils#submitJob & ClientUtils#submitJobAndWaitForResult to test scope
This closes #11469 .
---
.../java/org/apache/flink/client/ClientUtils.java | 52 -----
.../apache/flink/client/program/ClientTest.java | 6 +-
.../client/program/rest/RestClusterClientTest.java | 243 +++++++++------------
.../connectors/kafka/KafkaConsumerTestBase.java | 12 +-
.../flink/state/api/SavepointReaderITTestBase.java | 6 +-
.../flink/state/api/SavepointWriterITCase.java | 22 +-
.../flink/state/api/utils/SavepointTestBase.java | 6 +-
.../jobmanager/JMXJobManagerMetricTest.java | 3 +-
.../itcases/AbstractQueryableStateTestBase.java | 44 ++--
.../runtime/webmonitor/WebFrontendITCase.java | 5 +-
.../jobmaster/JobMasterStopWithSavepointIT.java | 3 +-
.../jobmaster/JobMasterTriggerSavepointITCase.java | 3 +-
.../test/accumulators/AccumulatorLiveITCase.java | 5 +-
.../flink/test/cancelling/CancelingTestBase.java | 17 +-
.../NotifyCheckpointAbortedITCase.java | 3 +-
.../test/checkpointing/RegionFailoverITCase.java | 4 +-
.../flink/test/checkpointing/RescalingITCase.java | 18 +-
.../ResumeCheckpointManuallyITCase.java | 3 +-
.../flink/test/checkpointing/SavepointITCase.java | 20 +-
.../StreamFaultToleranceTestBase.java | 23 +-
.../ZooKeeperHighAvailabilityITCase.java | 3 +-
.../utils/SavepointMigrationTestBase.java | 17 +-
.../test/example/client/JobRetrievalITCase.java | 3 +-
.../example/failing/JobSubmissionFailsITCase.java | 10 +-
.../test/runtime/NetworkStackThroughputITCase.java | 36 +--
.../restore/AbstractOperatorRestoreTestBase.java | 5 +-
.../runtime/BigUserProgramJobSubmitITCase.java | 4 +-
.../java/org/apache/flink/test/util/TestUtils.java | 9 +
28 files changed, 238 insertions(+), 347 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index c94dbc2..d7b069f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -18,8 +18,6 @@
package org.apache.flink.client;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
@@ -27,22 +25,15 @@ import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
-import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
-import java.util.concurrent.ExecutionException;
import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -82,49 +73,6 @@ public enum ClientUtils {
checkClassloaderLeak);
}
- public static JobExecutionResult submitJob(
- ClusterClient<?> client,
- JobGraph jobGraph) throws ProgramInvocationException {
- checkNotNull(client);
- checkNotNull(jobGraph);
- try {
- return client
- .submitJob(jobGraph)
- .thenApply(DetachedJobExecutionResult::new)
- .get();
- } catch (InterruptedException | ExecutionException e) {
- ExceptionUtils.checkInterrupted(e);
- throw new ProgramInvocationException("Could not run job in detached mode.", jobGraph.getJobID(), e);
- }
- }
-
- public static JobExecutionResult submitJobAndWaitForResult(
- ClusterClient<?> client,
- JobGraph jobGraph,
- ClassLoader classLoader) throws ProgramInvocationException {
- checkNotNull(client);
- checkNotNull(jobGraph);
- checkNotNull(classLoader);
-
- JobResult jobResult;
-
- try {
- jobResult = client
- .submitJob(jobGraph)
- .thenCompose(client::requestJobResult)
- .get();
- } catch (InterruptedException | ExecutionException e) {
- ExceptionUtils.checkInterrupted(e);
- throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e);
- }
-
- try {
- return jobResult.toJobExecutionResult(classLoader);
- } catch (JobExecutionException | IOException | ClassNotFoundException e) {
- throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
- }
- }
-
public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 508e3a6..22b78c8 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.client.program;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction;
@@ -227,8 +226,7 @@ public class ClientTest extends TestLogger {
jobGraph.addJars(Collections.emptyList());
jobGraph.setClasspaths(Collections.emptyList());
- JobSubmissionResult result = ClientUtils.submitJob(clusterClient, jobGraph);
- assertNotNull(result);
+ assertNotNull(clusterClient.submitJob(jobGraph).get());
}
/**
@@ -430,7 +428,7 @@ public class ClientTest extends TestLogger {
jobGraph.addJars(accessor.getJars());
jobGraph.setClasspaths(accessor.getClasspaths());
- final JobID jobID = ClientUtils.submitJob(clusterClient, jobGraph).getJobID();
+ final JobID jobID = clusterClient.submitJob(jobGraph).get();
return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID));
};
}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 81e845e..fbd8b2c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -21,19 +21,15 @@ package org.apache.flink.client.program.rest;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -134,7 +130,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkState;
import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -228,22 +223,12 @@ public class RestClusterClientTest extends TestLogger {
public void testJobSubmitCancel() throws Exception {
TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
TestJobCancellationHandler terminationHandler = new TestJobCancellationHandler();
- TestJobExecutionResultHandler testJobExecutionResultHandler =
- new TestJobExecutionResultHandler(
- JobExecutionResultResponseBody.created(new JobResult.Builder()
- .applicationStatus(ApplicationStatus.SUCCEEDED)
- .jobId(jobId)
- .netRuntime(Long.MAX_VALUE)
- .build()));
- try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
- submitHandler,
- terminationHandler,
- testJobExecutionResultHandler)) {
+ try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(submitHandler, terminationHandler)) {
try (RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
Assert.assertFalse(submitHandler.jobSubmitted);
- ClientUtils.submitJobAndWaitForResult(restClusterClient, jobGraph, ClassLoader.getSystemClassLoader());
+ restClusterClient.submitJob(jobGraph).get();
Assert.assertTrue(submitHandler.jobSubmitted);
Assert.assertFalse(terminationHandler.jobCanceled);
@@ -253,32 +238,6 @@ public class RestClusterClientTest extends TestLogger {
}
}
- /**
- * Tests that we can submit a jobGraph in detached mode.
- */
- @Test
- public void testDetachedJobSubmission() throws Exception {
-
- final TestJobSubmitHandler testJobSubmitHandler = new TestJobSubmitHandler();
-
- try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
- testJobSubmitHandler)) {
- RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
-
- try {
- final JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(restClusterClient, jobGraph);
-
- // if the detached mode didn't work, then we would not reach this point because the execution result
- // retrieval would have failed.
- assertThat(jobSubmissionResult, is(instanceOf(DetachedJobExecutionResult.class)));
- assertThat(jobSubmissionResult.getJobID(), is(jobId));
- } finally {
- restClusterClient.close();
- }
- }
-
- }
-
private class TestJobSubmitHandler extends TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
private volatile boolean jobSubmitted = false;
@@ -307,94 +266,6 @@ public class RestClusterClientTest extends TestLogger {
}
}
- private class TestJobExecutionResultHandler
- extends TestHandler<EmptyRequestBody, JobExecutionResultResponseBody, JobMessageParameters> {
-
- private final Iterator<Object> jobExecutionResults;
-
- private Object lastJobExecutionResult;
-
- private TestJobExecutionResultHandler(
- final Object... jobExecutionResults) {
- super(JobExecutionResultHeaders.getInstance());
- checkArgument(Arrays.stream(jobExecutionResults)
- .allMatch(object -> object instanceof JobExecutionResultResponseBody
- || object instanceof RestHandlerException));
- this.jobExecutionResults = Arrays.asList(jobExecutionResults).iterator();
- }
-
- @Override
- protected CompletableFuture<JobExecutionResultResponseBody> handleRequest(
- @Nonnull HandlerRequest<EmptyRequestBody, JobMessageParameters> request,
- @Nonnull DispatcherGateway gateway) throws RestHandlerException {
- if (jobExecutionResults.hasNext()) {
- lastJobExecutionResult = jobExecutionResults.next();
- }
- checkState(lastJobExecutionResult != null);
- if (lastJobExecutionResult instanceof JobExecutionResultResponseBody) {
- return CompletableFuture.completedFuture((JobExecutionResultResponseBody) lastJobExecutionResult);
- } else if (lastJobExecutionResult instanceof RestHandlerException) {
- return FutureUtils.completedExceptionally((RestHandlerException) lastJobExecutionResult);
- } else {
- throw new AssertionError();
- }
- }
- }
-
- @Test
- public void testSubmitJobAndWaitForExecutionResult() throws Exception {
- final TestJobExecutionResultHandler testJobExecutionResultHandler =
- new TestJobExecutionResultHandler(
- new RestHandlerException("should trigger retry", HttpResponseStatus.SERVICE_UNAVAILABLE),
- JobExecutionResultResponseBody.inProgress(),
- JobExecutionResultResponseBody.created(new JobResult.Builder()
- .applicationStatus(ApplicationStatus.SUCCEEDED)
- .jobId(jobId)
- .netRuntime(Long.MAX_VALUE)
- .accumulatorResults(Collections.singletonMap("testName", new SerializedValue<>(OptionalFailure.of(1.0))))
- .build()),
- JobExecutionResultResponseBody.created(new JobResult.Builder()
- .applicationStatus(ApplicationStatus.FAILED)
- .jobId(jobId)
- .netRuntime(Long.MAX_VALUE)
- .serializedThrowable(new SerializedThrowable(new RuntimeException("expected")))
- .build()));
-
- // fail first HTTP polling attempt, which should not be a problem because of the retries
- final AtomicBoolean firstPollFailed = new AtomicBoolean();
- failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
- messageHeaders instanceof JobExecutionResultHeaders && !firstPollFailed.getAndSet(true);
-
- try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
- testJobExecutionResultHandler,
- new TestJobSubmitHandler())) {
- RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
-
- try {
- JobExecutionResult jobExecutionResult;
-
- jobExecutionResult = ClientUtils.submitJobAndWaitForResult(restClusterClient, jobGraph, ClassLoader.getSystemClassLoader());
- assertThat(jobExecutionResult.getJobID(), equalTo(jobId));
- assertThat(jobExecutionResult.getNetRuntime(), equalTo(Long.MAX_VALUE));
- assertThat(
- jobExecutionResult.getAllAccumulatorResults(),
- equalTo(Collections.singletonMap("testName", 1.0)));
-
- try {
- ClientUtils.submitJobAndWaitForResult(restClusterClient, jobGraph, ClassLoader.getSystemClassLoader());
- fail("Expected exception not thrown.");
- } catch (final ProgramInvocationException e) {
- final Optional<RuntimeException> cause = ExceptionUtils.findThrowable(e, RuntimeException.class);
-
- assertThat(cause.isPresent(), is(true));
- assertThat(cause.get().getMessage(), equalTo("expected"));
- }
- } finally {
- restClusterClient.close();
- }
- }
- }
-
@Test
public void testDisposeSavepoint() throws Exception {
final String savepointPath = "foobar";
@@ -596,21 +467,109 @@ public class RestClusterClientTest extends TestLogger {
}
}
+ private class TestJobExecutionResultHandler extends TestHandler<EmptyRequestBody, JobExecutionResultResponseBody, JobMessageParameters> {
+
+ private final Iterator<Object> jobExecutionResults;
+
+ private Object lastJobExecutionResult;
+
+ private TestJobExecutionResultHandler(
+ final Object... jobExecutionResults) {
+ super(JobExecutionResultHeaders.getInstance());
+ checkArgument(Arrays.stream(jobExecutionResults)
+ .allMatch(object -> object instanceof JobExecutionResultResponseBody
+ || object instanceof RestHandlerException));
+ this.jobExecutionResults = Arrays.asList(jobExecutionResults).iterator();
+ }
+
+ @Override
+ protected CompletableFuture<JobExecutionResultResponseBody> handleRequest(
+ @Nonnull HandlerRequest<EmptyRequestBody, JobMessageParameters> request,
+ @Nonnull DispatcherGateway gateway) {
+ if (jobExecutionResults.hasNext()) {
+ lastJobExecutionResult = jobExecutionResults.next();
+ }
+ checkState(lastJobExecutionResult != null);
+ if (lastJobExecutionResult instanceof JobExecutionResultResponseBody) {
+ return CompletableFuture.completedFuture((JobExecutionResultResponseBody) lastJobExecutionResult);
+ } else if (lastJobExecutionResult instanceof RestHandlerException) {
+ return FutureUtils.completedExceptionally((RestHandlerException) lastJobExecutionResult);
+ } else {
+ throw new AssertionError();
+ }
+ }
+ }
+
@Test
- public void testJobSubmissionFailureThrowsProgramInvocationException() throws Exception {
- try (final TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(new SubmissionFailingHandler())) {
- RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
+ public void testSubmitJobAndWaitForExecutionResult() throws Exception {
+ final TestJobExecutionResultHandler testJobExecutionResultHandler =
+ new TestJobExecutionResultHandler(
+ new RestHandlerException("should trigger retry", HttpResponseStatus.SERVICE_UNAVAILABLE),
+ JobExecutionResultResponseBody.inProgress(),
+ JobExecutionResultResponseBody.created(new JobResult.Builder()
+ .applicationStatus(ApplicationStatus.SUCCEEDED)
+ .jobId(jobId)
+ .netRuntime(Long.MAX_VALUE)
+ .accumulatorResults(Collections.singletonMap("testName", new SerializedValue<>(OptionalFailure.of(1.0))))
+ .build()),
+ JobExecutionResultResponseBody.created(new JobResult.Builder()
+ .applicationStatus(ApplicationStatus.FAILED)
+ .jobId(jobId)
+ .netRuntime(Long.MAX_VALUE)
+ .serializedThrowable(new SerializedThrowable(new RuntimeException("expected")))
+ .build()));
- try {
- ClientUtils.submitJobAndWaitForResult(restClusterClient, jobGraph, ClassLoader.getSystemClassLoader());
- } catch (final ProgramInvocationException expected) {
- // expected
- } finally {
- restClusterClient.close();
+ // fail first HTTP polling attempt, which should not be a problem because of the retries
+ final AtomicBoolean firstPollFailed = new AtomicBoolean();
+ failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
+ messageHeaders instanceof JobExecutionResultHeaders && !firstPollFailed.getAndSet(true);
+
+ try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
+ testJobExecutionResultHandler,
+ new TestJobSubmitHandler())) {
+
+ try (RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
+ final JobExecutionResult jobExecutionResult = restClusterClient.submitJob(jobGraph)
+ .thenCompose(restClusterClient::requestJobResult)
+ .get()
+ .toJobExecutionResult(ClassLoader.getSystemClassLoader());
+ assertThat(jobExecutionResult.getJobID(), equalTo(jobId));
+ assertThat(jobExecutionResult.getNetRuntime(), equalTo(Long.MAX_VALUE));
+ assertThat(
+ jobExecutionResult.getAllAccumulatorResults(),
+ equalTo(Collections.singletonMap("testName", 1.0)));
+
+ try {
+ restClusterClient.submitJob(jobGraph)
+ .thenCompose(restClusterClient::requestJobResult)
+ .get()
+ .toJobExecutionResult(ClassLoader.getSystemClassLoader());
+ fail("Expected exception not thrown.");
+ } catch (final Exception e) {
+ final Optional<RuntimeException> cause = ExceptionUtils.findThrowable(e, RuntimeException.class);
+ assertThat(cause.isPresent(), is(true));
+ assertThat(cause.get().getMessage(), equalTo("expected"));
+ }
}
}
}
+ @Test
+ public void testJobSubmissionFailureCauseForwardedToClient() throws Exception {
+ try (final TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(new SubmissionFailingHandler())) {
+ try (RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
+ restClusterClient.submitJob(jobGraph)
+ .thenCompose(restClusterClient::requestJobResult)
+ .get()
+ .toJobExecutionResult(ClassLoader.getSystemClassLoader());
+ } catch (final Exception e) {
+ assertTrue(ExceptionUtils.findThrowableWithMessage(e, "RestHandlerException: expected").isPresent());
+ return;
+ }
+ fail("Should failed with exception");
+ }
+ }
+
private final class SubmissionFailingHandler extends TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
private SubmissionFailingHandler() {
@@ -619,8 +578,8 @@ public class RestClusterClientTest extends TestLogger {
@Override
protected CompletableFuture<JobSubmitResponseBody> handleRequest(
- @Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request,
- @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+ @Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request,
+ @Nonnull DispatcherGateway gateway) throws RestHandlerException {
throw new RestHandlerException("expected", HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 8406f70..7f0f0ed 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -36,7 +36,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
@@ -115,6 +114,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.getRunningJobs;
import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilJobIsRunning;
import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilNoJobIsRunning;
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -455,7 +455,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
final AtomicReference<Throwable> error = new AtomicReference<>();
Thread consumeThread = new Thread(() -> {
try {
- ClientUtils.submitJobAndWaitForResult(client, jobGraph, KafkaConsumerTestBase.class.getClassLoader());
+ submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
} catch (Throwable t) {
if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
error.set(t);
@@ -1001,7 +1001,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
final Runnable jobRunner = () -> {
try {
- ClientUtils.submitJobAndWaitForResult(client, jobGraph, KafkaConsumerTestBase.class.getClassLoader());
+ submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
} catch (Throwable t) {
jobError.set(t);
}
@@ -1071,7 +1071,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
final Runnable jobRunner = () -> {
try {
- ClientUtils.submitJobAndWaitForResult(client, jobGraph, KafkaConsumerTestBase.class.getClassLoader());
+ submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
} catch (Throwable t) {
LOG.error("Job Runner failed with exception", t);
error.set(t);
@@ -1657,7 +1657,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
Thread jobThread = new Thread(() -> {
try {
- ClientUtils.submitJobAndWaitForResult(client, jobGraph, KafkaConsumerTestBase.class.getClassLoader());
+ submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
} catch (Throwable t) {
if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
LOG.warn("Got exception during execution", t);
@@ -2134,7 +2134,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
Thread runner = new Thread(() -> {
try {
- ClientUtils.submitJobAndWaitForResult(client, jobGraph, KafkaConsumerTestBase.class.getClassLoader());
+ submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
tryExecute(readEnv, "sequence validation");
} catch (Throwable t) {
if (!ExceptionUtils.findThrowable(t, SuccessException.class).isPresent()) {
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java
index f7e3a87..96414ba 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java
@@ -19,7 +19,6 @@
package org.apache.flink.state.api;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
@@ -27,7 +26,6 @@ import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -172,7 +170,7 @@ public abstract class SavepointReaderITTestBase extends AbstractTestBase {
String dirPath = getTempDirPath(new AbstractID().toHexString());
try {
- JobSubmissionResult result = ClientUtils.submitJob(client, jobGraph);
+ JobID jobID = client.submitJob(jobGraph).get();
boolean finished = false;
while (deadline.hasTimeLeft()) {
@@ -193,7 +191,7 @@ public abstract class SavepointReaderITTestBase extends AbstractTestBase {
Assert.fail("Failed to initialize state within deadline");
}
- CompletableFuture<String> path = client.triggerSavepoint(result.getJobID(), dirPath);
+ CompletableFuture<String> path = client.triggerSavepoint(jobID, dirPath);
return path.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
} finally {
client.cancel(jobId).get();
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
index d87a33d..5dd3fcc 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
@@ -28,9 +28,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -51,6 +49,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.SerializedThrowable;
import org.junit.Assert;
import org.junit.Test;
@@ -63,6 +62,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -147,7 +147,7 @@ public class SavepointWriterITCase extends AbstractTestBase {
bEnv.execute("Bootstrap");
}
- private void validateBootstrap(String savepointPath) throws ProgramInvocationException {
+ private void validateBootstrap(String savepointPath) throws Exception {
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
sEnv.setStateBackend(backend);
@@ -170,7 +170,12 @@ public class SavepointWriterITCase extends AbstractTestBase {
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false));
ClusterClient<?> client = miniClusterResource.getClusterClient();
- ClientUtils.submitJobAndWaitForResult(client, jobGraph, SavepointWriterITCase.class.getClassLoader());
+ Optional<SerializedThrowable> serializedThrowable = client
+ .submitJob(jobGraph)
+ .thenCompose(client::requestJobResult)
+ .get()
+ .getSerializedThrowable();
+ Assert.assertFalse(serializedThrowable.isPresent());
Assert.assertEquals("Unexpected output", 3, CollectSink.accountList.size());
}
@@ -193,7 +198,7 @@ public class SavepointWriterITCase extends AbstractTestBase {
bEnv.execute("Modifying");
}
- private void validateModification(String savepointPath) throws ProgramInvocationException {
+ private void validateModification(String savepointPath) throws Exception {
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
sEnv.setStateBackend(backend);
@@ -216,7 +221,12 @@ public class SavepointWriterITCase extends AbstractTestBase {
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false));
ClusterClient<?> client = miniClusterResource.getClusterClient();
- ClientUtils.submitJobAndWaitForResult(client, jobGraph, SavepointWriterITCase.class.getClassLoader());
+ Optional<SerializedThrowable> serializedThrowable = client
+ .submitJob(jobGraph)
+ .thenCompose(client::requestJobResult)
+ .get()
+ .getSerializedThrowable();
+ Assert.assertFalse(serializedThrowable.isPresent());
Assert.assertEquals("Unexpected output", 3, CollectSink.accountList.size());
}
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java
index 0d25682..8e75058 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java
@@ -20,10 +20,8 @@ package org.apache.flink.state.api.utils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -56,11 +54,11 @@ public abstract class SavepointTestBase extends AbstractTestBase {
ClusterClient<?> client = miniClusterResource.getClusterClient();
try {
- JobSubmissionResult result = ClientUtils.submitJob(client, jobGraph);
+ JobID jobID = client.submitJob(jobGraph).get();
return CompletableFuture
.runAsync(waitingSource::awaitSource)
- .thenCompose(ignore -> triggerSavepoint(client, result.getJobID()))
+ .thenCompose(ignore -> triggerSavepoint(client, jobID))
.get(5, TimeUnit.MINUTES);
} catch (Exception e) {
throw new RuntimeException("Failed to take savepoint", e);
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index dacfd5a..cbd0a7a 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -108,7 +107,7 @@ public class JMXJobManagerMetricTest extends TestLogger {
null));
ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
- ClientUtils.submitJob(client, jobGraph);
+ client.submitJob(jobGraph).get();
FutureUtils.retrySuccessfulWithDelay(
() -> client.getJobStatus(jobGraph.getJobID()),
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index d9e3c91..123d3d8 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -45,9 +45,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.queryablestate.client.VoidNamespace;
@@ -57,6 +55,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -187,7 +186,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
final JobID jobId = autoCancellableJob.getJobId();
final JobGraph jobGraph = autoCancellableJob.getJobGraph();
- ClientUtils.submitJob(clusterClient, jobGraph);
+ clusterClient.submitJob(jobGraph).get();
final AtomicLongArray counts = new AtomicLongArray(numKeys);
@@ -296,16 +295,15 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
// Submit the job graph
final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- boolean caughtException = false;
- try {
- ClientUtils.submitJobAndWaitForResult(clusterClient, jobGraph, AbstractQueryableStateTestBase.class.getClassLoader());
- } catch (ProgramInvocationException e) {
- String failureCause = ExceptionUtils.stringifyException(e);
- assertThat(failureCause, containsString("KvState with name '" + queryName + "' has already been registered by another operator"));
- caughtException = true;
- }
-
- assertTrue(caughtException);
+ clusterClient.submitJob(jobGraph)
+ .thenCompose(clusterClient::requestJobResult)
+ .thenApply(JobResult::getSerializedThrowable)
+ .thenAccept(serializedThrowable -> {
+ assertTrue(serializedThrowable.isPresent());
+ final Throwable t = serializedThrowable.get().deserializeError(getClass().getClassLoader());
+ final String failureCause = ExceptionUtils.stringifyException(t);
+ assertThat(failureCause, containsString("KvState with name '" + queryName + "' has already been registered by another operator"));
+ }).get();
}
/**
@@ -346,7 +344,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
final JobID jobId = autoCancellableJob.getJobId();
final JobGraph jobGraph = autoCancellableJob.getJobGraph();
- ClientUtils.submitJob(clusterClient, jobGraph);
+ clusterClient.submitJob(jobGraph).get();
executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements);
}
}
@@ -380,7 +378,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(deadline, clusterClient, env)) {
- ClientUtils.submitJob(clusterClient, closableJobGraph.getJobGraph());
+ clusterClient.submitJob(closableJobGraph.getJobGraph()).get();
CompletableFuture<JobStatus> jobStatusFuture =
clusterClient.getJobStatus(closableJobGraph.getJobId());
@@ -481,7 +479,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
BasicTypeInfo.INT_TYPE_INFO,
valueState);
- ClientUtils.submitJob(clusterClient, jobGraph);
+ clusterClient.submitJob(jobGraph).get();
executeValueQuery(deadline, client, jobId, "hakuna", valueState, expected);
}
}
@@ -528,7 +526,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
final JobID jobId = autoCancellableJob.getJobId();
final JobGraph jobGraph = autoCancellableJob.getJobGraph();
- ClientUtils.submitJob(clusterClient, jobGraph);
+ clusterClient.submitJob(jobGraph).get();
// Now query
int key = 0;
@@ -596,7 +594,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
final JobID jobId = autoCancellableJob.getJobId();
final JobGraph jobGraph = autoCancellableJob.getJobGraph();
- ClientUtils.submitJob(clusterClient, jobGraph);
+ clusterClient.submitJob(jobGraph).get();
executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements);
}
}
@@ -640,7 +638,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
final JobID jobId = autoCancellableJob.getJobId();
final JobGraph jobGraph = autoCancellableJob.getJobGraph();
- ClientUtils.submitJob(clusterClient, jobGraph);
+ clusterClient.submitJob(jobGraph).get();
final String expected = Integer.toString(numElements * (numElements + 1) / 2);
@@ -712,7 +710,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
final JobID jobId = autoCancellableJob.getJobId();
final JobGraph jobGraph = autoCancellableJob.getJobGraph();
- ClientUtils.submitJob(clusterClient, jobGraph);
+ clusterClient.submitJob(jobGraph).get();
final long expected = numElements * (numElements + 1L) / 2L;
@@ -804,7 +802,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
final JobID jobId = autoCancellableJob.getJobId();
final JobGraph jobGraph = autoCancellableJob.getJobGraph();
- ClientUtils.submitJob(clusterClient, jobGraph);
+ clusterClient.submitJob(jobGraph).get();
final long expected = numElements * (numElements + 1L) / 2L;
@@ -894,7 +892,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
final JobID jobId = autoCancellableJob.getJobId();
final JobGraph jobGraph = autoCancellableJob.getJobGraph();
- ClientUtils.submitJob(clusterClient, jobGraph);
+ clusterClient.submitJob(jobGraph).get();
final Map<Integer, Set<Long>> results = new HashMap<>();
@@ -979,7 +977,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
final JobID jobId = autoCancellableJob.getJobId();
final JobGraph jobGraph = autoCancellableJob.getJobGraph();
- ClientUtils.submitJob(clusterClient, jobGraph);
+ clusterClient.submitJob(jobGraph).get();
for (int key = 0; key < maxParallelism; key++) {
boolean success = false;
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index be078a0..5522375 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -260,7 +259,7 @@ public class WebFrontendITCase extends TestLogger {
final JobID jid = jobGraph.getJobID();
ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
- ClientUtils.submitJob(clusterClient, jobGraph);
+ clusterClient.submitJob(jobGraph).get();
// wait for job to show up
while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
@@ -318,7 +317,7 @@ public class WebFrontendITCase extends TestLogger {
final JobID jid = jobGraph.getJobID();
ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
- ClientUtils.submitJob(clusterClient, jobGraph);
+ clusterClient.submitJob(jobGraph).get();
// wait for job to show up
while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index 42c889c..e8d223e 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointException;
@@ -289,7 +288,7 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
0),
null));
- ClientUtils.submitJob(clusterClient, jobGraph);
+ clusterClient.submitJob(jobGraph).get();
assertTrue(invokeLatch.await(60, TimeUnit.SECONDS));
waitForJob();
}
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
index 8605623..b861956 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -115,7 +114,7 @@ public class JobMasterTriggerSavepointITCase extends AbstractTestBase {
0),
null));
- ClientUtils.submitJob(clusterClient, jobGraph);
+ clusterClient.submitJob(jobGraph).get();
assertTrue(invokeLatch.await(60, TimeUnit.SECONDS));
waitForJob();
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 4e94e43..bcf2702 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
@@ -60,6 +59,8 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
+
/**
* Tests the availability of accumulator results during runtime.
*/
@@ -145,7 +146,7 @@ public class AccumulatorLiveITCase extends TestLogger {
final CheckedThread submissionThread = new CheckedThread() {
@Override
public void go() throws Exception {
- ClientUtils.submitJobAndWaitForResult(client, jobGraph, AccumulatorLiveITCase.class.getClassLoader());
+ submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
}
};
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 03bc6eb..2551b9b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -18,10 +18,9 @@
package org.apache.flink.test.cancelling;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
@@ -97,14 +96,14 @@ public abstract class CancelingTestBase extends TestLogger {
final long rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration).toMilliseconds();
ClusterClient<?> client = CLUSTER.getClusterClient();
- JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(client, jobGraph);
+ JobID jobID = client.submitJob(jobGraph).get();
Deadline submissionDeadLine = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
- JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
+ JobStatus jobStatus = client.getJobStatus(jobID).get(rpcTimeout, TimeUnit.MILLISECONDS);
while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) {
Thread.sleep(50);
- jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
+ jobStatus = client.getJobStatus(jobID).get(rpcTimeout, TimeUnit.MILLISECONDS);
}
if (jobStatus != JobStatus.RUNNING) {
Assert.fail("Job not in state RUNNING.");
@@ -112,17 +111,17 @@ public abstract class CancelingTestBase extends TestLogger {
Thread.sleep(msecsTillCanceling);
- client.cancel(jobSubmissionResult.getJobID()).get();
+ client.cancel(jobID).get();
Deadline cancelDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();
- JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
+ JobStatus jobStatusAfterCancel = client.getJobStatus(jobID).get(rpcTimeout, TimeUnit.MILLISECONDS);
while (jobStatusAfterCancel != JobStatus.CANCELED && cancelDeadline.hasTimeLeft()) {
Thread.sleep(50);
- jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
+ jobStatusAfterCancel = client.getJobStatus(jobID).get(rpcTimeout, TimeUnit.MILLISECONDS);
}
if (jobStatusAfterCancel != JobStatus.CANCELED) {
- Assert.fail("Failed to cancel job with ID " + jobSubmissionResult.getJobID() + '.');
+ Assert.fail("Failed to cancel job with ID " + jobID + '.');
}
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
index e7e6e5c..9a6428f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
@@ -174,7 +173,7 @@ public class NotifyCheckpointAbortedITCase extends TestLogger {
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
JobID jobID = jobGraph.getJobID();
- ClientUtils.submitJob(clusterClient, jobGraph);
+ clusterClient.submitJob(jobGraph).get();
TestingCompletedCheckpointStore.addCheckpointLatch.await();
TestingCompletedCheckpointStore.abortCheckpointLatch.trigger();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
index d180675..3459e54 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -77,6 +76,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
import static org.junit.Assert.assertEquals;
/**
@@ -143,7 +143,7 @@ public class RegionFailoverITCase extends TestLogger {
try {
JobGraph jobGraph = createJobGraph();
ClusterClient<?> client = cluster.getClusterClient();
- ClientUtils.submitJobAndWaitForResult(client, jobGraph, RegionFailoverITCase.class.getClassLoader());
+ submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
verifyAfterJobExecuted();
} catch (Exception e) {
e.printStackTrace();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index b011950..be58fd7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
@@ -79,6 +78,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -188,7 +188,7 @@ public class RescalingITCase extends TestLogger {
final JobID jobID = jobGraph.getJobID();
- ClientUtils.submitJob(client, jobGraph);
+ client.submitJob(jobGraph).get();
// wait til the sources have emitted numberElements for each key and completed a checkpoint
assertTrue(SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
@@ -226,7 +226,7 @@ public class RescalingITCase extends TestLogger {
scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
- ClientUtils.submitJobAndWaitForResult(client, scaledJobGraph, RescalingITCase.class.getClassLoader());
+ submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
@@ -267,7 +267,7 @@ public class RescalingITCase extends TestLogger {
final JobID jobID = jobGraph.getJobID();
- ClientUtils.submitJob(client, jobGraph);
+ client.submitJob(jobGraph).get();
// wait until the operator is started
StateSourceBase.workStartedLatch.await();
@@ -287,7 +287,7 @@ public class RescalingITCase extends TestLogger {
scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
- ClientUtils.submitJobAndWaitForResult(client, scaledJobGraph, RescalingITCase.class.getClassLoader());
+ submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
} catch (JobExecutionException exception) {
if (exception.getCause() instanceof IllegalStateException) {
// we expect a IllegalStateException wrapped
@@ -332,7 +332,7 @@ public class RescalingITCase extends TestLogger {
final JobID jobID = jobGraph.getJobID();
- ClientUtils.submitJob(client, jobGraph);
+ client.submitJob(jobGraph).get();
// wait til the sources have emitted numberElements for each key and completed a checkpoint
assertTrue(SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
@@ -375,7 +375,7 @@ public class RescalingITCase extends TestLogger {
scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
- ClientUtils.submitJobAndWaitForResult(client, scaledJobGraph, RescalingITCase.class.getClassLoader());
+ submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
@@ -454,7 +454,7 @@ public class RescalingITCase extends TestLogger {
final JobID jobID = jobGraph.getJobID();
- ClientUtils.submitJob(client, jobGraph);
+ client.submitJob(jobGraph).get();
// wait until the operator is started
StateSourceBase.workStartedLatch.await();
@@ -479,7 +479,7 @@ public class RescalingITCase extends TestLogger {
scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
- ClientUtils.submitJobAndWaitForResult(client, scaledJobGraph, RescalingITCase.class.getClassLoader());
+ submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
int sumExp = 0;
int sumAct = 0;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index e39fce7..d9f4bf30 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
@@ -295,7 +294,7 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint);
NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM);
- ClientUtils.submitJob(client, initialJobGraph);
+ client.submitJob(initialJobGraph).get();
// wait until all sources have been started
NotifyingInfiniteTupleSource.countDownLatch.await();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index abdc188..d70a409 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
@@ -30,7 +29,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
@@ -91,6 +89,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@@ -209,7 +208,7 @@ public class SavepointITCase extends TestLogger {
ClusterClient<?> client = cluster.getClusterClient();
try {
- ClientUtils.submitJob(client, jobGraph);
+ client.submitJob(jobGraph).get();
StatefulCounter.getProgressLatch().await();
@@ -253,7 +252,7 @@ public class SavepointITCase extends TestLogger {
ClusterClient<?> client = cluster.getClusterClient();
try {
- ClientUtils.submitJob(client, jobGraph);
+ client.submitJob(jobGraph).get();
// Await state is restored
StatefulCounter.getRestoreLatch().await();
@@ -337,7 +336,7 @@ public class SavepointITCase extends TestLogger {
final JobGraph graph = new JobGraph(vertex);
try {
- ClientUtils.submitJob(client, graph);
+ client.submitJob(graph).get();
client.triggerSavepoint(graph.getJobID(), null).get();
@@ -386,7 +385,7 @@ public class SavepointITCase extends TestLogger {
LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
try {
- ClientUtils.submitJobAndWaitForResult(client, jobGraph, SavepointITCase.class.getClassLoader());
+ submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
} catch (Exception e) {
Optional<JobExecutionException> expectedJobExecutionException = ExceptionUtils.findThrowable(e, JobExecutionException.class);
Optional<FileNotFoundException> expectedFileNotFoundException = ExceptionUtils.findThrowable(e, FileNotFoundException.class);
@@ -452,8 +451,7 @@ public class SavepointITCase extends TestLogger {
JobGraph originalJobGraph = env.getStreamGraph().getJobGraph();
- JobSubmissionResult submissionResult = ClientUtils.submitJob(client, originalJobGraph);
- JobID jobID = submissionResult.getJobID();
+ JobID jobID = client.submitJob(originalJobGraph).get();
// wait for the Tasks to be ready
assertTrue(StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
@@ -502,7 +500,7 @@ public class SavepointITCase extends TestLogger {
"savepoint path " + savepointPath + " in detached mode.");
// Submit the job
- ClientUtils.submitJob(client, modifiedJobGraph);
+ client.submitJob(modifiedJobGraph).get();
// Await state is restored
assertTrue(StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
@@ -696,7 +694,7 @@ public class SavepointITCase extends TestLogger {
String savepointPath = null;
try {
- ClientUtils.submitJob(client, jobGraph);
+ client.submitJob(jobGraph).get();
for (OneShotLatch latch : iterTestSnapshotWait) {
latch.await();
}
@@ -710,7 +708,7 @@ public class SavepointITCase extends TestLogger {
jobGraph = streamGraph.getJobGraph();
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
- ClientUtils.submitJob(client, jobGraph);
+ client.submitJob(jobGraph).get();
for (OneShotLatch latch : iterTestRestoreWait) {
latch.await();
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index f6c0efb..4f500fd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -19,8 +19,6 @@
package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -28,6 +26,7 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
@@ -41,7 +40,7 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
-import static org.junit.Assert.fail;
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
/**
* Test base for fault tolerant streaming programs.
@@ -126,21 +125,9 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
try {
- ClientUtils.submitJobAndWaitForResult(cluster.getClusterClient(), jobGraph, getClass().getClassLoader()).getJobExecutionResult();
- } catch (ProgramInvocationException root) {
- Throwable cause = root.getCause();
-
- // search for nested SuccessExceptions
- int depth = 0;
- while (!(cause instanceof SuccessException)) {
- if (cause == null || depth++ == 20) {
- root.printStackTrace();
- fail("Test failed: " + root.getMessage());
- }
- else {
- cause = cause.getCause();
- }
- }
+ submitJobAndWaitForResult(cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
+ } catch (Exception e) {
+ Assert.assertTrue(ExceptionUtils.findThrowable(e, SuccessException.class).isPresent());
}
postSubmit();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index 81df28f..4bd4754 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -209,7 +208,7 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
- ClientUtils.submitJob(clusterClient, jobGraph);
+ clusterClient.submitJob(jobGraph).get();
// wait until we did some checkpoints
waitForCheckpointLatch.await();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 1e7f619..7787a18 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -20,10 +20,8 @@ package org.apache.flink.test.checkpointing.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigConstants;
@@ -130,14 +128,14 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
// Submit the job
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(client, jobGraph);
+ JobID jobID = client.submitJob(jobGraph).get();
- LOG.info("Submitted job {} and waiting...", jobSubmissionResult.getJobID());
+ LOG.info("Submitted job {} and waiting...", jobID);
boolean done = false;
while (deadLine.hasTimeLeft()) {
Thread.sleep(100);
- Map<String, Object> accumulators = client.getAccumulators(jobSubmissionResult.getJobID()).get();
+ Map<String, Object> accumulators = client.getAccumulators(jobID).get();
boolean allDone = true;
for (Tuple2<String, Integer> acc : expectedAccumulators) {
@@ -165,7 +163,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
LOG.info("Triggering savepoint.");
- CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobSubmissionResult.getJobID(), null);
+ CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobID, null);
String jobmanagerSavepointPath = savepointPathFuture.get(deadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
@@ -193,17 +191,16 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
- JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(client, jobGraph);
+ JobID jobID = client.submitJob(jobGraph).get();
boolean done = false;
while (deadLine.hasTimeLeft()) {
// try and get a job result, this will fail if the job already failed. Use this
// to get out of this loop
- JobID jobId = jobSubmissionResult.getJobID();
try {
- CompletableFuture<JobStatus> jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID());
+ CompletableFuture<JobStatus> jobStatusFuture = client.getJobStatus(jobID);
JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS);
@@ -213,7 +210,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
}
Thread.sleep(100);
- Map<String, Object> accumulators = client.getAccumulators(jobId).get();
+ Map<String, Object> accumulators = client.getAccumulators(jobID).get();
boolean allDone = true;
for (Tuple2<String, Integer> acc : expectedAccumulators) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index 063a9cd..e0611a9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -20,7 +20,6 @@
package org.apache.flink.test.example.client;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
@@ -95,7 +94,7 @@ public class JobRetrievalITCase extends TestLogger {
// has been attached in resumingThread
lock.acquire();
- ClientUtils.submitJob(client, jobGraph);
+ client.submitJob(jobGraph).get();
final CheckedThread resumingThread = new CheckedThread("Flink-Job-Retriever") {
@Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
index f19080f..5723978 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -19,7 +19,6 @@
package org.apache.flink.test.example.failing;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
@@ -45,6 +44,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.function.Predicate;
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
import static org.junit.Assert.fail;
/**
@@ -127,14 +127,14 @@ public class JobSubmissionFailsITCase extends TestLogger {
runJobSubmissionTest(jobGraph, e -> ExceptionUtils.findThrowable(e, IOException.class).isPresent());
}
- private void runJobSubmissionTest(JobGraph jobGraph, Predicate<Exception> failurePredicate) throws org.apache.flink.client.program.ProgramInvocationException {
+ private void runJobSubmissionTest(JobGraph jobGraph, Predicate<Exception> failurePredicate) throws Exception {
ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
try {
if (detached) {
- ClientUtils.submitJob(client, jobGraph);
+ client.submitJob(jobGraph).get();
} else {
- ClientUtils.submitJobAndWaitForResult(client, jobGraph, JobSubmissionFailsITCase.class.getClassLoader());
+ submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
}
fail("Job submission should have thrown an exception.");
} catch (Exception e) {
@@ -143,7 +143,7 @@ public class JobSubmissionFailsITCase extends TestLogger {
}
}
- ClientUtils.submitJobAndWaitForResult(client, getWorkingJobGraph(), JobSubmissionFailsITCase.class.getClassLoader());
+ submitJobAndWaitForResult(client, getWorkingJobGraph(), getClass().getClassLoader());
}
@Nonnull
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 6a148f0..479ae57 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -18,8 +18,6 @@
package org.apache.flink.test.runtime;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
@@ -34,10 +32,12 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -266,22 +266,22 @@ public class NetworkStackThroughputITCase extends TestLogger {
final boolean isSlowSender,
final boolean isSlowReceiver,
final int parallelism) throws Exception {
- ClusterClient<?> client = cluster.getClusterClient();
-
- JobExecutionResult jer = ClientUtils.submitJobAndWaitForResult(
- client,
- createJobGraph(
- dataVolumeGb,
- useForwarder,
- isSlowSender,
- isSlowReceiver,
- parallelism),
- getClass().getClassLoader());
-
- long dataVolumeMbit = dataVolumeGb * 8192;
- long runtimeSecs = jer.getNetRuntime(TimeUnit.SECONDS);
-
- int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);
+ final ClusterClient<?> client = cluster.getClusterClient();
+ final JobGraph jobGraph = createJobGraph(
+ dataVolumeGb,
+ useForwarder,
+ isSlowSender,
+ isSlowReceiver,
+ parallelism);
+ final JobResult jobResult = client.submitJob(jobGraph)
+ .thenCompose(client::requestJobResult)
+ .get();
+
+ Assert.assertFalse(jobResult.getSerializedThrowable().isPresent());
+
+ final long dataVolumeMbit = dataVolumeGb * 8192;
+ final long runtimeSecs = TimeUnit.SECONDS.convert(jobResult.getNetRuntime(), TimeUnit.MILLISECONDS);
+ final int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);
LOG.info(String.format("Test finished with throughput of %d MBit/s (runtime [secs]: %d, " +
"data volume [gb/mbits]: %d/%d)", mbitPerSecond, runtimeSecs, dataVolumeGb, dataVolumeMbit));
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index a9321fa..43bc0dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -120,7 +119,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
assertNotNull(jobToMigrate.getJobID());
- ClientUtils.submitJob(clusterClient, jobToMigrate);
+ clusterClient.submitJob(jobToMigrate).get();
CompletableFuture<JobStatus> jobRunningFuture = FutureUtils.retrySuccessfulWithDelay(
() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
@@ -172,7 +171,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
assertNotNull("Job doesn't have a JobID.", jobToRestore.getJobID());
- ClientUtils.submitJob(clusterClient, jobToRestore);
+ clusterClient.submitJob(jobToRestore).get();
CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccessfulWithDelay(
() -> clusterClient.getJobStatus(jobToRestore.getJobID()),
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
index b3842c0..443f968 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.streaming.runtime;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -39,6 +38,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
import static org.junit.Assert.assertEquals;
/**
@@ -92,7 +92,7 @@ public class BigUserProgramJobSubmitITCase extends TestLogger {
StandaloneClusterId.getInstance());
try {
- ClientUtils.submitJobAndWaitForResult(restClusterClient, jobGraph, BigUserProgramJobSubmitITCase.class.getClassLoader());
+ submitJobAndWaitForResult(restClusterClient, jobGraph, getClass().getClassLoader());
List<String> expected = Arrays.asList("x 1 0", "x 3 0", "x 5 0");
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
index 1606783..95550e0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
@@ -19,8 +19,10 @@
package org.apache.flink.test.util;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import static org.junit.Assert.fail;
@@ -52,4 +54,11 @@ public class TestUtils {
return null;
}
+
+ public static void submitJobAndWaitForResult(ClusterClient<?> client, JobGraph jobGraph, ClassLoader classLoader) throws Exception {
+ client.submitJob(jobGraph)
+ .thenCompose(client::requestJobResult)
+ .get()
+ .toJobExecutionResult(classLoader);
+ }
}