You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2020/09/06 12:00:53 UTC
[flink] branch master updated: [FLINK-16866] Make jobsubmission non-blocking
This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 65ed039 [FLINK-16866] Make jobsubmission non-blocking
65ed039 is described below
commit 65ed03936a736190f7ffcb6ba0709a99ffcb98fa
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Wed Jul 29 10:42:32 2020 +0200
[FLINK-16866] Make jobsubmission non-blocking
This closes #13217
---
.../java/org/apache/flink/client/ClientUtils.java | 47 +++
.../application/executors/EmbeddedExecutor.java | 14 +-
.../executors/AbstractJobClusterExecutor.java | 2 +-
.../executors/AbstractSessionClusterExecutor.java | 11 +-
.../client/deployment/executors/LocalExecutor.java | 4 +-
.../client/program/PerJobMiniClusterFactory.java | 10 +-
.../org/apache/flink/client/ClientUtilsTest.java | 127 +++++++
.../apache/flink/client/program/ClientTest.java | 2 +-
.../program/PerJobMiniClusterFactoryTest.java | 18 +-
.../org/apache/flink/api/common/JobStatus.java | 7 +-
.../flink/core/execution/PipelineExecutor.java | 3 +-
.../flink/api/java/ExecutionEnvironment.java | 2 +-
.../java/ExecutorDiscoveryAndJobClientTest.java | 2 +-
.../web-dashboard/src/app/app.config.ts | 3 +-
.../src/app/pages/job/job.component.html | 20 +-
.../src/app/pages/job/job.component.ts | 42 ++-
.../web-dashboard/src/app/services/job.service.ts | 3 +-
.../customize/job-list/job-list.component.html | 2 +-
.../share/customize/job-list/job-list.component.ts | 10 +-
.../runtime/client/JobInitializationException.java | 32 +-
.../flink/runtime/dispatcher/Dispatcher.java | 368 +++++++++------------
.../flink/runtime/dispatcher/DispatcherJob.java | 252 ++++++++++++++
.../runtime/dispatcher/DispatcherJobResult.java | 66 ++++
.../UnavailableDispatcherOperationException.java | 29 +-
.../executiongraph/ArchivedExecutionGraph.java | 49 +++
.../runtime/dispatcher/DispatcherJobTest.java | 337 +++++++++++++++++++
.../dispatcher/DispatcherResourceCleanupTest.java | 19 +-
.../flink/runtime/dispatcher/DispatcherTest.java | 262 +++++++++++++--
.../dispatcher/TestingJobManagerRunnerFactory.java | 5 +-
.../runtime/jobmanager/BlobsCleanupITCase.java | 25 +-
.../runtime/jobmaster/TestingJobManagerRunner.java | 61 +++-
.../LeaderChangeClusterComponentsTest.java | 3 +
.../runtime/taskexecutor/TaskExecutorITCase.java | 3 +-
.../flink/runtime/testutils/CommonTestUtils.java | 14 +
.../runtime/webmonitor/TestingRestfulGateway.java | 2 +-
.../environment/StreamExecutionEnvironment.java | 2 +-
.../ExecutorDiscoveryAndJobClientTest.java | 2 +-
.../table/client/gateway/local/LocalExecutor.java | 4 +-
.../client/gateway/local/ProgramDeployer.java | 7 +-
.../table/api/internal/BatchTableEnvImpl.scala | 3 +-
.../flink/test/checkpointing/SavepointITCase.java | 3 +
.../test/example/client/LocalExecutorITCase.java | 4 +-
.../example/failing/JobSubmissionFailsITCase.java | 24 +-
.../environment/RemoteStreamEnvironmentTest.java | 2 +-
.../test/streaming/runtime/BackPressureITCase.java | 5 +
.../test/streaming/runtime/TimestampITCase.java | 3 +-
.../java/org/apache/flink/test/util/TestUtils.java | 9 +
.../flink/yarn/YARNHighAvailabilityITCase.java | 3 +-
48 files changed, 1533 insertions(+), 394 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index d7b069f..1991389 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -18,15 +18,23 @@
package org.apache.flink.client;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StreamContextEnvironment;
+import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
+import org.apache.flink.client.program.rest.retry.WaitStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
+import org.apache.flink.runtime.client.JobInitializationException;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +42,7 @@ import org.slf4j.LoggerFactory;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
+import java.util.Optional;
import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -111,4 +120,42 @@ public enum ClientUtils {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
+
+ /**
+ * This method blocks until the job status is not INITIALIZING anymore.
+ * @param jobStatusSupplier supplier returning the job status.
+ * @param jobResultSupplier supplier returning the job result. This will only be called if the job reaches the FAILED state.
+ * @throws JobInitializationException If the initialization failed
+ */
+ public static void waitUntilJobInitializationFinished(
+ SupplierWithException<JobStatus, Exception> jobStatusSupplier,
+ SupplierWithException<JobResult, Exception> jobResultSupplier,
+ ClassLoader userCodeClassloader)
+ throws JobInitializationException {
+ LOG.debug("Wait until job initialization is finished");
+ WaitStrategy waitStrategy = new ExponentialWaitStrategy(50, 2000);
+ try {
+ JobStatus status = jobStatusSupplier.get();
+ long attempt = 0;
+ while (status == JobStatus.INITIALIZING) {
+ Thread.sleep(waitStrategy.sleepTime(attempt++));
+ status = jobStatusSupplier.get();
+ }
+ if (status == JobStatus.FAILED) {
+ JobResult result = jobResultSupplier.get();
+ Optional<SerializedThrowable> throwable = result.getSerializedThrowable();
+ if (throwable.isPresent()) {
+ Throwable t = throwable.get().deserializeError(userCodeClassloader);
+ if (t instanceof JobInitializationException) {
+ throw t;
+ }
+ }
+ }
+ } catch (JobInitializationException initializationException) {
+ throw initializationException;
+ } catch (Throwable throwable) {
+ ExceptionUtils.checkInterrupted(throwable);
+ throw new RuntimeException("Error while waiting for job to be initialized", throwable);
+ }
+ }
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
index aa2db40..ccb24d4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.client.ClientUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.FunctionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,7 +83,7 @@ public class EmbeddedExecutor implements PipelineExecutor {
}
@Override
- public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws MalformedURLException {
+ public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration, ClassLoader userCodeClassloader) throws MalformedURLException {
checkNotNull(pipeline);
checkNotNull(configuration);
@@ -94,7 +95,7 @@ public class EmbeddedExecutor implements PipelineExecutor {
return getJobClientFuture(optJobId.get());
}
- return submitAndGetJobClientFuture(pipeline, configuration);
+ return submitAndGetJobClientFuture(pipeline, configuration, userCodeClassloader);
}
private CompletableFuture<JobClient> getJobClientFuture(final JobID jobId) {
@@ -102,7 +103,7 @@ public class EmbeddedExecutor implements PipelineExecutor {
return CompletableFuture.completedFuture(jobClientCreator.getJobClient(jobId));
}
- private CompletableFuture<JobClient> submitAndGetJobClientFuture(final Pipeline pipeline, final Configuration configuration) throws MalformedURLException {
+ private CompletableFuture<JobClient> submitAndGetJobClientFuture(final Pipeline pipeline, final Configuration configuration, final ClassLoader userCodeClassloader) throws MalformedURLException {
final Time timeout = Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
@@ -122,6 +123,13 @@ public class EmbeddedExecutor implements PipelineExecutor {
timeout);
return jobSubmissionFuture
+ .thenApplyAsync(FunctionUtils.uncheckedFunction(jobId -> {
+ org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(
+ () -> dispatcherGateway.requestJobStatus(jobId, timeout).get(),
+ () -> dispatcherGateway.requestJobResult(jobId, timeout).get(),
+ userCodeClassloader);
+ return jobId;
+ }))
.thenApplyAsync(jobID -> jobClientCreator.getJobClient(actualJobId));
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java
index 60254f9..d4a51ac 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java
@@ -58,7 +58,7 @@ public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends Cluster
}
@Override
- public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
+ public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration, @Nonnull final ClassLoader userCodeClassloader) throws Exception {
final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
index 6445fb6..094a1b2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
@@ -20,6 +20,7 @@ package org.apache.flink.client.deployment.executors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
import org.apache.flink.client.deployment.ClusterDescriptor;
@@ -29,6 +30,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.function.FunctionUtils;
import javax.annotation.Nonnull;
@@ -53,7 +55,7 @@ public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends Clu
}
@Override
- public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
+ public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration, @Nonnull final ClassLoader userCodeClassloader) throws Exception {
final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
@@ -64,6 +66,13 @@ public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends Clu
ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
return clusterClient
.submitJob(jobGraph)
+ .thenApplyAsync(FunctionUtils.uncheckedFunction(jobId -> {
+ ClientUtils.waitUntilJobInitializationFinished(
+ () -> clusterClient.getJobStatus(jobId).get(),
+ () -> clusterClient.requestJobResult(jobId).get(),
+ userCodeClassloader);
+ return jobId;
+ }))
.thenApplyAsync(jobID -> (JobClient) new ClusterClientJobClientAdapter<>(
clusterClientProvider,
jobID))
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
index a64f030..578265d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
@@ -65,7 +65,7 @@ public class LocalExecutor implements PipelineExecutor {
}
@Override
- public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
+ public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) throws Exception {
checkNotNull(pipeline);
checkNotNull(configuration);
@@ -78,7 +78,7 @@ public class LocalExecutor implements PipelineExecutor {
final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);
- return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph);
+ return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph, userCodeClassloader);
}
private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) throws MalformedURLException {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
index 16ca6b6..f5a793d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.operators.coordination.CoordinationRequestGatewa
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.FunctionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,13 +82,20 @@ public final class PerJobMiniClusterFactory {
/**
* Starts a {@link MiniCluster} and submits a job.
*/
- public CompletableFuture<JobClient> submitJob(JobGraph jobGraph) throws Exception {
+ public CompletableFuture<JobClient> submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {
MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
miniCluster.start();
return miniCluster
.submitJob(jobGraph)
+ .thenApplyAsync(FunctionUtils.uncheckedFunction(submissionResult -> {
+ org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(
+ () -> miniCluster.getJobStatus(submissionResult.getJobID()).get(),
+ () -> miniCluster.requestJobResult(submissionResult.getJobID()).get(),
+ userCodeClassloader);
+ return submissionResult;
+ }))
.thenApply(result -> new PerJobMiniClusterJobClient(result.getJobID(), miniCluster))
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
diff --git a/flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java b/flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java
new file mode 100644
index 0000000..056e181
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.client.JobInitializationException;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * Test for the ClientUtils.
+ */
+public class ClientUtilsTest extends TestLogger {
+
+ private static final JobID TESTING_JOB_ID = new JobID();
+
+ /**
+ * Ensure that the waitUntilJobInitializationFinished() method throws JobInitializationException.
+ */
+ @Test
+ public void testWaitUntilJobInitializationFinished_throwsInitializationException() {
+ Iterator<JobStatus> statusSequenceIterator = Arrays.asList(
+ JobStatus.INITIALIZING,
+ JobStatus.INITIALIZING,
+ JobStatus.FAILED).iterator();
+
+ CommonTestUtils.assertThrows("Something is wrong", JobInitializationException.class, () -> {
+ ClientUtils.waitUntilJobInitializationFinished(
+ statusSequenceIterator::next,
+ () -> {
+ Throwable throwable = new JobInitializationException(
+ TESTING_JOB_ID,
+ "Something is wrong",
+ new RuntimeException("Err"));
+ return buildJobResult(throwable);
+ },
+ ClassLoader.getSystemClassLoader());
+ return null;
+ });
+ }
+
+ /**
+ * Ensure that waitUntilJobInitializationFinished() does not throw non-initialization exceptions.
+ */
+ @Test
+ public void testWaitUntilJobInitializationFinished_doesNotThrowRuntimeException() throws Exception {
+ Iterator<JobStatus> statusSequenceIterator = Arrays.asList(
+ JobStatus.INITIALIZING,
+ JobStatus.INITIALIZING,
+ JobStatus.FAILED).iterator();
+ ClientUtils.waitUntilJobInitializationFinished(
+ statusSequenceIterator::next,
+ () -> buildJobResult(new RuntimeException("Err")),
+ ClassLoader.getSystemClassLoader());
+ }
+
+ /**
+ * Ensure that other errors are thrown.
+ */
+ @Test
+ public void testWaitUntilJobInitializationFinished_throwsOtherErrors() {
+ CommonTestUtils.assertThrows("Error while waiting for job to be initialized", RuntimeException.class, () -> {
+ ClientUtils.waitUntilJobInitializationFinished(() -> {
+ throw new RuntimeException("other error");
+ },
+ () -> {
+ Throwable throwable = new JobInitializationException(
+ TESTING_JOB_ID,
+ "Something is wrong",
+ new RuntimeException("Err"));
+ return buildJobResult(throwable);
+ },
+ ClassLoader.getSystemClassLoader());
+ return null;
+ });
+ }
+
+ private JobResult buildJobResult(Throwable throwable) {
+ return new JobResult.Builder()
+ .jobId(TESTING_JOB_ID)
+ .serializedThrowable(new SerializedThrowable(throwable))
+ .netRuntime(1)
+ .build();
+ }
+
+ /**
+ * Test normal operation.
+ */
+ @Test
+ public void testWaitUntilJobInitializationFinished_regular() throws Exception {
+ Iterator<JobStatus> statusSequenceIterator = Arrays.asList(
+ JobStatus.INITIALIZING,
+ JobStatus.INITIALIZING,
+ JobStatus.RUNNING).iterator();
+ ClientUtils.waitUntilJobInitializationFinished(
+ statusSequenceIterator::next, () -> {
+ Assert.fail("unexpected call");
+ return null;
+ },
+ ClassLoader.getSystemClassLoader());
+ }
+}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 22b78c8..4ca9270 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -420,7 +420,7 @@ public class ClientTest extends TestLogger {
@Override
public PipelineExecutor getExecutor(@Nonnull Configuration configuration) {
- return (pipeline, config) -> {
+ return (pipeline, config, classLoader) -> {
final int parallelism = config.getInteger(CoreOptions.DEFAULT_PARALLELISM);
final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(plan, config, parallelism);
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java
index 82a4750..0a69cde 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java
@@ -59,7 +59,7 @@ public class PerJobMiniClusterFactoryTest extends TestLogger {
public void testJobExecution() throws Exception {
PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster();
- JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph()).get();
+ JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader()).get();
JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
assertThat(jobExecutionResult, is(notNullValue()));
@@ -76,7 +76,7 @@ public class PerJobMiniClusterFactoryTest extends TestLogger {
JobGraph cancellableJobGraph = getCancellableJobGraph();
JobClient jobClient = perJobMiniClusterFactory
- .submitJob(cancellableJobGraph)
+ .submitJob(cancellableJobGraph, ClassLoader.getSystemClassLoader())
.get();
assertThat(jobClient.getJobID(), is(cancellableJobGraph.getJobID()));
@@ -96,7 +96,7 @@ public class PerJobMiniClusterFactoryTest extends TestLogger {
@Test
public void testJobClientSavepoint() throws Exception {
PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster();
- JobClient jobClient = perJobMiniClusterFactory.submitJob(getCancellableJobGraph()).get();
+ JobClient jobClient = perJobMiniClusterFactory.submitJob(getCancellableJobGraph(), ClassLoader.getSystemClassLoader()).get();
assertThrows(
"is not a streaming job.",
@@ -117,23 +117,21 @@ public class PerJobMiniClusterFactoryTest extends TestLogger {
JobGraph jobGraph = new JobGraph();
assertThrows(
- "Failed to submit job.",
+ "Could not instantiate JobManager",
ExecutionException.class,
- () -> perJobMiniClusterFactory.submitJob(jobGraph).get());
-
- assertThatMiniClusterIsShutdown();
+ () -> perJobMiniClusterFactory.submitJob(jobGraph, ClassLoader.getSystemClassLoader()).get());
}
@Test
public void testMultipleExecutions() throws Exception {
PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster();
{
- JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph()).get();
+ JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader()).get();
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
assertThatMiniClusterIsShutdown();
}
{
- JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph()).get();
+ JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader()).get();
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
assertThatMiniClusterIsShutdown();
}
@@ -142,7 +140,7 @@ public class PerJobMiniClusterFactoryTest extends TestLogger {
@Test
public void testJobClientInteractionAfterShutdown() throws Exception {
PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster();
- JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph()).get();
+ JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader()).get();
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
assertThatMiniClusterIsShutdown();
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobStatus.java b/flink-core/src/main/java/org/apache/flink/api/common/JobStatus.java
index b0ddd24..5cb6d90 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobStatus.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobStatus.java
@@ -21,10 +21,15 @@ package org.apache.flink.api.common;
import org.apache.flink.annotation.PublicEvolving;
/**
- * Possible states of a job once it has been accepted by the job manager.
+ * Possible states of a job once it has been accepted by the dispatcher.
*/
@PublicEvolving
public enum JobStatus {
+ /**
+ * The job has been received by the Dispatcher, and is waiting for the job manager to be
+ * created.
+ */
+ INITIALIZING(TerminalState.NON_TERMINAL),
/** Job is newly created, no task has started to run. */
CREATED(TerminalState.NON_TERMINAL),
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutor.java b/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutor.java
index afe7cad..66a9ef6 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutor.java
@@ -39,7 +39,8 @@ public interface PipelineExecutor {
*
* @param pipeline the {@link Pipeline} to execute
* @param configuration the {@link Configuration} with the required execution parameters
+ * @param userCodeClassloader the {@link ClassLoader} to deserialize usercode
* @return a {@link CompletableFuture} with the {@link JobClient} corresponding to the pipeline.
*/
- CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception;
+ CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration, final ClassLoader userCodeClassloader) throws Exception;
}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 910c419..4eb48e9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -970,7 +970,7 @@ public class ExecutionEnvironment {
CompletableFuture<JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)
- .execute(plan, configuration);
+ .execute(plan, configuration, userClassloader);
try {
JobClient jobClient = jobClientFuture.get();
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
index ab4a66b..cf046af 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
@@ -86,7 +86,7 @@ public class ExecutorDiscoveryAndJobClientTest {
@Override
public PipelineExecutor getExecutor(Configuration configuration) {
- return (pipeline, executionConfig) -> CompletableFuture.completedFuture(new TestingJobClient());
+ return (pipeline, executionConfig, classLoader) -> CompletableFuture.completedFuture(new TestingJobClient());
}
}
}
diff --git a/flink-runtime-web/web-dashboard/src/app/app.config.ts b/flink-runtime-web/web-dashboard/src/app/app.config.ts
index 6d41b67..97a5c83 100644
--- a/flink-runtime-web/web-dashboard/src/app/app.config.ts
+++ b/flink-runtime-web/web-dashboard/src/app/app.config.ts
@@ -30,6 +30,7 @@ export const COLOR_MAP = {
IN_PROGRESS: '#faad14',
SCHEDULED: '#722ed1',
COMPLETED: '#1890ff',
- RESTARTING: '#13c2c2'
+ RESTARTING: '#13c2c2',
+ INITIALIZING: '#738df8'
};
export const LONG_MIN_VALUE = -9223372036854776000;
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/job.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/job.component.html
index 62aa2fd..cb1fedc 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/job.component.html
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/job.component.html
@@ -15,11 +15,19 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
-<flink-job-status [isLoading]="isLoading"></flink-job-status>
-<div class="content">
- <nz-skeleton [nzActive]="true" *ngIf="isLoading"></nz-skeleton>
- <div class="router" *ngIf="!isLoading">
- <router-outlet></router-outlet>
+<ng-container *ngIf="!isError">
+ <flink-job-status [isLoading]="isLoading"></flink-job-status>
+ <div class="content">
+ <nz-skeleton [nzActive]="true" *ngIf="isLoading"></nz-skeleton>
+ <div class="router" *ngIf="!isLoading">
+ <router-outlet></router-outlet>
+ </div>
</div>
-</div>
+</ng-container>
+
+
+<nz-alert *ngIf="isError" nzShowIcon nzType="warning" nzMessage="Job failed during initialization of JobManager" [nzDescription]=descriptionTemplateRef></nz-alert>
+<ng-template #descriptionTemplateRef>
+ <pre>{{errorDetails}}</pre>
+</ng-template>
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/job.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/job.component.ts
index ef48463..bf185ab 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/job.component.ts
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/job.component.ts
@@ -16,11 +16,11 @@
* limitations under the License.
*/
-import { ChangeDetectionStrategy, ChangeDetectorRef, Component, OnDestroy, OnInit } from '@angular/core';
-import { ActivatedRoute } from '@angular/router';
-import { Subject } from 'rxjs';
-import { flatMap, takeUntil } from 'rxjs/operators';
-import { JobService, StatusService } from 'services';
+import {ChangeDetectionStrategy, ChangeDetectorRef, Component, OnDestroy, OnInit} from '@angular/core';
+import {ActivatedRoute} from '@angular/router';
+import {EMPTY, Subject} from 'rxjs';
+import {catchError, flatMap, takeUntil} from 'rxjs/operators';
+import {JobService, StatusService} from 'services';
@Component({
selector: 'flink-job',
@@ -31,6 +31,8 @@ import { JobService, StatusService } from 'services';
export class JobComponent implements OnInit, OnDestroy {
destroy$ = new Subject();
isLoading = true;
+ isError = false;
+ errorDetails: string;
constructor(
private cdr: ChangeDetectorRef,
@@ -43,18 +45,26 @@ export class JobComponent implements OnInit, OnDestroy {
this.statusService.refresh$
.pipe(
takeUntil(this.destroy$),
- flatMap(() => this.jobService.loadJob(this.activatedRoute.snapshot.params.jid))
+ flatMap(() =>
+ this.jobService.loadJob(this.activatedRoute.snapshot.params.jid).pipe(
+ catchError(() => {
+ this.jobService.loadExceptions(this.activatedRoute.snapshot.params.jid, 10).subscribe(data => {
+ this.errorDetails = data['root-exception'];
+ this.cdr.markForCheck();
+ });
+ this.isError = true;
+ this.isLoading = false;
+ this.cdr.markForCheck();
+ return EMPTY;
+ })
+ )
+ )
)
- .subscribe(
- () => {
- this.isLoading = false;
- this.cdr.markForCheck();
- },
- () => {
- this.isLoading = false;
- this.cdr.markForCheck();
- }
- );
+ .subscribe(() => {
+ this.isLoading = false;
+ this.isError = false;
+ this.cdr.markForCheck();
+ });
}
ngOnDestroy() {
diff --git a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
index e1d057e..ef013f2 100644
--- a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
+++ b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
@@ -117,8 +117,7 @@ export class JobService {
map(job => this.convertJob(job)),
tap(job => {
this.jobDetail$.next(job);
- }),
- catchError(() => EMPTY)
+ })
);
}
diff --git a/flink-runtime-web/web-dashboard/src/app/share/customize/job-list/job-list.component.html b/flink-runtime-web/web-dashboard/src/app/share/customize/job-list/job-list.component.html
index 9553bda..51f84dd 100644
--- a/flink-runtime-web/web-dashboard/src/app/share/customize/job-list/job-list.component.html
+++ b/flink-runtime-web/web-dashboard/src/app/share/customize/job-list/job-list.component.html
@@ -33,7 +33,7 @@
</tr>
</thead>
<tbody>
- <tr *ngFor="let job of listOfJob; trackBy:trackJobBy;" (click)="navigateToJob(job.jid)" class="clickable">
+ <tr *ngFor="let job of listOfJob; trackBy:trackJobBy;" (click)="navigateToJob(job)" class="clickable">
<td>{{ job.name }}</td>
<td>{{ job["start-time"] | humanizeDate: 'yyyy-MM-dd HH:mm:ss' }}</td>
<td>{{ job.duration | humanizeDuration }}</td>
diff --git a/flink-runtime-web/web-dashboard/src/app/share/customize/job-list/job-list.component.ts b/flink-runtime-web/web-dashboard/src/app/share/customize/job-list/job-list.component.ts
index 01aceef..ec43952 100644
--- a/flink-runtime-web/web-dashboard/src/app/share/customize/job-list/job-list.component.ts
+++ b/flink-runtime-web/web-dashboard/src/app/share/customize/job-list/job-list.component.ts
@@ -23,6 +23,7 @@ import { Observable, Subject } from 'rxjs';
import { flatMap, takeUntil } from 'rxjs/operators';
import { JobService, StatusService } from 'services';
import { deepFind, isNil } from 'utils';
+import { NzMessageService } from 'ng-zorro-antd';
@Component({
selector: 'flink-job-list',
@@ -64,13 +65,18 @@ export class JobListComponent implements OnInit, OnDestroy {
return node.jid;
}
- navigateToJob(jid: string) {
- this.router.navigate(['job', jid]).then();
+ navigateToJob(job: JobsItemInterface) {
+ if (job.state === 'INITIALIZING') {
+ this.nzMessageService.info('Job detail page is not available while it is in state INITIALIZING.');
+ } else {
+ this.router.navigate(['job', job.jid]).then();
+ }
}
constructor(
private statusService: StatusService,
private jobService: JobService,
+ private nzMessageService: NzMessageService,
private activatedRoute: ActivatedRoute,
private cdr: ChangeDetectorRef,
private router: Router
diff --git a/flink-runtime-web/web-dashboard/src/app/app.config.ts b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobInitializationException.java
similarity index 65%
copy from flink-runtime-web/web-dashboard/src/app/app.config.ts
copy to flink-runtime/src/main/java/org/apache/flink/runtime/client/JobInitializationException.java
index 6d41b67..05752f1 100644
--- a/flink-runtime-web/web-dashboard/src/app/app.config.ts
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobInitializationException.java
@@ -16,20 +16,18 @@
* limitations under the License.
*/
-export const BASE_URL = '.';
-export const COLOR_MAP = {
- TOTAL: '#112641',
- RUNNING: '#52c41a',
- FAILED: '#f5222d',
- FINISHED: '#1890ff',
- CANCELED: '#fa8c16',
- CANCELING: '#faad14',
- CREATED: '#2f54eb',
- DEPLOYING: '#13c2c2',
- RECONCILING: '#eb2f96',
- IN_PROGRESS: '#faad14',
- SCHEDULED: '#722ed1',
- COMPLETED: '#1890ff',
- RESTARTING: '#13c2c2'
-};
-export const LONG_MIN_VALUE = -9223372036854776000;
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+
+/**
+ * An exception indicating that the job has failed in the INITIALIZING job status.
+ */
+public class JobInitializationException extends JobExecutionException {
+
+ private static final long serialVersionUID = 2818087325120827526L;
+
+ public JobInitializationException(JobID jobID, String msg, Throwable cause){
+ super(jobID, msg, cause);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 047b80e..e21d742 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -28,7 +28,7 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
-import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobInitializationException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -44,7 +44,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
-import org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
@@ -72,9 +71,8 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.FunctionUtils;
-import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -90,7 +88,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -119,7 +116,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
private final FatalErrorHandler fatalErrorHandler;
- private final Map<JobID, CompletableFuture<JobManagerRunner>> jobManagerRunnerFutures;
+ private final Map<JobID, DispatcherJob> runningJobs;
private final DispatcherBootstrap dispatcherBootstrap;
@@ -134,10 +131,17 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
@Nullable
private final String metricServiceQueryAddress;
- private final Map<JobID, CompletableFuture<Void>> jobManagerTerminationFutures;
+ private final Map<JobID, CompletableFuture<Void>> dispatcherJobTerminationFutures;
protected final CompletableFuture<ApplicationStatus> shutDownFuture;
+ /**
+ * Enum to distinguish between initial job submission and re-submission for recovery.
+ */
+ protected enum ExecutionType {
+ SUBMISSION, RECOVERY
+ }
+
public Dispatcher(
RpcService rpcService,
DispatcherId fencingToken,
@@ -163,7 +167,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
- jobManagerRunnerFutures = new HashMap<>(16);
+ runningJobs = new HashMap<>(16);
this.historyServerArchivist = dispatcherServices.getHistoryServerArchivist();
@@ -171,7 +175,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
this.jobManagerRunnerFactory = dispatcherServices.getJobManagerRunnerFactory();
- this.jobManagerTerminationFutures = new HashMap<>(2);
+ this.dispatcherJobTerminationFutures = new HashMap<>(2);
this.shutDownFuture = new CompletableFuture<>();
@@ -213,18 +217,11 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
void runRecoveredJob(final JobGraph recoveredJob) {
checkNotNull(recoveredJob);
- FutureUtils.assertNoException(runJob(recoveredJob)
- .handle(handleRecoveredJobStartError(recoveredJob.getJobID())));
- }
-
- private BiFunction<Void, Throwable, Void> handleRecoveredJobStartError(JobID jobId) {
- return (ignored, throwable) -> {
- if (throwable != null) {
- onFatalError(new DispatcherException(String.format("Could not start recovered job %s.", jobId), throwable));
- }
-
- return null;
- };
+ try {
+ runJob(recoveredJob, ExecutionType.RECOVERY);
+ } catch (Throwable throwable) {
+ onFatalError(new DispatcherException(String.format("Could not start recovered job %s.", recoveredJob.getJobID()), throwable));
+ }
}
private void handleStartDispatcherServicesException(Exception e) throws Exception {
@@ -241,10 +238,10 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
public CompletableFuture<Void> onStop() {
log.info("Stopping dispatcher {}.", getAddress());
- final CompletableFuture<Void> allJobManagerRunnersTerminationFuture = terminateJobManagerRunnersAndGetTerminationFuture();
+ final CompletableFuture<Void> allJobsTerminationFuture = terminateRunningJobsAndGetTerminationFuture();
return FutureUtils.runAfterwards(
- allJobManagerRunnersTerminationFuture,
+ allJobsTerminationFuture,
() -> {
dispatcherBootstrap.stop();
@@ -307,7 +304,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
throw new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobId), e);
}
- return jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunnerFutures.containsKey(jobId);
+ return jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || runningJobs.containsKey(jobId);
}
private boolean isPartialResourceConfigured(JobGraph jobGraph) {
@@ -332,7 +329,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
- final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
+ final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJob(jobGraph.getJobID(), jobGraph,
+ this::persistAndRunJob)
.thenApply(ignored -> Acknowledge.get());
return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
@@ -350,44 +348,59 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
}, getRpcService().getExecutor());
}
- private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
+ private void persistAndRunJob(JobGraph jobGraph) throws Exception {
jobGraphWriter.putJobGraph(jobGraph);
-
- final CompletableFuture<Void> runJobFuture = runJob(jobGraph);
-
- return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
- if (throwable != null) {
- jobGraphWriter.removeJobGraph(jobGraph.getJobID());
- }
- }));
+ runJob(jobGraph, ExecutionType.SUBMISSION);
}
- private CompletableFuture<Void> runJob(JobGraph jobGraph) {
- Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
+ private void runJob(JobGraph jobGraph, ExecutionType executionType) {
+ Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
- final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
+ CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
- jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);
+ DispatcherJob dispatcherJob = DispatcherJob.createFor(
+ jobManagerRunnerFuture,
+ jobGraph.getJobID(),
+ jobGraph.getName());
+ runningJobs.put(jobGraph.getJobID(), dispatcherJob);
- return jobManagerRunnerFuture
- .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
- .thenApply(FunctionUtils.nullFn())
- .whenCompleteAsync(
- (ignored, throwable) -> {
- if (throwable != null) {
- jobManagerRunnerFutures.remove(jobGraph.getJobID());
+ final JobID jobId = jobGraph.getJobID();
+ FutureUtils.assertNoException(
+ dispatcherJob.getResultFuture().handleAsync(
+ (DispatcherJobResult dispatcherJobResult, Throwable throwable) -> {
+ // check if we are still the active DispatcherJob by checking the identity
+ DispatcherJob job = runningJobs.get(jobId);
+ if (job == dispatcherJob) {
+ if (dispatcherJobResult != null) {
+ if (dispatcherJobResult.isInitializationFailure() && executionType == ExecutionType.RECOVERY) {
+ dispatcherJobFailed(jobId, dispatcherJobResult.getInitializationFailure());
+ } else {
+ jobReachedGloballyTerminalState(dispatcherJobResult.getArchivedExecutionGraph());
+ }
+ } else {
+ dispatcherJobFailed(jobId, throwable);
+ }
+ } else {
+ log.debug("Job {} is not registered anymore at dispatcher", jobId);
}
- },
- getMainThreadExecutor());
+ return null;
+ }, getMainThreadExecutor()));
}
- private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
- final RpcService rpcService = getRpcService();
+ private void dispatcherJobFailed(JobID jobId, Throwable throwable) {
+ if (throwable instanceof JobNotFinishedException) {
+ jobNotFinished(jobId);
+ } else {
+ jobMasterFailed(jobId, throwable);
+ }
+ }
+ CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
+ final RpcService rpcService = getRpcService();
return CompletableFuture.supplyAsync(
() -> {
try {
- return jobManagerRunnerFactory.createJobManagerRunner(
+ JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
jobGraph,
configuration,
rpcService,
@@ -396,52 +409,19 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
jobManagerSharedServices,
new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
fatalErrorHandler);
+ runner.start();
+ return runner;
} catch (Exception e) {
- throw new CompletionException(new JobExecutionException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
+ throw new CompletionException(new JobInitializationException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
}
},
- rpcService.getExecutor());
- }
-
- private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
- final JobID jobId = jobManagerRunner.getJobID();
-
- FutureUtils.assertNoException(
- jobManagerRunner.getResultFuture().handleAsync(
- (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
- // check if we are still the active JobManagerRunner by checking the identity
- final JobManagerRunner currentJobManagerRunner = Optional.ofNullable(jobManagerRunnerFutures.get(jobId))
- .map(future -> future.getNow(null))
- .orElse(null);
- //noinspection ObjectEquality
- if (jobManagerRunner == currentJobManagerRunner) {
- if (archivedExecutionGraph != null) {
- jobReachedGloballyTerminalState(archivedExecutionGraph);
- } else {
- final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
-
- if (strippedThrowable instanceof JobNotFinishedException) {
- jobNotFinished(jobId);
- } else {
- jobMasterFailed(jobId, strippedThrowable);
- }
- }
- } else {
- log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
- }
-
- return null;
- }, getMainThreadExecutor()));
-
- jobManagerRunner.start();
-
- return jobManagerRunner;
+ rpcService.getExecutor()); // do not use main thread executor. Otherwise, Dispatcher is blocked on JobManager creation
}
@Override
public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
return CompletableFuture.completedFuture(
- Collections.unmodifiableSet(new HashSet<>(jobManagerRunnerFutures.keySet())));
+ Collections.unmodifiableSet(new HashSet<>(runningJobs.keySet())));
}
@Override
@@ -465,17 +445,18 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
@Override
public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
- final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
- return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.cancel(timeout));
+ Optional<DispatcherJob> maybeJob = getDispatcherJob(jobId);
+ return maybeJob.map(job -> job.cancel(timeout)).orElseGet(() -> {
+ log.debug("Dispatcher is unable to cancel job {}: not found", jobId);
+ return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+ });
}
@Override
public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
CompletableFuture<ResourceOverview> taskManagerOverviewFuture = runResourceManagerCommand(resourceManagerGateway -> resourceManagerGateway.requestResourceOverview(timeout));
- final List<CompletableFuture<Optional<JobStatus>>> optionalJobInformation = queryJobMastersForInformation(
- (JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobStatus(timeout));
+ final List<CompletableFuture<Optional<JobStatus>>> optionalJobInformation = queryJobMastersForInformation(dispatcherJob -> dispatcherJob.requestJobStatus(timeout));
CompletableFuture<Collection<Optional<JobStatus>>> allOptionalJobsFuture = FutureUtils.combineAll(optionalJobInformation);
@@ -493,8 +474,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
@Override
public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
- List<CompletableFuture<Optional<JobDetails>>> individualOptionalJobDetails = queryJobMastersForInformation(
- (JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobDetails(timeout));
+ List<CompletableFuture<Optional<JobDetails>>> individualOptionalJobDetails = queryJobMastersForInformation(dj -> dj.requestJobDetails(timeout));
CompletableFuture<Collection<Optional<JobDetails>>> optionalCombinedJobDetails = FutureUtils.combineAll(
individualOptionalJobDetails);
@@ -516,59 +496,47 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
@Override
public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout) {
-
- final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
- final CompletableFuture<JobStatus> jobStatusFuture = jobMasterGatewayFuture.thenCompose(
- (JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobStatus(timeout));
-
- return jobStatusFuture.exceptionally(
- (Throwable throwable) -> {
- final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
-
- // check whether it is a completed job
- if (jobDetails == null) {
- throw new CompletionException(ExceptionUtils.stripCompletionException(throwable));
- } else {
- return jobDetails.getStatus();
- }
- });
+ Optional<DispatcherJob> maybeJob = getDispatcherJob(jobId);
+ return maybeJob.map(job -> job.requestJobStatus(timeout)).orElseGet(() -> {
+ // is it a completed job?
+ final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
+ if (jobDetails == null) {
+ return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+ } else {
+ return CompletableFuture.completedFuture(jobDetails.getStatus());
+ }
+ });
}
@Override
public CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(
final JobID jobId,
final JobVertexID jobVertexId) {
- final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
- return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestOperatorBackPressureStats(jobVertexId));
+ return performOperationOnJobMasterGateway(jobId, gateway -> gateway.requestOperatorBackPressureStats(jobVertexId));
}
@Override
public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
- final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
- final CompletableFuture<ArchivedExecutionGraph> archivedExecutionGraphFuture = jobMasterGatewayFuture.thenCompose(
- (JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJob(timeout));
-
- return archivedExecutionGraphFuture.exceptionally(
- (Throwable throwable) -> {
- final ArchivedExecutionGraph serializableExecutionGraph = archivedExecutionGraphStore.get(jobId);
-
- // check whether it is a completed job
- if (serializableExecutionGraph == null) {
- throw new CompletionException(ExceptionUtils.stripCompletionException(throwable));
- } else {
- return serializableExecutionGraph;
- }
- });
+ Function<Throwable, ArchivedExecutionGraph> checkExecutionGraphStoreOnException = throwable -> {
+ // check whether it is a completed job
+ final ArchivedExecutionGraph archivedExecutionGraph = archivedExecutionGraphStore.get(jobId);
+ if (archivedExecutionGraph == null) {
+ throw new CompletionException(ExceptionUtils.stripCompletionException(throwable));
+ } else {
+ return archivedExecutionGraph;
+ }
+ };
+ Optional<DispatcherJob> maybeJob = getDispatcherJob(jobId);
+ return maybeJob.map(job -> job.requestJob(timeout))
+ .orElse(FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)))
+ .exceptionally(checkExecutionGraphStoreOnException);
}
@Override
public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
- final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
+ DispatcherJob job = runningJobs.get(jobId);
- if (jobManagerRunnerFuture == null) {
+ if (job == null) {
final ArchivedExecutionGraph archivedExecutionGraph = archivedExecutionGraphStore.get(jobId);
if (archivedExecutionGraph == null) {
@@ -577,7 +545,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
return CompletableFuture.completedFuture(JobResult.createFrom(archivedExecutionGraph));
}
} else {
- return jobManagerRunnerFuture.thenCompose(JobManagerRunner::getResultFuture).thenApply(JobResult::createFrom);
+ return job.getResultFuture().thenApply(dispatcherJobResult -> JobResult.createFrom(dispatcherJobResult.getArchivedExecutionGraph()));
}
}
@@ -606,11 +574,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
final String targetDirectory,
final boolean cancelJob,
final Time timeout) {
- final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
- return jobMasterGatewayFuture.thenCompose(
- (JobMasterGateway jobMasterGateway) ->
- jobMasterGateway.triggerSavepoint(targetDirectory, cancelJob, timeout));
+ return performOperationOnJobMasterGateway(jobId, gateway -> gateway.triggerSavepoint(targetDirectory, cancelJob, timeout));
}
@Override
@@ -619,11 +584,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
final String targetDirectory,
final boolean advanceToEndOfEventTime,
final Time timeout) {
- final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
- return jobMasterGatewayFuture.thenCompose(
- (JobMasterGateway jobMasterGateway) ->
- jobMasterGateway.stopWithSavepoint(targetDirectory, advanceToEndOfEventTime, timeout));
+ return performOperationOnJobMasterGateway(jobId, gateway -> gateway.stopWithSavepoint(targetDirectory, advanceToEndOfEventTime, timeout));
}
@Override
@@ -643,11 +604,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
OperatorID operatorId,
SerializedValue<CoordinationRequest> serializedRequest,
Time timeout) {
- final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
- return jobMasterGatewayFuture.thenCompose(
- (JobMasterGateway jobMasterGateway) ->
- jobMasterGateway.deliverCoordinationRequestToCoordinator(operatorId, serializedRequest, timeout));
+ return performOperationOnJobMasterGateway(jobId, gateway -> gateway.deliverCoordinationRequestToCoordinator(operatorId, serializedRequest, timeout));
}
/**
@@ -660,38 +617,36 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean cleanupHA) {
final CompletableFuture<Void> cleanupFuture = removeJob(jobId, cleanupHA);
- registerJobManagerRunnerTerminationFuture(jobId, cleanupFuture);
+ registerDispatcherJobTerminationFuture(jobId, cleanupFuture);
}
- private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
- Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId));
-
- jobManagerTerminationFutures.put(jobId, jobManagerRunnerTerminationFuture);
+ private void registerDispatcherJobTerminationFuture(JobID jobId, CompletableFuture<Void> dispatcherJobTerminationFuture) {
+ Preconditions.checkState(!dispatcherJobTerminationFutures.containsKey(jobId));
+ dispatcherJobTerminationFutures.put(jobId, dispatcherJobTerminationFuture);
// clean up the pending termination future
- jobManagerRunnerTerminationFuture.thenRunAsync(
+ dispatcherJobTerminationFuture.thenRunAsync(
() -> {
- final CompletableFuture<Void> terminationFuture = jobManagerTerminationFutures.remove(jobId);
+ final CompletableFuture<Void> terminationFuture = dispatcherJobTerminationFutures.remove(jobId);
//noinspection ObjectEquality
- if (terminationFuture != null && terminationFuture != jobManagerRunnerTerminationFuture) {
- jobManagerTerminationFutures.put(jobId, terminationFuture);
+ if (terminationFuture != null && terminationFuture != dispatcherJobTerminationFuture) {
+ dispatcherJobTerminationFutures.put(jobId, terminationFuture);
}
},
getMainThreadExecutor());
}
private CompletableFuture<Void> removeJob(JobID jobId, boolean cleanupHA) {
- CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.remove(jobId);
+ DispatcherJob job = runningJobs.remove(jobId);
- final CompletableFuture<Void> jobManagerRunnerTerminationFuture;
- if (jobManagerRunnerFuture != null) {
- jobManagerRunnerTerminationFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::closeAsync);
+ final CompletableFuture<Void> dispatcherJobTerminationFuture;
+ if (job != null) {
+ dispatcherJobTerminationFuture = job.closeAsync();
} else {
- jobManagerRunnerTerminationFuture = CompletableFuture.completedFuture(null);
+ dispatcherJobTerminationFuture = CompletableFuture.completedFuture(null);
}
-
- return jobManagerRunnerTerminationFuture.thenRunAsync(
+ return dispatcherJobTerminationFuture.thenRunAsync(
() -> cleanUpJobData(jobId, cleanupHA),
getRpcService().getExecutor());
}
@@ -727,21 +682,21 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
}
/**
- * Terminate all currently running {@link JobManagerRunnerImpl}.
+ * Terminate all currently running {@link DispatcherJob}s.
*/
- private void terminateJobManagerRunners() {
+ private void terminateRunningJobs() {
log.info("Stopping all currently running jobs of dispatcher {}.", getAddress());
- final HashSet<JobID> jobsToRemove = new HashSet<>(jobManagerRunnerFutures.keySet());
+ final HashSet<JobID> jobsToRemove = new HashSet<>(runningJobs.keySet());
for (JobID jobId : jobsToRemove) {
removeJobAndRegisterTerminationFuture(jobId, false);
}
}
- private CompletableFuture<Void> terminateJobManagerRunnersAndGetTerminationFuture() {
- terminateJobManagerRunners();
- final Collection<CompletableFuture<Void>> values = jobManagerTerminationFutures.values();
+ private CompletableFuture<Void> terminateRunningJobsAndGetTerminationFuture() {
+ terminateRunningJobs();
+ final Collection<CompletableFuture<Void>> values = dispatcherJobTerminationFutures.values();
return FutureUtils.completeAll(values);
}
@@ -802,30 +757,33 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
onFatalError(new FlinkException(String.format("JobMaster for job %s failed.", jobId), cause));
}
- private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
- final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
-
- if (jobManagerRunnerFuture == null) {
+ /**
+ * Ensures that the JobMasterGateway is available.
+ */
+ private CompletableFuture<JobMasterGateway> getJobMasterGateway(JobID jobId) {
+ DispatcherJob job = runningJobs.get(jobId);
+ if (job == null) {
return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
- } else {
- final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::getJobMasterGateway);
- return leaderGatewayFuture.thenApplyAsync(
- (JobMasterGateway jobMasterGateway) -> {
- // check whether the retrieved JobMasterGateway belongs still to a running JobMaster
- if (jobManagerRunnerFutures.containsKey(jobId)) {
- return jobMasterGateway;
- } else {
- throw new CompletionException(new FlinkJobNotFoundException(jobId));
- }
- },
- getMainThreadExecutor());
}
+ if (!job.isInitialized()) {
+ return FutureUtils.completedExceptionally(new UnavailableDispatcherOperationException("Unable to get JobMasterGateway for initializing job. "
+ + "The requested operation is not available while the JobManager is initializing."));
+ }
+ return job.getJobMasterGateway();
+ }
+
+ private <T> CompletableFuture<T> performOperationOnJobMasterGateway(JobID jobId, Function<JobMasterGateway, CompletableFuture<T>> operation) {
+ return getJobMasterGateway(jobId).thenCompose(operation);
}
private CompletableFuture<ResourceManagerGateway> getResourceManagerGateway() {
return resourceManagerGatewayRetriever.getFuture();
}
+ private Optional<DispatcherJob> getDispatcherJob(JobID jobId) {
+ return Optional.ofNullable(runningJobs.get(jobId));
+ }
+
private <T> CompletableFuture<T> runResourceManagerCommand(Function<ResourceManagerGateway, CompletableFuture<T>> resourceManagerCommand) {
return getResourceManagerGateway().thenApply(resourceManagerCommand).thenCompose(Function.identity());
}
@@ -835,51 +793,56 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
}
+ /**
+ * Applies the given query to every job in runningJobs (including jobs that are still initializing).
+ * A failed query, or one completing with null, yields an empty {@link Optional} instead of a failed future.
+ */
@Nonnull
- private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>> queryFunction) {
- final int numberJobsRunning = jobManagerRunnerFutures.size();
-
- ArrayList<CompletableFuture<Optional<T>>> optionalJobInformation = new ArrayList<>(
- numberJobsRunning);
+ private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<DispatcherJob, CompletableFuture<T>> queryFunction) {
- for (JobID jobId : jobManagerRunnerFutures.keySet()) {
- final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
+ List<CompletableFuture<Optional<T>>> optionalJobInformation = new ArrayList<>(
+ runningJobs.size());
- final CompletableFuture<Optional<T>> optionalRequest = jobMasterGatewayFuture
- .thenCompose(queryFunction::apply)
- .handle((T value, Throwable throwable) -> Optional.ofNullable(value));
-
- optionalJobInformation.add(optionalRequest);
+ for (DispatcherJob job : runningJobs.values()) {
+ final CompletableFuture<Optional<T>> queryResult = queryFunction.apply(job)
+ .handle((T value, Throwable ignored) -> Optional.ofNullable(value));
+ optionalJobInformation.add(queryResult);
}
return optionalJobInformation;
}
- private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, FunctionWithException<JobGraph, CompletableFuture<Void>, ?> action) {
+ /**
+ * Runs the given action on the main thread once a previous job with the same id has terminated,
+ * after removing that job's termination future. The returned future fails with a
+ * {@link DispatcherException} if a job with this id is still running or the previous termination failed.
+ */
+ private CompletableFuture<Void> waitForTerminatingJob(JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
final CompletableFuture<Void> jobManagerTerminationFuture = getJobTerminationFuture(jobId)
.exceptionally((Throwable throwable) -> {
throw new CompletionException(
new DispatcherException(
String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobId),
- throwable)); });
+ throwable));
+ });
- return jobManagerTerminationFuture.thenComposeAsync(
- FunctionUtils.uncheckedFunction((ignored) -> {
- jobManagerTerminationFutures.remove(jobId);
- return action.apply(jobGraph);
+ return jobManagerTerminationFuture.thenAcceptAsync(
+ FunctionUtils.uncheckedConsumer((ignored) -> {
+ dispatcherJobTerminationFutures.remove(jobId);
+ action.accept(jobGraph);
}),
getMainThreadExecutor());
}
CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
- if (jobManagerRunnerFutures.containsKey(jobId)) {
+ if (runningJobs.containsKey(jobId)) {
return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
} else {
- return jobManagerTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
+ return dispatcherJobTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
}
}
private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) {
jobManagerMetricGroup.gauge(MetricNames.NUM_RUNNING_JOBS,
- () -> (long) jobManagerRunnerFutures.size());
+ () -> (long) runningJobs.size());
}
public CompletableFuture<Void> onRemovedJobGraph(JobID jobId) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
new file mode 100644
index 0000000..58f826b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage jobs.
+ *
+ * <p>It wraps the future of the job's {@link JobManagerRunner}, which may still be initializing.
+ * Until that future completes, status and execution graph requests are answered from locally
+ * tracked state; afterwards they are forwarded to the JobMaster or answered from the job result.
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+	private static final Logger LOG = LoggerFactory.getLogger(DispatcherJob.class);
+
+	private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+	private final CompletableFuture<DispatcherJobResult> jobResultFuture;
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+	private final long initializationTimestamp;
+	private final JobID jobId;
+	private final String jobName;
+
+	private final Object lock = new Object();
+
+	// Internal status used while the JobManagerRunner is being created. It only moves
+	// INITIALIZING -> (CANCELLING ->) JOB_MANAGER_RUNNER_INITIALIZED and is not updated afterwards.
+	@GuardedBy("lock")
+	private DispatcherJobStatus jobStatus = DispatcherJobStatus.INITIALIZING;
+
+	private enum DispatcherJobStatus {
+		// We are waiting for the JobManagerRunner to be initialized
+		INITIALIZING(JobStatus.INITIALIZING),
+		// The JobManagerRunner future has completed, either successfully or exceptionally
+		JOB_MANAGER_RUNNER_INITIALIZED(null),
+		// Cancellation was requested while initializing. We stay in this status until the
+		// JobManagerRunner future completes; then the job is considered initialized.
+		CANCELLING(JobStatus.CANCELLING);
+
+		// JobStatus reported while in this state; null if the state has no JobStatus equivalent
+		@Nullable
+		private final JobStatus jobStatus;
+
+		DispatcherJobStatus(@Nullable JobStatus jobStatus) {
+			this.jobStatus = jobStatus;
+		}
+
+		public JobStatus asJobStatus() {
+			if (jobStatus == null) {
+				throw new IllegalStateException("This state is not defined as a 'JobStatus'");
+			}
+			return jobStatus;
+		}
+	}
+
+	static DispatcherJob createFor(
+			CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+			JobID jobId,
+			String jobName) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName);
+	}
+
+	private DispatcherJob(
+			CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+			JobID jobId,
+			String jobName) {
+		this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+		this.jobId = jobId;
+		this.jobName = jobName;
+		this.initializationTimestamp = System.currentTimeMillis();
+		this.jobResultFuture = new CompletableFuture<>();
+
+		// All fields read by the callback are assigned above: if the runner future is already
+		// complete, the callback runs synchronously within this constructor.
+		FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+			// JM has been initialized, or the initialization failed
+			synchronized (lock) {
+				jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+				if (throwable == null) { // initialization succeeded
+					// Forward result future
+					jobManagerRunner.getResultFuture().whenComplete((archivedExecutionGraph, resultThrowable) -> {
+						if (archivedExecutionGraph != null) {
+							jobResultFuture.complete(DispatcherJobResult.forSuccess(archivedExecutionGraph));
+						} else {
+							jobResultFuture.completeExceptionally(ExceptionUtils.stripCompletionException(resultThrowable));
+						}
+					});
+				} else { // failure during initialization
+					final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+					ArchivedExecutionGraph archivedExecutionGraph = ArchivedExecutionGraph.createFromInitializingJob(
+						jobId,
+						jobName,
+						JobStatus.FAILED,
+						strippedThrowable,
+						initializationTimestamp);
+					jobResultFuture.complete(DispatcherJobResult.forInitializationFailure(archivedExecutionGraph, strippedThrowable));
+				}
+			}
+			return null;
+		}));
+	}
+
+	/**
+	 * Returns the future of the job result. It completes once the JobManagerRunner's result future
+	 * completes, or with an initialization-failure result if the runner could not be created.
+	 */
+	public CompletableFuture<DispatcherJobResult> getResultFuture() {
+		return jobResultFuture;
+	}
+
+	/**
+	 * Returns sparse {@link JobDetails} for the job, built from its current status: the start time
+	 * is the initialization timestamp, all other numeric fields and per-state task counts are 0.
+	 *
+	 * <p>NOTE(review): for an initialized job the JobMaster could provide full details (e.g. task
+	 * counts); consider deriving them from {@link #requestJob(Time)} instead.
+	 */
+	public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+		return requestJobStatus(timeout).thenApply(status -> {
+			// only final fields are read here, so no locking is required
+			final int[] tasksPerState = new int[ExecutionState.values().length];
+			return new JobDetails(
+				jobId,
+				jobName,
+				initializationTimestamp,
+				0,
+				0,
+				status,
+				0,
+				tasksPerState,
+				0);
+		});
+	}
+
+	/**
+	 * Cancels the job.
+	 *
+	 * <p>If the JobManagerRunner is still initializing, the cancellation is scheduled to be sent to
+	 * the JobMaster once the runner is available, and the job is reported as CANCELLING meanwhile.
+	 *
+	 * @param timeout timeout for the cancel call to the JobMaster
+	 * @return future completing with the JobMaster's acknowledgement; it completes exceptionally
+	 *     if the JobManagerRunner initialization failed.
+	 */
+	public CompletableFuture<Acknowledge> cancel(Time timeout) {
+		synchronized (lock) {
+			if (isInitialized()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout));
+			} else {
+				LOG.info("Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", jobId);
+
+				// cancel job
+				CompletableFuture<Acknowledge> cancelFuture = jobManagerRunnerFuture
+					.thenCompose(JobManagerRunner::getJobMasterGateway)
+					.thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout));
+				cancelFuture.whenComplete((ignored, cancelThrowable) -> {
+					if (cancelThrowable != null) {
+						LOG.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+					}
+				});
+				jobStatus = DispatcherJobStatus.CANCELLING;
+				return cancelFuture;
+			}
+		}
+	}
+
+	/**
+	 * Returns the current status of the job, as reported by {@link #requestJob(Time)}.
+	 */
+	public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
+		return requestJob(timeout).thenApply(ArchivedExecutionGraph::getState);
+	}
+
+	/**
+	 * Returns a future completing to the ArchivedExecutionGraph of the job.
+	 *
+	 * <p>While initializing (or cancelling during initialization), a sparse graph carrying only the
+	 * internal status is returned. Once initialized, the graph is taken from the job result if the
+	 * result future is done (failing if it failed), otherwise it is requested from the JobMaster.
+	 */
+	public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+		synchronized (lock) {
+			if (isInitialized()) {
+				if (jobResultFuture.isDone()) { // job is not running anymore
+					return jobResultFuture.thenApply(DispatcherJobResult::getArchivedExecutionGraph);
+				}
+				// job is still running
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJob(timeout));
+			} else {
+				Preconditions.checkState(this.jobStatus == DispatcherJobStatus.INITIALIZING || jobStatus == DispatcherJobStatus.CANCELLING);
+				return CompletableFuture.completedFuture(
+					ArchivedExecutionGraph.createFromInitializingJob(
+						jobId,
+						jobName,
+						jobStatus.asJobStatus(),
+						null,
+						initializationTimestamp));
+			}
+		}
+	}
+
+	/**
+	 * The job is initialized once the JobManagerRunner future has completed, i.e. the runner has
+	 * been created or its creation failed. A cancellation requested during initialization does not
+	 * change this: such a job also becomes initialized as soon as the runner future completes.
+	 */
+	public boolean isInitialized() {
+		synchronized (lock) {
+			return jobStatus == DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+		}
+	}
+
+	/**
+	 * Returns the {@link JobMasterGateway} from the JobManagerRunner.
+	 *
+	 * @return the {@link JobMasterGateway}. The future will complete exceptionally if the JobManagerRunner initialization failed.
+	 * @throws IllegalStateException is thrown if the job is not initialized
+	 */
+	public CompletableFuture<JobMasterGateway> getJobMasterGateway() {
+		Preconditions.checkState(
+			isInitialized(),
+			"JobMaster Gateway is not available during initialization");
+		return jobManagerRunnerFuture.thenCompose(JobManagerRunner::getJobMasterGateway);
+	}
+
+	/**
+	 * Closes the job. The returned future only completes after the JobManagerRunner future has
+	 * completed: if the runner was created, once closing the runner has completed; if its creation
+	 * failed, without further action.
+	 */
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		FutureUtils.assertNoException(jobManagerRunnerFuture.handle((runner, throwable) -> {
+			if (throwable == null) {
+				// init was successful: close jobManager runner.
+				CompletableFuture<Void> jobManagerRunnerClose = jobManagerRunnerFuture.thenCompose(
+					AutoCloseableAsync::closeAsync);
+				FutureUtils.forward(jobManagerRunnerClose, terminationFuture);
+			} else {
+				// initialization has failed: complete termination.
+				terminationFuture.complete(null);
+			}
+			return null;
+		}));
+		return terminationFuture;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
new file mode 100644
index 0000000..ade3459
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+/**
+ * Container for returning the {@link ArchivedExecutionGraph} and a flag whether the initialization has failed.
+ * For initialization failures, the throwable is also attached, to avoid deserializing it from the ArchivedExecutionGraph.
+ */
+final class DispatcherJobResult {
+
+	private final ArchivedExecutionGraph archivedExecutionGraph;
+
+	// if the throwable field is set, the job failed during initialization.
+	@Nullable
+	private final Throwable initializationFailure;
+
+	private DispatcherJobResult(ArchivedExecutionGraph archivedExecutionGraph, @Nullable Throwable throwable) {
+		this.archivedExecutionGraph = Preconditions.checkNotNull(archivedExecutionGraph);
+		this.initializationFailure = throwable;
+	}
+
+	/**
+	 * Returns whether this result represents a failed JobManagerRunner initialization.
+	 */
+	public boolean isInitializationFailure() {
+		return initializationFailure != null;
+	}
+
+	/**
+	 * Returns the {@link ArchivedExecutionGraph} of the job; never null.
+	 */
+	public ArchivedExecutionGraph getArchivedExecutionGraph() {
+		return archivedExecutionGraph;
+	}
+
+	/**
+	 * Returns the failure that occurred during the JobManagerRunner initialization.
+	 *
+	 * @throws IllegalStateException if this DispatcherJobResult does not represent a failed initialization.
+	 */
+	public Throwable getInitializationFailure() {
+		Preconditions.checkState(isInitializationFailure(), "This DispatcherJobResult does not represent a failed initialization.");
+		return initializationFailure;
+	}
+
+	/**
+	 * Creates a result for a job whose JobManagerRunner initialization failed.
+	 *
+	 * @param archivedExecutionGraph graph describing the failed job; must not be null
+	 * @param throwable the initialization failure; must not be null, since a null failure would
+	 *     make the result indistinguishable from a successful one
+	 */
+	public static DispatcherJobResult forInitializationFailure(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) {
+		return new DispatcherJobResult(archivedExecutionGraph, Preconditions.checkNotNull(throwable));
+	}
+
+	/**
+	 * Creates a result for a job whose JobManagerRunner was initialized successfully.
+	 *
+	 * @param archivedExecutionGraph graph of the job; must not be null
+	 */
+	public static DispatcherJobResult forSuccess(ArchivedExecutionGraph archivedExecutionGraph) {
+		return new DispatcherJobResult(archivedExecutionGraph, null);
+	}
+}
diff --git a/flink-runtime-web/web-dashboard/src/app/app.config.ts b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/UnavailableDispatcherOperationException.java
similarity index 66%
copy from flink-runtime-web/web-dashboard/src/app/app.config.ts
copy to flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/UnavailableDispatcherOperationException.java
index 6d41b67..6ce4251 100644
--- a/flink-runtime-web/web-dashboard/src/app/app.config.ts
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/UnavailableDispatcherOperationException.java
@@ -16,20 +16,15 @@
* limitations under the License.
*/
-export const BASE_URL = '.';
-export const COLOR_MAP = {
- TOTAL: '#112641',
- RUNNING: '#52c41a',
- FAILED: '#f5222d',
- FINISHED: '#1890ff',
- CANCELED: '#fa8c16',
- CANCELING: '#faad14',
- CREATED: '#2f54eb',
- DEPLOYING: '#13c2c2',
- RECONCILING: '#eb2f96',
- IN_PROGRESS: '#faad14',
- SCHEDULED: '#722ed1',
- COMPLETED: '#1890ff',
- RESTARTING: '#13c2c2'
-};
-export const LONG_MIN_VALUE = -9223372036854776000;
+package org.apache.flink.runtime.dispatcher;
+
+/**
+ * Exception indicating that a Dispatcher operation is temporarily unavailable, e.g. while the targeted job is still initializing.
+ */
+public class UnavailableDispatcherOperationException extends DispatcherException {
+ private static final long serialVersionUID = -45499335133622792L;
+
+ public UnavailableDispatcherOperationException(String message) {
+ super(message);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 8461138..c011739 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
@@ -351,4 +352,64 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
executionGraph.getCheckpointStatsSnapshot(),
executionGraph.getStateBackendName().orElse(null));
}
+
+	/**
+	 * Create a sparse ArchivedExecutionGraph for a job while it is still initializing.
+	 * Most fields will be empty; only the job id, name and status, the error-related fields and
+	 * the initialization timestamp are set.
+	 *
+	 * @param jobId id of the job
+	 * @param jobName name of the job
+	 * @param jobStatus status to report for the job
+	 * @param throwable failure of the initialization, or null; if set, jobStatus must be
+	 *     {@link JobStatus#FAILED}
+	 * @param initializationTimestamp timestamp recorded for {@link JobStatus#INITIALIZING}
+	 * @return the sparse ArchivedExecutionGraph
+	 * @throws IllegalStateException if a throwable is given but jobStatus is not FAILED
+	 */
+	public static ArchivedExecutionGraph createFromInitializingJob(
+			JobID jobId,
+			String jobName,
+			JobStatus jobStatus,
+			@Nullable Throwable throwable,
+			long initializationTimestamp) {
+		final Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = Collections.emptyMap();
+		final List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder = Collections.emptyList();
+		final Map<String, SerializedValue<OptionalFailure<Object>>> serializedUserAccumulators = Collections.emptyMap();
+		final StringifiedAccumulatorResult[] archivedUserAccumulators = new StringifiedAccumulatorResult[]{};
+
+		final long[] timestamps = new long[JobStatus.values().length];
+		timestamps[JobStatus.INITIALIZING.ordinal()] = initializationTimestamp;
+
+		final String jsonPlan = "{}";
+
+		ErrorInfo failureInfo = null;
+		if (throwable != null) {
+			Preconditions.checkState(
+				jobStatus == JobStatus.FAILED,
+				"A failure cause may only be given for a FAILED job, but the job status is %s.",
+				jobStatus);
+			final long failureTime = System.currentTimeMillis();
+			failureInfo = new ErrorInfo(throwable, failureTime);
+			timestamps[JobStatus.FAILED.ordinal()] = failureTime;
+		}
+
+		return new ArchivedExecutionGraph(
+			jobId,
+			jobName,
+			archivedTasks,
+			archivedVerticesInCreationOrder,
+			timestamps,
+			jobStatus,
+			failureInfo,
+			jsonPlan,
+			archivedUserAccumulators,
+			serializedUserAccumulators,
+			new ExecutionConfig().archive(),
+			false,
+			null,
+			null,
+			null);
+	}
+
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
new file mode 100644
index 0000000..9c0784f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.StringContains.containsString;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test for the {@link DispatcherJob} class.
+ */
+public class DispatcherJobTest extends TestLogger {
+
+ private static final Time TIMEOUT = Time.seconds(10L);
+ private static final JobID TEST_JOB_ID = new JobID();
+
+ @Test
+ public void testStatusWhenInitializing() throws Exception {
+ TestContext testContext = createTestContext();
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ assertThat(dispatcherJob.isInitialized(), is(false));
+ assertThat(dispatcherJob.getResultFuture().isDone(), is(false));
+ assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+ }
+
+ @Test
+ public void testStatusWhenRunning() throws Exception {
+ TestContext testContext = createTestContext();
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ // finish initialization
+ testContext.setRunning();
+
+ assertJobStatus(dispatcherJob, JobStatus.RUNNING);
+
+ // result future not done
+ assertThat(dispatcherJob.getResultFuture().isDone(), is(false));
+
+ assertThat(dispatcherJob.isInitialized(), is(true));
+ }
+
+ @Test
+ public void testStatusWhenJobFinished() throws Exception {
+ TestContext testContext = createTestContext();
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ // finish job
+ testContext.setRunning();
+ testContext.finishJob();
+
+ assertJobStatus(dispatcherJob, JobStatus.FINISHED);
+
+ // assert result future done
+ DispatcherJobResult result = dispatcherJob.getResultFuture().get();
+
+ assertThat(result.getArchivedExecutionGraph().getState(), is(JobStatus.FINISHED));
+ }
+
+ @Test
+ public void testStatusWhenCancellingWhileInitializing() throws Exception {
+ TestContext testContext = createTestContext();
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+ assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+
+ CompletableFuture<Acknowledge> cancelFuture = dispatcherJob.cancel(
+ TIMEOUT);
+
+ assertThat(cancelFuture.isDone(), is(false));
+ assertThat(dispatcherJob.isInitialized(), is(false));
+
+ assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+
+ testContext.setRunning();
+ testContext.finishCancellation();
+
+ // assert that cancel future completes
+ cancelFuture.get();
+
+ assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+ assertThat(dispatcherJob.isInitialized(), is(true));
+ // assert that the result future completes
+ assertThat(dispatcherJob.getResultFuture().get().getArchivedExecutionGraph().getState(), is(JobStatus.CANCELED));
+ }
+
+ @Test
+ public void testStatusWhenCancellingWhileRunning() throws Exception {
+ TestContext testContext = createTestContext();
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ testContext.setRunning();
+ CompletableFuture<Acknowledge> cancelFuture = dispatcherJob.cancel(TIMEOUT);
+
+ assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+ testContext.finishCancellation();
+
+ cancelFuture.get();
+ assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+ assertThat(dispatcherJob.getResultFuture().get().getArchivedExecutionGraph().getState(), is(JobStatus.CANCELED));
+ }
+
+ @Test
+ public void testStatusWhenCancellingWhileFailed() throws Exception {
+ TestContext testContext = createTestContext();
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ RuntimeException exception = new RuntimeException("Artificial failure in runner initialization");
+ testContext.failInitialization(exception);
+
+ assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+ CommonTestUtils.assertThrows("Artificial failure", ExecutionException.class, () -> dispatcherJob.cancel(TIMEOUT).get());
+
+ assertJobStatus(dispatcherJob, JobStatus.FAILED);
+ }
+
+ @Test
+ public void testErrorWhileInitializing() throws Exception {
+ TestContext testContext = createTestContext();
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ // now fail
+ RuntimeException exception = new RuntimeException("Artificial failure in runner initialization");
+ testContext.failInitialization(exception);
+
+ assertThat(dispatcherJob.isInitialized(), is(true));
+ assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+ ArchivedExecutionGraph aeg = dispatcherJob.getResultFuture().get().getArchivedExecutionGraph();
+ assertThat(aeg.getFailureInfo().getException().deserializeError(ClassLoader.getSystemClassLoader()), is(exception));
+ }
+
+ @Test
+ public void testDispatcherJobResult() throws Exception {
+ TestContext testContext = createTestContext();
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+ testContext.failInitialization(new RuntimeException("Artificial failure in runner initialization"));
+
+ DispatcherJobResult result = dispatcherJob.getResultFuture().get();
+ assertThat(result.isInitializationFailure(), is(true));
+ assertThat(result.getArchivedExecutionGraph().getState(), is(JobStatus.FAILED));
+ assertThat(result.getInitializationFailure().getMessage(), containsString("Artificial failure"));
+ }
+
+ @Test
+ public void testCloseWhileInitializingSuccessfully() throws Exception {
+ TestContext testContext = createTestContext();
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ CompletableFuture<Void> closeFuture = dispatcherJob.closeAsync();
+ assertThat(closeFuture.isDone(), is(false));
+
+ // complete the JobManagerRunner future, so that closeAsync() can close the runner
+ testContext.setRunning();
+
+ // assert future completes now
+ closeFuture.get();
+
+ // ensure the result future is complete (how it completes is up to the JobManager)
+ CompletableFuture<DispatcherJobResult> resultFuture = dispatcherJob.getResultFuture();
+ CommonTestUtils.assertThrows("has not been finished", ExecutionException.class,
+ resultFuture::get);
+ }
+
+ @Test
+ public void testCloseWhileInitializingErroneously() throws Exception {
+ TestContext testContext = createTestContext();
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ CompletableFuture<Void> closeFuture = dispatcherJob.closeAsync();
+ assertThat(closeFuture.isDone(), is(false));
+
+ testContext.failInitialization(new RuntimeException("fail"));
+
+ // assert future completes now
+ closeFuture.get();
+
+ // ensure the result future is complete
+ dispatcherJob.getResultFuture().get();
+ }
+
+ @Test
+ public void testCloseWhileRunning() throws Exception {
+ TestContext testContext = createTestContext();
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ // complete JobManager runner future to indicate to the DispatcherJob that the Runner has been initialized
+ testContext.setRunning();
+
+ CompletableFuture<Void> closeFuture = dispatcherJob.closeAsync();
+
+ closeFuture.get();
+
+ // result future should complete exceptionally.
+ CompletableFuture<DispatcherJobResult> resultFuture = dispatcherJob.getResultFuture();
+ CommonTestUtils.assertThrows("has not been finished", ExecutionException.class,
+ resultFuture::get);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testUnavailableJobMasterGateway() {
+ TestContext testContext = createTestContext();
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+ dispatcherJob.getJobMasterGateway();
+ }
+
+ private TestContext createTestContext() {
+ final JobVertex testVertex = new JobVertex("testVertex");
+ testVertex.setInvokableClass(NoOpInvokable.class);
+
+ JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex);
+ CompletableFuture<JobManagerRunner> jobManagerRunnerCompletableFuture = new CompletableFuture<>();
+ DispatcherJob dispatcherJob = DispatcherJob.createFor(jobManagerRunnerCompletableFuture,
+ jobGraph.getJobID(), jobGraph.getName());
+
+ return new TestContext(
+ jobManagerRunnerCompletableFuture,
+ dispatcherJob,
+ jobGraph);
+ }
+
+ private static class TestContext {
+ private final CompletableFuture<JobManagerRunner> jobManagerRunnerCompletableFuture;
+ private final DispatcherJob dispatcherJob;
+ private final JobGraph jobGraph;
+ private final TestingJobMasterGateway mockRunningJobMasterGateway;
+ private final CompletableFuture<Acknowledge> cancellationFuture;
+
+ private JobStatus internalJobStatus = JobStatus.INITIALIZING;
+ private CompletableFuture<ArchivedExecutionGraph> resultFuture = new CompletableFuture<>();
+
+ public TestContext (
+ CompletableFuture<JobManagerRunner> jobManagerRunnerCompletableFuture,
+ DispatcherJob dispatcherJob,
+ JobGraph jobGraph) {
+ this.jobManagerRunnerCompletableFuture = jobManagerRunnerCompletableFuture;
+ this.dispatcherJob = dispatcherJob;
+ this.jobGraph = jobGraph;
+
+ this.cancellationFuture = new CompletableFuture<>();
+ this.mockRunningJobMasterGateway = new TestingJobMasterGatewayBuilder()
+ .setRequestJobSupplier(() -> CompletableFuture.completedFuture(ArchivedExecutionGraph.createFromInitializingJob(getJobID(), "test", internalJobStatus, null, 1337)))
+ .setRequestJobDetailsSupplier(() -> {
+ JobDetails jobDetails = new JobDetails(getJobID(), "", 0, 0, 0, internalJobStatus, 0,
+ new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0}, 0);
+ return CompletableFuture.completedFuture(jobDetails);
+ })
+ // cancel() only moves the job to CANCELLING; finishCancellation() completes the result future with a CANCELED AEG and acks the cancellation.
+ .setCancelFunction(() -> {
+ internalJobStatus = JobStatus.CANCELLING;
+ return cancellationFuture;
+ })
+ .build();
+ }
+
+ public JobID getJobID() {
+ return jobGraph.getJobID();
+ }
+
+ public void failInitialization(Throwable ex) {
+ jobManagerRunnerCompletableFuture.completeExceptionally(ex);
+ }
+
+ public DispatcherJob getDispatcherJob() {
+ return dispatcherJob;
+ }
+
+ public void setRunning() {
+ internalJobStatus = JobStatus.RUNNING;
+ JobManagerRunner jobManagerRunner = new TestingJobManagerRunner.Builder()
+ .setJobId(getJobID())
+ .setBlockingTermination(false)
+ .setJobMasterGatewayFuture(CompletableFuture.completedFuture(mockRunningJobMasterGateway))
+ .setResultFuture(resultFuture)
+ .build();
+ jobManagerRunnerCompletableFuture.complete(jobManagerRunner);
+ }
+
+ public void finishJob() {
+ internalJobStatus = JobStatus.FINISHED;
+ resultFuture.complete(
+ ArchivedExecutionGraph.createFromInitializingJob(getJobID(), "test", JobStatus.FINISHED, null, 1337));
+ }
+
+ public void finishCancellation() {
+ jobManagerRunnerCompletableFuture.thenAccept(runner -> {
+ internalJobStatus = JobStatus.CANCELED;
+ runner.getResultFuture()
+ .complete(ArchivedExecutionGraph.createFromInitializingJob(getJobID(), "test", JobStatus.CANCELED, null, 1337));
+ cancellationFuture.complete(Acknowledge.get());
+ });
+ }
+ }
+
+ private void assertJobStatus(DispatcherJob dispatcherJob, JobStatus expectedStatus) throws Exception {
+ assertThat(dispatcherJob.requestJobDetails(TIMEOUT).get().getStatus(), is(expectedStatus));
+ assertThat(dispatcherJob.requestJob(TIMEOUT).get().getState(), is(expectedStatus));
+ assertThat(dispatcherJob.requestJobStatus(TIMEOUT).get(), is(expectedStatus));
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index d2ad995..1caec0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.TestingBlobStore;
import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
-import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.junit.After;
@@ -75,6 +76,7 @@ import javax.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -280,15 +282,16 @@ public class DispatcherResourceCleanupTest extends TestLogger {
@Test
public void testBlobServerCleanupWhenJobSubmissionFails() throws Exception {
startDispatcher(new FailingJobManagerRunnerFactory(new FlinkException("Test exception")));
- final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, timeout);
+ dispatcherGateway.submitJob(jobGraph, timeout).get();
- try {
- submissionFuture.get();
- fail("Job submission was expected to fail.");
- } catch (ExecutionException ee) {
- assertThat(ExceptionUtils.findThrowable(ee, JobSubmissionException.class).isPresent(), is(true));
- }
+ // the submission itself is acknowledged; the runner creation failure surfaces in the job result
+ final Optional<SerializedThrowable> serializedFailure = dispatcherGateway
+ .requestJobResult(jobId, timeout)
+ .get()
+ .getSerializedThrowable();
+
+ assertThat(serializedFailure.isPresent(), is(true));
+ final Throwable failure = serializedFailure.get().deserializeError(getClass().getClassLoader());
+ assertThat(ExceptionUtils.findThrowable(failure, JobExecutionException.class).isPresent(), is(true));
assertThatHABlobsHaveBeenRemoved();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 854cfcd..51e43f7 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
@@ -30,7 +32,9 @@ import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
+import org.apache.flink.runtime.client.JobInitializationException;
import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -45,10 +49,13 @@ import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
@@ -62,6 +69,7 @@ import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
@@ -73,6 +81,7 @@ import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -89,6 +98,8 @@ import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
@@ -103,7 +114,6 @@ import java.util.concurrent.TimeoutException;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -201,7 +211,10 @@ public class DispatcherTest extends TestLogger {
return dispatcher;
}
- private class TestingDispatcherBuilder {
+ /**
+ * Builder for the TestingDispatcher.
+ */
+ public class TestingDispatcherBuilder {
private DispatcherBootstrap dispatcherBootstrap = new DefaultDispatcherBootstrap(Collections.emptyList());
@@ -285,12 +298,11 @@ public class DispatcherTest extends TestLogger {
@Test
public void testJobSubmission() throws Exception {
dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
-
DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
- CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
- acknowledgeFuture.get();
+ jobMasterLeaderElectionService.getStartFuture().get();
assertTrue(
"jobManagerRunner was not started",
@@ -329,6 +341,111 @@ public class DispatcherTest extends TestLogger {
}
}
+ /**
+ * Tests that submitJob is acknowledged while the job is still blocked in its master-side
+ * initialization, that the job is reported as INITIALIZING in the meantime, and that it reaches
+ * RUNNING once initialization is released.
+ */
+ @Test(timeout = 5_000L)
+ public void testNonBlockingJobSubmission() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ Tuple2<JobGraph, BlockingJobVertex> blockingJobGraph = getBlockingJobGraphAndVertex();
+ JobID jobID = blockingJobGraph.f0.getJobID();
+
+ dispatcherGateway.submitJob(blockingJobGraph.f0, TIMEOUT).get();
+
+ // ensure INITIALIZING status
+ assertThat(dispatcherGateway.requestJobStatus(jobID, TIMEOUT).get(), is(JobStatus.INITIALIZING));
+
+ // ensure correct JobDetails
+ MultipleJobsDetails multiDetails = dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get();
+ assertEquals(1, multiDetails.getJobs().size());
+ assertEquals(jobID, multiDetails.getJobs().iterator().next().getJobId());
+
+ // submission has succeeded, let the initialization finish.
+ blockingJobGraph.f1.unblock();
+
+ // ensure the submitted job (queried by its own id) is running
+ CommonTestUtils.waitUntilCondition(() -> dispatcherGateway.requestJobStatus(jobID, TIMEOUT).get() == JobStatus.RUNNING,
+ Deadline.fromNow(Duration.of(10, ChronoUnit.SECONDS)), 5L);
+ }
+
+ /**
+ * Tests that triggering a savepoint while the job is still INITIALIZING fails with an
+ * {@link UnavailableDispatcherOperationException}, and that the job still reaches RUNNING afterwards.
+ */
+ @Test(timeout = 5_000L)
+ public void testInvalidCallDuringInitialization() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ Tuple2<JobGraph, BlockingJobVertex> blockingJobGraph = getBlockingJobGraphAndVertex();
+ JobID jid = blockingJobGraph.f0.getJobID();
+
+ dispatcherGateway.submitJob(blockingJobGraph.f0, TIMEOUT).get();
+
+ assertThat(dispatcherGateway.requestJobStatus(jid, TIMEOUT).get(), is(JobStatus.INITIALIZING));
+
+ // this call is supposed to fail
+ try {
+ dispatcherGateway.triggerSavepoint(jid, "file:///tmp/savepoint", false, TIMEOUT).get();
+ fail("Previous statement should have failed");
+ } catch (ExecutionException e) {
+ // assertThat reports the actual cause on mismatch, unlike assertTrue(instanceof)
+ assertThat(e.getCause(), instanceOf(UnavailableDispatcherOperationException.class));
+ }
+
+ // submission has succeeded, let the initialization finish.
+ blockingJobGraph.f1.unblock();
+
+ // ensure the submitted job (queried by its own id) is running
+ CommonTestUtils.waitUntilCondition(() -> dispatcherGateway.requestJobStatus(jid, TIMEOUT).get() == JobStatus.RUNNING,
+ Deadline.fromNow(Duration.of(10, ChronoUnit.SECONDS)), 5L);
+ }
+
+ /**
+ * Tests that a job can be cancelled while it is still initializing: it reports CANCELLING and the
+ * cancellation stays pending until initialization is released, after which the job result is CANCELED.
+ */
+ @Test
+ public void testCancellationDuringInitialization() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ // job whose initializeOnMaster blocks until its vertex is explicitly unblocked
+ final Tuple2<JobGraph, BlockingJobVertex> graphAndVertex = getBlockingJobGraphAndVertex();
+ final JobGraph blockingGraph = graphAndVertex.f0;
+ final JobID blockingJobId = blockingGraph.getJobID();
+
+ gateway.submitJob(blockingGraph, TIMEOUT).get();
+ assertThat(gateway.requestJobStatus(blockingJobId, TIMEOUT).get(), is(JobStatus.INITIALIZING));
+
+ // the submission went through; cancel while initialization is still blocked
+ final CompletableFuture<Acknowledge> cancelAck = gateway.cancelJob(blockingJobId, TIMEOUT);
+ assertThat(gateway.requestJobStatus(blockingJobId, TIMEOUT).get(), is(JobStatus.CANCELLING));
+ assertThat(cancelAck.isDone(), is(false));
+
+ // release initialization so that the pending cancellation can complete
+ graphAndVertex.f1.unblock();
+ cancelAck.get();
+
+ assertThat(gateway.requestJobResult(blockingJobId, TIMEOUT).get().getApplicationStatus(), is(ApplicationStatus.CANCELED));
+ }
+
+ /**
+ * Tests that an exception thrown from the vertex's initializeOnMaster fails the job and is exposed
+ * as a {@link JobInitializationException} in the archived execution graph's failure info.
+ */
+ @Test
+ public void testErrorDuringInitialization() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+ DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ // create a job graph that fails during initialization
+ final FailingInitializationJobVertex failingInitializationJobVertex = new FailingInitializationJobVertex("testVertex");
+ failingInitializationJobVertex.setInvokableClass(NoOpInvokable.class);
+ final JobGraph failingJobGraph = new JobGraph(TEST_JOB_ID, "failingTestJob", failingInitializationJobVertex);
+
+ dispatcherGateway.submitJob(failingJobGraph, TIMEOUT).get();
+
+ // wait till job has failed
+ dispatcherGateway.requestJobResult(TEST_JOB_ID, TIMEOUT).get();
+
+ // get failure cause of the job that was actually submitted
+ ArchivedExecutionGraph execGraph = dispatcherGateway.requestJob(TEST_JOB_ID, TIMEOUT).get();
+ Assert.assertNotNull(execGraph.getFailureInfo());
+ Throwable throwable = execGraph.getFailureInfo().getException().deserializeError(getClass().getClassLoader());
+
+ // ensure correct exception type
+ assertThat(throwable, instanceOf(JobInitializationException.class));
+ }
+
/**
* Test that {@link JobResult} is cached when the job finishes.
*/
@@ -413,26 +530,42 @@ public class DispatcherTest extends TestLogger {
*/
@Test
public void testWaitingForJobMasterLeadership() throws Exception {
- dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+ ExpectedJobIdJobManagerRunnerFactory jobManagerRunnerFactory = new ExpectedJobIdJobManagerRunnerFactory(
+ TEST_JOB_ID,
+ createdJobManagerRunnerLatch);
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, jobManagerRunnerFactory);
final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
-
- final CompletableFuture<JobStatus> jobStatusFuture = dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT);
-
- assertThat(jobStatusFuture.isDone(), is(false));
-
- try {
- jobStatusFuture.get(10, TimeUnit.MILLISECONDS);
- fail("Should not complete.");
- } catch (TimeoutException ignored) {
- // ignored
+ log.info("Job submission completed");
+
+ // wait until job has been initialized: approximated by the time when the
+ // jobMasterLeaderElectionService has been started
+ jobMasterLeaderElectionService.getStartFuture().get();
+
+ // Try to obtain a job status future that is still pending, i.e. blocked on the JobMaster leader election.
+ // In some CI environments, we can not guarantee that the job immediately leaves the INITIALIZING status
+ // after the jobMasterLeaderElectionService has been started, so retry a few times. A future that
+ // completes (with whatever status) does not prove blocking and is discarded.
+ CompletableFuture<JobStatus> jobStatusFuture = null;
+ for (int i = 0; i < 5 && jobStatusFuture == null; i++) {
+ final CompletableFuture<JobStatus> candidateFuture = dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT);
+ try {
+ candidateFuture.get(10, TimeUnit.MILLISECONDS);
+ // completed without blocking (e.g. still INITIALIZING): back off and retry
+ Thread.sleep(100);
+ } catch (TimeoutException ignored) {
+ jobStatusFuture = candidateFuture; // great, we have a blocking future
+ }
+ }
+ if (jobStatusFuture == null) {
+ fail("Unable to get a job status future blocked on leader election.");
}
jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get();
- assertThat(jobStatusFuture.get(), notNullValue());
+ assertThat(jobStatusFuture.get(), is(JobStatus.RUNNING));
}
/**
@@ -476,19 +609,17 @@ public class DispatcherTest extends TestLogger {
final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
- final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
- assertThat(submissionFuture.isDone(), is(false));
+ assertThat(dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get(), is(JobStatus.INITIALIZING));
final CompletableFuture<Collection<String>> metricQueryServiceAddressesFuture = dispatcherGateway.requestMetricQueryServiceAddresses(Time.seconds(5L));
assertThat(metricQueryServiceAddressesFuture.get(), is(empty()));
- assertThat(submissionFuture.isDone(), is(false));
+ assertThat(dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get(), is(JobStatus.INITIALIZING));
jobManagerRunnerCreationLatch.trigger();
-
- submissionFuture.get();
}
/**
@@ -511,26 +642,31 @@ public class DispatcherTest extends TestLogger {
}
}));
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
- CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
- assertThat(submissionFuture.isDone(), is(false));
+ assertThat(dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get(), is(JobStatus.INITIALIZING));
queue.offer(Optional.of(testException));
- try {
- submissionFuture.get();
- fail("Should fail because we could not instantiate the JobManagerRunner.");
- } catch (Exception e) {
- assertThat(ExceptionUtils.findThrowable(e, t -> t.equals(testException)).isPresent(), is(true));
- }
+ // wait till job is failed
+ dispatcherGateway.requestJobResult(jobGraph.getJobID(), TIMEOUT).get();
- submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+ ArchivedExecutionGraph execGraph = dispatcherGateway.requestJob(jobGraph.getJobID(), TIMEOUT).get();
+ Assert.assertNotNull(execGraph.getFailureInfo());
+ assertThat(ExceptionUtils.findSerializedThrowable(execGraph.getFailureInfo().getException(), FlinkException.class, this.getClass().getClassLoader()).isPresent(), is(true));
+ // submit job again
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+ // don't fail this time
queue.offer(Optional.empty());
- submissionFuture.get();
+ // Ensure job is running
+ CommonTestUtils.waitUntilCondition(() -> dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get() == JobStatus.RUNNING,
+ Deadline.fromNow(Duration.of(10, ChronoUnit.SECONDS)), 5L);
}
@Test
@@ -546,8 +682,8 @@ public class DispatcherTest extends TestLogger {
dispatcher.start();
final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
- final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
- submissionFuture.get();
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
assertThat(dispatcher.getNumberJobs(TIMEOUT).get(), Matchers.is(1));
dispatcher.close();
@@ -626,10 +762,36 @@ public class DispatcherTest extends TestLogger {
public TestingJobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
jobManagerRunnerCreationLatch.run();
- return super.createJobManagerRunner(jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
+ final TestingJobManagerRunner runner = super.createJobManagerRunner(
+ jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices,
+ jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
+
+ // the runner's gateway answers every requestJob call with a freshly created RUNNING archived graph
+ final TestingJobMasterGateway runningJobMasterGateway = new TestingJobMasterGatewayBuilder()
+ .setRequestJobSupplier(() -> CompletableFuture.completedFuture(
+ ArchivedExecutionGraph.createFromInitializingJob(
+ jobGraph.getJobID(),
+ jobGraph.getName(),
+ JobStatus.RUNNING,
+ null,
+ 1337)))
+ .build();
+ runner.completeJobMasterGatewayFuture(runningJobMasterGateway);
+ return runner;
}
}
+ /**
+ * Creates a single-vertex job graph with id TEST_JOB_ID whose vertex blocks in initializeOnMaster,
+ * and returns it together with that vertex so callers can release the initialization.
+ */
+ private Tuple2<JobGraph, BlockingJobVertex> getBlockingJobGraphAndVertex() {
+ final BlockingJobVertex vertex = new BlockingJobVertex("testVertex");
+ vertex.setInvokableClass(NoOpInvokable.class);
+ final JobGraph graph = new JobGraph(TEST_JOB_ID, "blockingTestJob", vertex);
+ return Tuple2.of(graph, vertex);
+ }
+
private JobGraph createFailingJobGraph(Exception failureCause) {
final FailingJobVertex jobVertex = new FailingJobVertex("Failing JobVertex", failureCause);
jobVertex.setInvokableClass(NoOpInvokable.class);
@@ -690,4 +852,34 @@ public class DispatcherTest extends TestLogger {
}
}
+ /**
+ * JobVertex whose master-side initialization blocks until {@link #unblock()} is called.
+ */
+ private static class BlockingJobVertex extends JobVertex {
+
+ private final OneShotLatch initializationLatch = new OneShotLatch();
+
+ public BlockingJobVertex(String vertexName) {
+ super(vertexName);
+ }
+
+ @Override
+ public void initializeOnMaster(ClassLoader loader) throws Exception {
+ super.initializeOnMaster(loader);
+ // park the initializing thread until the test releases it
+ initializationLatch.await();
+ }
+
+ /** Releases the thread blocked in {@link #initializeOnMaster(ClassLoader)}. */
+ public void unblock() {
+ initializationLatch.trigger();
+ }
+ }
+
+ /**
+ * JobVertex whose master-side initialization always fails with an {@link IllegalStateException}.
+ */
+ public static class FailingInitializationJobVertex extends JobVertex {
+
+ public FailingInitializationJobVertex(String vertexName) {
+ super(vertexName);
+ }
+
+ /** Always throws, so that a job containing this vertex fails while being initialized. */
+ @Override
+ public void initializeOnMaster(ClassLoader loader) {
+ throw new IllegalStateException("Artificial test failure");
+ }
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index 1f3216b..a5307e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -77,7 +77,10 @@ public class TestingJobManagerRunnerFactory implements JobManagerRunnerFactory {
blockingTermination = false;
}
- return new TestingJobManagerRunner(jobGraph.getJobID(), blockingTermination);
+ return new TestingJobManagerRunner.Builder()
+ .setJobId(jobGraph.getJobID())
+ .setBlockingTermination(blockingTermination)
+ .build();
}
public TestingJobManagerRunner takeCreatedJobManagerRunner() throws InterruptedException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java
index d62930b..1eec46d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.BlobServerOptions;
@@ -28,7 +29,6 @@ import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.PermanentBlobKey;
-import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
@@ -53,17 +54,19 @@ import javax.annotation.Nonnull;
import java.io.File;
import java.io.FilenameFilter;
import java.net.InetSocketAddress;
+import java.nio.file.NoSuchFileException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
@@ -180,18 +183,17 @@ public class BlobsCleanupITCase extends TestLogger {
jobGraph.addUserJarBlobKey(new PermanentBlobKey());
}
- final CompletableFuture<JobSubmissionResult> submissionFuture = miniCluster.submitJob(jobGraph);
+ final JobSubmissionResult jobSubmissionResult = miniCluster.submitJob(jobGraph).get();
if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
- try {
- submissionFuture.get();
- fail("Expected job submission failure.");
- } catch (ExecutionException e) {
- assertThat(ExceptionUtils.findThrowable(e, JobSubmissionException.class).isPresent(), is(true));
- }
- } else {
- final JobSubmissionResult jobSubmissionResult = submissionFuture.get();
+ // Wait for submission to fail & check if exception is forwarded
+ Optional<SerializedThrowable> exception = miniCluster.requestJobResult(jid).get().getSerializedThrowable();
+ assertTrue(exception.isPresent());
+ assertTrue(ExceptionUtils.findThrowableSerializedAware(exception.get(), NoSuchFileException.class, getClass().getClassLoader()).isPresent());
+ // check job status
+ assertThat(miniCluster.getJobStatus(jid).get(), is(JobStatus.FAILED));
+ } else {
assertThat(jobSubmissionResult.getJobID(), is(jid));
final CompletableFuture<JobResult> resultFuture = miniCluster.requestJobResult(jid);
@@ -220,7 +222,6 @@ public class BlobsCleanupITCase extends TestLogger {
.orElse(null);
assertThat(ExceptionUtils.stringifyException(cause), jobResult.isSuccess(), is(true));
}
-
}
// both BlobServer and BlobCache should eventually delete all files
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
index 55cfde4..b9a480c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.util.Preconditions;
import java.util.concurrent.CompletableFuture;
@@ -32,21 +33,20 @@ public class TestingJobManagerRunner implements JobManagerRunner {
private final boolean blockingTermination;
- private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
+ private final CompletableFuture<Void> terminationFuture;
private final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture;
- private final CompletableFuture<Void> terminationFuture;
-
- public TestingJobManagerRunner(JobID jobId) {
- this(jobId, false);
- }
+ private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
- public TestingJobManagerRunner(JobID jobId, boolean blockingTermination) {
+ private TestingJobManagerRunner(JobID jobId,
+ boolean blockingTermination,
+ CompletableFuture<JobMasterGateway> jobMasterGatewayFuture,
+ CompletableFuture<ArchivedExecutionGraph> resultFuture) {
this.jobId = jobId;
this.blockingTermination = blockingTermination;
- this.resultFuture = new CompletableFuture<>();
- this.jobMasterGatewayFuture = new CompletableFuture<>();
+ this.jobMasterGatewayFuture = jobMasterGatewayFuture;
+ this.resultFuture = resultFuture;
this.terminationFuture = new CompletableFuture<>();
terminationFuture.whenComplete((ignored, ignoredThrowable) -> resultFuture.completeExceptionally(new JobNotFinishedException(jobId)));
@@ -94,4 +94,47 @@ public class TestingJobManagerRunner implements JobManagerRunner {
public CompletableFuture<Void> getTerminationFuture() {
return terminationFuture;
}
+
+ /**
+ * Completes this runner's JobMasterGateway future with the given gateway.
+ */
+ public void completeJobMasterGatewayFuture(JobMasterGateway jobMasterGateway) {
+ jobMasterGatewayFuture.complete(jobMasterGateway);
+ }
+
+ /**
+ * Builder for {@link TestingJobManagerRunner}.
+ *
+ * <p>A job id must be set. Unless configured otherwise, termination is non-blocking and the
+ * JobMaster gateway and result futures are uncompleted futures created together with the builder.
+ */
+ public static class Builder {
+ private JobID jobId = null;
+ private boolean blockingTermination = false;
+ private CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = new CompletableFuture<>();
+ private CompletableFuture<ArchivedExecutionGraph> resultFuture = new CompletableFuture<>();
+
+ public Builder setJobId(JobID jobId) {
+ this.jobId = jobId;
+ return this;
+ }
+
+ public Builder setBlockingTermination(boolean blockingTermination) {
+ this.blockingTermination = blockingTermination;
+ return this;
+ }
+
+ public Builder setJobMasterGatewayFuture(CompletableFuture<JobMasterGateway> jobMasterGatewayFuture) {
+ this.jobMasterGatewayFuture = Preconditions.checkNotNull(jobMasterGatewayFuture, "jobMasterGatewayFuture must not be null.");
+ return this;
+ }
+
+ public Builder setResultFuture(CompletableFuture<ArchivedExecutionGraph> resultFuture) {
+ this.resultFuture = Preconditions.checkNotNull(resultFuture, "resultFuture must not be null.");
+ return this;
+ }
+
+ /**
+ * Creates the runner from the configured values.
+ *
+ * @throws NullPointerException if no job id has been set
+ */
+ public TestingJobManagerRunner build() {
+ Preconditions.checkNotNull(jobId, "jobId must be set before building a TestingJobManagerRunner.");
+ return new TestingJobManagerRunner(
+ jobId,
+ blockingTermination,
+ jobMasterGatewayFuture,
+ resultFuture);
+ }
+ }
+
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
index 22f86d7..2edcc23 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
@@ -137,6 +137,9 @@ public class LeaderChangeClusterComponentsTest extends TestLogger {
CompletableFuture<JobResult> jobResultFuture = miniCluster.requestJobResult(jobId);
+ // need to wait until init is finished, so that the leadership revocation is possible
+ CommonTestUtils.waitUntilJobManagerIsInitialized(() -> miniCluster.getJobStatus(jobId).get());
+
highAvailabilityServices.revokeJobMasterLeadership(jobId).get();
JobResultUtils.assertIncomplete(jobResultFuture);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 58e6996..f7c882b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.runtime.execution.Environment;
@@ -150,7 +151,7 @@ public class TaskExecutorITCase extends TestLogger {
return () -> {
final AccessExecutionGraph executionGraph = executionGraphFutureSupplier.get().join();
- return allExecutionsRunning.test(executionGraph);
+ return allExecutionsRunning.test(executionGraph) && executionGraph.getState() == JobStatus.RUNNING;
};
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 0c11966..ee4c20b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.testutils;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.function.SupplierWithException;
@@ -31,6 +32,8 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeoutException;
/**
@@ -130,6 +133,17 @@ public class CommonTestUtils {
}
}
+ public static void waitUntilJobManagerIsInitialized(SupplierWithException<JobStatus, Exception> jobStatusSupplier) throws // Test helper: polls jobStatusSupplier until it reports something other than JobStatus.INITIALIZING.
+ Exception {
+ waitUntilJobManagerIsInitialized(jobStatusSupplier, Deadline.fromNow(Duration.of(1, // default deadline: one minute from the time of the call
+ ChronoUnit.MINUTES)));
+ }
+
+ public static void waitUntilJobManagerIsInitialized(SupplierWithException<JobStatus, Exception> jobStatusSupplier, Deadline timeout) throws // Variant of the overload above that is bounded by the caller-supplied deadline instead of one minute.
+ Exception {
+ waitUntilCondition(() -> jobStatusSupplier.get() != JobStatus.INITIALIZING, timeout, 20L); // NOTE(review): 20L is presumably the retry interval in ms and the deadline presumably surfaces as a TimeoutException; confirm against waitUntilCondition.
+ }
+
/**
* Utility class to read the output of a process stream and forward it into a StringWriter.
*/
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index 0acc5ac..315a733 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -53,7 +53,7 @@ public class TestingRestfulGateway implements RestfulGateway {
static final Function<JobID, CompletableFuture<Acknowledge>> DEFAULT_CANCEL_JOB_FUNCTION = jobId -> CompletableFuture.completedFuture(Acknowledge.get());
static final Function<JobID, CompletableFuture<JobResult>> DEFAULT_REQUEST_JOB_RESULT_FUNCTION = jobId -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
static final Function<JobID, CompletableFuture<ArchivedExecutionGraph>> DEFAULT_REQUEST_JOB_FUNCTION = jobId -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
- static final Function<JobID, CompletableFuture<JobStatus>> DEFAULT_REQUEST_JOB_STATUS_FUNCTION = jobId -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
+ static final Function<JobID, CompletableFuture<JobStatus>> DEFAULT_REQUEST_JOB_STATUS_FUNCTION = jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING);
static final Supplier<CompletableFuture<MultipleJobsDetails>> DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER = () -> CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList()));
static final Supplier<CompletableFuture<ClusterOverview>> DEFAULT_REQUEST_CLUSTER_OVERVIEW_SUPPLIER = () -> CompletableFuture.completedFuture(new ClusterOverview(0, 0, 0, 0, 0, 0, 0));
static final Supplier<CompletableFuture<Collection<String>>> DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER = () -> CompletableFuture.completedFuture(Collections.emptyList());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 2a3a98c..9746f13 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1798,7 +1798,7 @@ public class StreamExecutionEnvironment {
CompletableFuture<JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)
- .execute(streamGraph, configuration);
+ .execute(streamGraph, configuration, userClassloader);
try {
JobClient jobClient = jobClientFuture.get();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
index bee2bdf..153a457 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
@@ -86,7 +86,7 @@ public class ExecutorDiscoveryAndJobClientTest {
@Override
public PipelineExecutor getExecutor(Configuration configuration) {
- return (pipeline, executionConfig) -> CompletableFuture.completedFuture(new TestingJobClient());
+ return (pipeline, executionConfig, classLoader) -> CompletableFuture.completedFuture(new TestingJobClient());
}
}
}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index e76044d..6b4f12a 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -485,7 +485,7 @@ public class LocalExecutor implements Executor {
configuration.set(DeploymentOptions.ATTACHED, false);
// create execution
- final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);
+ final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline, context.getClassLoader());
// wrap in classloader because CodeGenOperatorFactory#getStreamOperatorClass
// requires to access UDF in deployer.deploy().
@@ -542,7 +542,7 @@ public class LocalExecutor implements Executor {
// create execution
final ProgramDeployer deployer = new ProgramDeployer(
- configuration, jobName, pipeline);
+ configuration, jobName, pipeline, context.getClassLoader());
JobClient jobClient;
// wrap in classloader because CodeGenOperatorFactory#getStreamOperatorClass
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
index 31b5143..e2b7d46 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
@@ -41,6 +41,7 @@ public class ProgramDeployer {
private final Configuration configuration;
private final Pipeline pipeline;
private final String jobName;
+ private final ClassLoader userCodeClassloader;
/**
* Deploys a table program on the cluster.
@@ -52,10 +53,12 @@ public class ProgramDeployer {
public ProgramDeployer(
Configuration configuration,
String jobName,
- Pipeline pipeline) {
+ Pipeline pipeline,
+ ClassLoader userCodeClassloader) { // userCodeClassloader is stored and later passed to PipelineExecutor#execute in deploy(); NOTE(review): add a matching @param to the constructor Javadoc.
this.configuration = configuration;
this.pipeline = pipeline;
this.jobName = jobName;
+ this.userCodeClassloader = userCodeClassloader;
}
public CompletableFuture<JobClient> deploy() {
@@ -79,7 +82,7 @@ public class ProgramDeployer {
final PipelineExecutor executor = executorFactory.getExecutor(configuration);
CompletableFuture<JobClient> jobClient;
try {
- jobClient = executor.execute(pipeline, configuration);
+ jobClient = executor.execute(pipeline, configuration, userCodeClassloader);
} catch (Exception e) {
throw new RuntimeException("Could not execute program.", e);
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index 4a4f3ef..5d7e755 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -343,7 +343,8 @@ abstract class BatchTableEnvImpl(
"Cannot find compatible factory for specified execution.target (=%s)",
configuration.get(DeploymentOptions.TARGET))
- val jobClientFuture = executorFactory.getExecutor(configuration).execute(plan, configuration)
+ val jobClientFuture = executorFactory.getExecutor(configuration)
+ .execute(plan, configuration, execEnv.getUserCodeClassLoader)
try {
jobClientFuture.get
} catch {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index d70a409..81b979b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -53,6 +53,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.EntropyInjectingTestFileSystem;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
@@ -337,6 +338,8 @@ public class SavepointITCase extends TestLogger {
try {
client.submitJob(graph).get();
+ // triggerSavepoint is only available after job is initialized
+ TestUtils.waitUntilJobInitializationFinished(graph.getJobID(), cluster, ClassLoader.getSystemClassLoader());
client.triggerSavepoint(graph.getJobID(), null).get();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
index f13a243..0dc34a0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
@@ -81,7 +81,7 @@ public class LocalExecutorITCase extends TestLogger {
Plan wcPlan = getWordCountPlan(inFile, outFile, parallelism);
wcPlan.setExecutionConfig(new ExecutionConfig());
- JobClient jobClient = executor.execute(wcPlan, config).get();
+ JobClient jobClient = executor.execute(wcPlan, config, ClassLoader.getSystemClassLoader()).get();
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
} catch (Exception e) {
e.printStackTrace();
@@ -99,7 +99,7 @@ public class LocalExecutorITCase extends TestLogger {
Configuration config = new Configuration();
config.setBoolean(DeploymentOptions.ATTACHED, true);
- JobClient jobClient = executor.execute(runtimeExceptionPlan, config).get();
+ JobClient jobClient = executor.execute(runtimeExceptionPlan, config, ClassLoader.getSystemClassLoader()).get();
assertThrows(
"Job execution failed.",
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
index 5723978..d1a6c4e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -34,14 +34,10 @@ import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import javax.annotation.Nonnull;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.function.Predicate;
import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
@@ -50,7 +46,6 @@ import static org.junit.Assert.fail;
/**
* Tests for failing job submissions.
*/
-@RunWith(Parameterized.class)
public class JobSubmissionFailsITCase extends TestLogger {
private static final int NUM_TM = 2;
@@ -80,19 +75,6 @@ public class JobSubmissionFailsITCase extends TestLogger {
return new JobGraph("Working testing job", jobVertex);
}
- // --------------------------------------------------------------------------------------------
-
- private final boolean detached;
-
- public JobSubmissionFailsITCase(boolean detached) {
- this.detached = detached;
- }
-
- @Parameterized.Parameters(name = "Detached mode = {0}")
- public static Collection<Boolean[]> executionModes(){
- return Arrays.asList(new Boolean[]{false},
- new Boolean[]{true});
- }
// --------------------------------------------------------------------------------------------
@@ -131,11 +113,7 @@ public class JobSubmissionFailsITCase extends TestLogger {
ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
try {
- if (detached) {
- client.submitJob(jobGraph).get();
- } else {
- submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
- }
+ submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
fail("Job submission should have thrown an exception.");
} catch (Exception e) {
if (!failurePredicate.test(e)) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
index e548f5e..7452332 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
@@ -152,7 +152,7 @@ public class RemoteStreamEnvironmentTest extends TestLogger {
@Override
public PipelineExecutor getExecutor(@Nonnull Configuration configuration) {
- return (pipeline, config) -> {
+ return (pipeline, config, classLoader) -> {
assertTrue(pipeline instanceof StreamGraph);
actualSavepointRestoreSettings =
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
index 4f3b6f0..6447183 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.streaming.runtime;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.ClientUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -118,6 +119,10 @@ public class BackPressureITCase extends TestLogger {
final JobVertex mapJobVertex = vertices.get(1);
testingMiniCluster.submitJob(jobGraph).get();
+ ClientUtils.waitUntilJobInitializationFinished(
+ () -> testingMiniCluster.getJobStatus(TEST_JOB_ID).get(),
+ () -> testingMiniCluster.requestJobResult(TEST_JOB_ID).get(),
+ ClassLoader.getSystemClassLoader());
assertJobVertexSubtasksAreBackPressured(mapJobVertex);
assertJobVertexSubtasksAreBackPressured(sourceJobVertex);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 2647226..229c655 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.test.streaming.runtime;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -840,7 +841,7 @@ public class TimestampITCase extends TestLogger {
private static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
Collection<JobStatusMessage> statusMessages = client.listJobs().get();
return statusMessages.stream()
- .filter(status -> !status.getJobState().isGloballyTerminalState())
+ .filter(status -> !status.getJobState().isGloballyTerminalState() && status.getJobState() != JobStatus.INITIALIZING)
.map(JobStatusMessage::getJobId)
.collect(Collectors.toList());
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
index 95550e0..7a2bd0b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
@@ -19,9 +19,12 @@
package org.apache.flink.test.util;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobInitializationException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -61,4 +64,10 @@ public class TestUtils {
.get()
.toJobExecutionResult(classLoader);
}
+
+ public static void waitUntilJobInitializationFinished(JobID id, MiniClusterWithClientResource miniCluster, ClassLoader userCodeClassloader) throws // Test helper: waits (via ClientUtils) for job `id` on the given mini cluster to finish initialization.
+ JobInitializationException { // presumably raised by ClientUtils when the job's initialization failed; confirm in ClientUtils.waitUntilJobInitializationFinished
+ ClusterClient<?> clusterClient = miniCluster.getClusterClient();
+ ClientUtils.waitUntilJobInitializationFinished(() -> clusterClient.getJobStatus(id).get(), () -> clusterClient.requestJobResult(id).get(), userCodeClassloader); // both suppliers block on the client's futures via get()
+ }
}
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 6935ce3..1642a39 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.yarn;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
@@ -310,7 +311,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
CommonTestUtils.waitUntilCondition(
() -> {
final JobDetailsInfo jobDetails = restClusterClient.getJobDetails(jobId).get();
- return jobDetails.getJobVertexInfos()
+ return jobDetails.getJobStatus() == JobStatus.RUNNING && jobDetails.getJobVertexInfos()
.stream()
.map(toExecutionState())
.allMatch(isRunning());