You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/04/03 14:36:20 UTC

[1/3] flink git commit: [FLINK-9094] [tests] Harden AccumulatorLiveITCase

Repository: flink
Updated Branches:
  refs/heads/release-1.5 dffbf4181 -> 515069e14


[FLINK-9094] [tests] Harden AccumulatorLiveITCase

The problem was that we did not wait for the proper shutdown of the job before
resetting the latches. This could lead to a deadlock.

This closes #5771.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6982c50
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6982c50
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6982c50

Branch: refs/heads/release-1.5
Commit: b6982c502196b5059b1bf576f620f45ce0e3aa72
Parents: 1209620
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Mar 27 09:22:08 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Apr 3 16:35:59 2018 +0200

----------------------------------------------------------------------
 .../test/accumulators/AccumulatorLiveITCase.java    | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b6982c50/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 302fe3e..379e8ab 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -140,10 +141,16 @@ public class AccumulatorLiveITCase extends TestLogger {
 	private static void submitJobAndVerifyResults(JobGraph jobGraph) throws Exception {
 		Deadline deadline = Deadline.now().plus(Duration.ofSeconds(30));
 
-		ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
+		final ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
 
-		client.setDetached(true);
-		client.submitJob(jobGraph, AccumulatorLiveITCase.class.getClassLoader());
+		final CheckedThread submissionThread = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				client.submitJob(jobGraph, AccumulatorLiveITCase.class.getClassLoader());
+			}
+		};
+
+		submissionThread.start();
 
 		try {
 			NotifyingMapper.notifyLatch.await();
@@ -167,6 +174,9 @@ public class AccumulatorLiveITCase extends TestLogger {
 			NotifyingMapper.shutdownLatch.trigger();
 		} finally {
 			NotifyingMapper.shutdownLatch.trigger();
+
+			// wait for the job to have terminated
+			submissionThread.sync();
 		}
 	}
 


[3/3] flink git commit: [FLINK-6567] [tests] Harden ExecutionGraphMetricsTest

Posted by tr...@apache.org.
[FLINK-6567] [tests] Harden ExecutionGraphMetricsTest

The problem was that in some cases the currentRestartingTime would not increase
because the iterations in the loop were too fast.

This closes #5782.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/515069e1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/515069e1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/515069e1

Branch: refs/heads/release-1.5
Commit: 515069e14f770ebfb86df27dc27b858ded51c6d5
Parents: b6982c5
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Mar 28 15:35:11 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Apr 3 16:36:00 2018 +0200

----------------------------------------------------------------------
 .../ExecutionGraphMetricsTest.java              | 22 +++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/515069e1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 63b3238..ec0d2e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -120,6 +120,9 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 			assertEquals(JobStatus.RUNNING, executionGraph.getState());
 			assertEquals(0L, restartingTime.getValue().longValue());
 
+			// add some pause such that RUNNING and RESTARTING timestamps are not the same
+			Thread.sleep(1L);
+
 			// fail the job so that it goes into state restarting
 			for (ExecutionAttemptID executionID : executionIDs) {
 				executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception()));
@@ -129,13 +132,13 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
 			long firstRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
 
-			// wait some time so that the restarting time gauge shows a value different from 0
-			Thread.sleep(50);
-
 			long previousRestartingTime = restartingTime.getValue();
 
 			// check that the restarting time is monotonically increasing
-			for (int i = 0; i < 10; i++) {
+			for (int i = 0; i < 2; i++) {
+				// add some pause to let the currentRestartingTime increase
+				Thread.sleep(1L);
+
 				long currentRestartingTime = restartingTime.getValue();
 
 				assertTrue(currentRestartingTime >= previousRestartingTime);
@@ -165,13 +168,16 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 			previousRestartingTime = restartingTime.getValue();
 
 			// check that the restarting time does not increase after we've reached the running state
-			for (int i = 0; i < 10; i++) {
+			for (int i = 0; i < 2; i++) {
 				long currentRestartingTime = restartingTime.getValue();
 
 				assertTrue(currentRestartingTime == previousRestartingTime);
 				previousRestartingTime = currentRestartingTime;
 			}
 
+			// add some pause such that the RUNNING and RESTARTING timestamps are not the same
+			Thread.sleep(1L);
+
 			// fail job again
 			for (ExecutionAttemptID executionID : executionIDs) {
 				executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception()));
@@ -183,12 +189,12 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
 			assertTrue(firstRestartingTimestamp != secondRestartingTimestamp);
 
-			Thread.sleep(50);
-
 			previousRestartingTime = restartingTime.getValue();
 
 			// check that the restarting time is increasing again
-			for (int i = 0; i < 10; i++) {
+			for (int i = 0; i < 2; i++) {
+				// add some pause to let the currentRestartingTime increase
+				Thread.sleep(1L);
 				long currentRestartingTime = restartingTime.getValue();
 
 				assertTrue(currentRestartingTime >= previousRestartingTime);


[2/3] flink git commit: [hotfix] Introduce NewClusterClient interface

Posted by tr...@apache.org.
[hotfix] Introduce NewClusterClient interface

The NewClusterClient interface contains asynchronous submitJob and requestJobResult
methods which will replace the ClusterClient#submitJob method.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12096207
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12096207
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12096207

Branch: refs/heads/release-1.5
Commit: 12096207614179dce0af0d44de6e1b0e27c60e02
Parents: dffbf41
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Mar 27 09:20:33 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Apr 3 16:35:59 2018 +0200

----------------------------------------------------------------------
 .../flink/client/program/MiniClusterClient.java | 45 ++++++++++++---
 .../flink/client/program/NewClusterClient.java  | 50 ++++++++++++++++
 .../client/program/rest/RestClusterClient.java  | 27 +++++----
 .../flink/runtime/minicluster/MiniCluster.java  | 60 ++++++++++++--------
 4 files changed, 138 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/12096207/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index a135359..faf96ec 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -21,23 +21,25 @@ package org.apache.flink.client.program;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.net.URL;
 import java.util.Collection;
 import java.util.Collections;
@@ -45,11 +47,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 /**
  * Client to interact with a {@link MiniCluster}.
  */
-public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClusterId> {
+public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClusterId> implements NewClusterClient {
 
 	private final MiniCluster miniCluster;
 
@@ -66,28 +69,54 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust
 
 	@Override
 	public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+		final CompletableFuture<JobSubmissionResult> jobSubmissionResultFuture = submitJob(jobGraph);
+
 		if (isDetached()) {
 			try {
-				miniCluster.runDetached(jobGraph);
-			} catch (JobExecutionException | InterruptedException e) {
+				return jobSubmissionResultFuture.get();
+			} catch (InterruptedException | ExecutionException e) {
+				ExceptionUtils.checkInterrupted(e);
+
 				throw new ProgramInvocationException(
 					String.format("Could not run job %s in detached mode.", jobGraph.getJobID()),
 					e);
 			}
-
-			return new JobSubmissionResult(jobGraph.getJobID());
 		} else {
+			final CompletableFuture<JobResult> jobResultFuture = jobSubmissionResultFuture.thenCompose(
+				(JobSubmissionResult ignored) -> requestJobResult(jobGraph.getJobID()));
+
+			final JobResult jobResult;
 			try {
-				return miniCluster.executeJobBlocking(jobGraph);
-			} catch (JobExecutionException | InterruptedException e) {
+				jobResult = jobResultFuture.get();
+			} catch (InterruptedException | ExecutionException e) {
+				ExceptionUtils.checkInterrupted(e);
+
 				throw new ProgramInvocationException(
 					String.format("Could not run job %s.", jobGraph.getJobID()),
 					e);
 			}
+
+			try {
+				return jobResult.toJobExecutionResult(classLoader);
+			} catch (JobResult.WrappedJobException e) {
+				throw new ProgramInvocationException(e.getCause());
+			} catch (IOException | ClassNotFoundException e) {
+				throw new ProgramInvocationException(e);
+			}
 		}
 	}
 
 	@Override
+	public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph) {
+		return miniCluster.submitJob(jobGraph);
+	}
+
+	@Override
+	public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
+		return miniCluster.requestJobResult(jobId);
+	}
+
+	@Override
 	public void cancel(JobID jobId) throws Exception {
 		miniCluster.cancelJob(jobId).get();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/12096207/flink-clients/src/main/java/org/apache/flink/client/program/NewClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/NewClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/NewClusterClient.java
new file mode 100644
index 0000000..513f7da
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/NewClusterClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface for the new cluster client.
+ */
+public interface NewClusterClient {
+
+	/**
+	 * Submit the given {@link JobGraph} to the cluster.
+	 *
+	 * @param jobGraph to submit
+	 * @return Future which is completed with the {@link JobSubmissionResult}
+	 */
+	CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph);
+
+	/**
+	 * Request the {@link JobResult} for the given {@link JobID}.
+	 *
+	 * @param jobId for which to request the {@link JobResult}
+	 * @return Future which is completed with the {@link JobResult}
+	 */
+	CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12096207/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index cf68374..4a4f993 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.NewClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
 import org.apache.flink.client.program.rest.retry.WaitStrategy;
@@ -101,6 +102,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
 
 import akka.actor.AddressFromURIString;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -126,7 +128,7 @@ import java.util.stream.Collectors;
 /**
  * A {@link ClusterClient} implementation that communicates via HTTP REST requests.
  */
-public class RestClusterClient<T> extends ClusterClient<T> {
+public class RestClusterClient<T> extends ClusterClient<T> implements NewClusterClient {
 
 	private final RestClusterClientConfiguration restClusterClientConfiguration;
 
@@ -235,16 +237,14 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 	public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
 		log.info("Submitting job {}.", jobGraph.getJobID());
 
-		final CompletableFuture<JobSubmitResponseBody> jobSubmissionFuture = submitJob(jobGraph);
+		final CompletableFuture<JobSubmissionResult> jobSubmissionFuture = submitJob(jobGraph);
 
 		if (isDetached()) {
 			try {
-				jobSubmissionFuture.get();
+				return jobSubmissionFuture.get();
 			} catch (Exception e) {
 				throw new ProgramInvocationException("Could not submit job " + jobGraph.getJobID() + '.', ExceptionUtils.stripExecutionException(e));
 			}
-
-			return new JobSubmissionResult(jobGraph.getJobID());
 		} else {
 			final CompletableFuture<JobResult> jobResultFuture = jobSubmissionFuture.thenCompose(
 				ignored -> requestJobResult(jobGraph.getJobID()));
@@ -286,7 +286,8 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 	 * @return Future which is completed with the {@link JobResult} once the job has completed or
 	 * with a failure if the {@link JobResult} could not be retrieved.
 	 */
-	public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
+	@Override
+	public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
 		return pollResourceAsync(
 			() -> {
 				final JobMessageParameters messageParameters = new JobMessageParameters();
@@ -305,7 +306,8 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 	 * @param jobGraph to submit
 	 * @return Future which is completed with the submission response
 	 */
-	public CompletableFuture<JobSubmitResponseBody> submitJob(JobGraph jobGraph) {
+	@Override
+	public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph) {
 		// we have to enable queued scheduling because slot will be allocated lazily
 		jobGraph.setAllowQueuedScheduling(true);
 
@@ -346,10 +348,13 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 				}
 			});
 
-		return submissionFuture.exceptionally(
-			(Throwable throwable) -> {
-				throw new CompletionException(new JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", throwable));
-			});
+		return submissionFuture
+			.thenApply(
+				(JobSubmitResponseBody jobSubmitResponseBody) -> new JobSubmissionResult(jobGraph.getJobID()))
+			.exceptionally(
+				(Throwable throwable) -> {
+					throw new CompletionException(new JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", throwable));
+				});
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12096207/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 59e5ff0..2e826eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.minicluster;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
@@ -573,18 +574,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	public void runDetached(JobGraph job) throws JobExecutionException, InterruptedException {
 		checkNotNull(job, "job is null");
 
-		final DispatcherGateway currentDispatcherGateway;
-		try {
-			currentDispatcherGateway = getDispatcherGateway();
-		} catch (LeaderRetrievalException e) {
-			throw new JobExecutionException(job.getJobID(), e);
-		}
-
-		// we have to allow queued scheduling in the new mode because we need to request slots
-		// from the ResourceManager
-		job.setAllowQueuedScheduling(true);
-
-		final CompletableFuture<Acknowledge> submissionFuture = currentDispatcherGateway.submitJob(job, rpcTimeout);
+		final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
 
 		try {
 			submissionFuture.get();
@@ -607,21 +597,10 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
 		checkNotNull(job, "job is null");
 
-		final DispatcherGateway currentDispatcherGateway;
-		try {
-			currentDispatcherGateway = getDispatcherGateway();
-		} catch (LeaderRetrievalException e) {
-			throw new JobExecutionException(job.getJobID(), e);
-		}
-
-		// we have to allow queued scheduling in the new mode because we need to request slots
-		// from the ResourceManager
-		job.setAllowQueuedScheduling(true);
-
-		final CompletableFuture<Acknowledge> submissionFuture = currentDispatcherGateway.submitJob(job, rpcTimeout);
+		final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
 
 		final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
-			(Acknowledge ack) -> currentDispatcherGateway.requestJobResult(job.getJobID(), RpcUtils.INF_TIMEOUT));
+			(JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));
 
 		final JobResult jobResult;
 
@@ -640,6 +619,37 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		}
 	}
 
+	public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
+		final DispatcherGateway dispatcherGateway;
+		try {
+			dispatcherGateway = getDispatcherGateway();
+		} catch (LeaderRetrievalException | InterruptedException e) {
+			ExceptionUtils.checkInterrupted(e);
+			return FutureUtils.completedExceptionally(e);
+		}
+
+		// we have to allow queued scheduling in Flip-6 mode because we need to request slots
+		// from the ResourceManager
+		jobGraph.setAllowQueuedScheduling(true);
+
+		final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = dispatcherGateway.submitJob(jobGraph, rpcTimeout);
+
+		return acknowledgeCompletableFuture.thenApply(
+			(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
+	}
+
+	public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
+		final DispatcherGateway dispatcherGateway;
+		try {
+			dispatcherGateway = getDispatcherGateway();
+		} catch (LeaderRetrievalException | InterruptedException e) {
+			ExceptionUtils.checkInterrupted(e);
+			return FutureUtils.completedExceptionally(e);
+		}
+
+		return dispatcherGateway.requestJobResult(jobId, RpcUtils.INF_TIMEOUT);
+	}
+
 	private DispatcherGateway getDispatcherGateway() throws LeaderRetrievalException, InterruptedException {
 		synchronized (lock) {
 			checkState(running, "MiniCluster is not yet running.");