You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/23 16:29:21 UTC
[flink] 03/03: [FLINK-21188][runtime] Introduces ExecutionGraphInfo
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b7e93fd671a0dc0e7ba919a447ca9e51bb451fdd
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Jan 27 15:47:33 2021 +0100
[FLINK-21188][runtime] Introduces ExecutionGraphInfo
This closes #14804.
---
.../application/ApplicationClusterEntryPoint.java | 8 +-
.../flink/runtime/dispatcher/Dispatcher.java | 58 ++---
.../flink/runtime/dispatcher/DispatcherJob.java | 36 +--
.../runtime/dispatcher/DispatcherJobResult.java | 25 ++-
.../runtime/dispatcher/DispatcherServices.java | 10 +-
...raphStore.java => ExecutionGraphInfoStore.java} | 20 +-
...Store.java => FileExecutionGraphInfoStore.java} | 77 +++----
.../runtime/dispatcher/HistoryServerArchivist.java | 10 +-
.../JsonResponseHistoryServerArchivist.java | 8 +-
...ore.java => MemoryExecutionGraphInfoStore.java} | 38 ++--
.../flink/runtime/dispatcher/MiniDispatcher.java | 7 +-
.../dispatcher/PartialDispatcherServices.java | 10 +-
...PartialDispatcherServicesWithJobGraphStore.java | 4 +-
.../dispatcher/VoidHistoryServerArchivist.java | 5 +-
.../runtime/entrypoint/ClusterEntrypoint.java | 14 +-
.../runtime/entrypoint/JobClusterEntrypoint.java | 8 +-
.../entrypoint/SessionClusterEntrypoint.java | 8 +-
...tDispatcherResourceManagerComponentFactory.java | 6 +-
.../DispatcherResourceManagerComponentFactory.java | 4 +-
.../runtime/jobmanager/OnCompletionActions.java | 6 +-
.../runtime/jobmaster/JobManagerRunnerImpl.java | 8 +-
.../runtime/jobmaster/JobManagerRunnerResult.java | 28 +--
.../apache/flink/runtime/jobmaster/JobMaster.java | 10 +-
.../flink/runtime/jobmaster/JobMasterGateway.java | 8 +-
.../flink/runtime/minicluster/MiniCluster.java | 4 +-
...va => AbstractAccessExecutionGraphHandler.java} | 63 +++---
.../handler/job/AbstractExecutionGraphHandler.java | 17 +-
.../rest/handler/job/AbstractJobVertexHandler.java | 2 +-
.../rest/handler/job/JobAccumulatorsHandler.java | 3 +-
.../runtime/rest/handler/job/JobConfigHandler.java | 2 +-
.../rest/handler/job/JobDetailsHandler.java | 2 +-
.../rest/handler/job/JobExceptionsHandler.java | 3 +-
.../runtime/rest/handler/job/JobPlanHandler.java | 3 +-
.../rest/handler/job/JobVertexDetailsHandler.java | 3 +-
.../handler/job/JobVertexTaskManagersHandler.java | 3 +-
.../job/checkpoints/AbstractCheckpointHandler.java | 4 +-
.../job/checkpoints/CheckpointConfigHandler.java | 4 +-
.../CheckpointingStatisticsHandler.java | 4 +-
.../handler/legacy/DefaultExecutionGraphCache.java | 40 ++--
.../rest/handler/legacy/ExecutionGraphCache.java | 21 +-
.../runtime/scheduler/ExecutionGraphInfo.java | 62 ++++++
.../flink/runtime/scheduler/SchedulerBase.java | 5 +-
.../flink/runtime/scheduler/SchedulerNG.java | 3 +-
.../scheduler/adaptive/AdaptiveScheduler.java | 6 +-
.../flink/runtime/webmonitor/RestfulGateway.java | 20 +-
.../runtime/webmonitor/history/JsonArchivist.java | 20 ++
.../runtime/dispatcher/DispatcherJobTest.java | 67 ++++--
.../dispatcher/DispatcherResourceCleanupTest.java | 36 +--
.../flink/runtime/dispatcher/DispatcherTest.java | 42 ++--
...t.java => FileExecutionGraphInfoStoreTest.java} | 244 ++++++++++++---------
.../runtime/dispatcher/MiniDispatcherTest.java | 34 +--
.../runtime/dispatcher/TestingDispatcher.java | 6 +-
.../runner/DefaultDispatcherRunnerITCase.java | 4 +-
.../ZooKeeperDefaultDispatcherRunnerTest.java | 4 +-
.../runtime/entrypoint/ClusterEntrypointTest.java | 4 +-
.../jobmaster/JobManagerRunnerImplTest.java | 19 +-
.../jobmaster/JobManagerRunnerResultTest.java | 18 +-
...asterExecutionDeploymentReconciliationTest.java | 6 +-
.../flink/runtime/jobmaster/JobMasterTest.java | 12 +-
.../runtime/jobmaster/TestingJobManagerRunner.java | 7 +-
.../runtime/jobmaster/utils/JobMasterBuilder.java | 15 +-
.../jobmaster/utils/TestingJobMasterGateway.java | 8 +-
.../utils/TestingJobMasterGatewayBuilder.java | 6 +-
.../runtime/minicluster/TestingMiniCluster.java | 4 +-
.../legacy/DefaultExecutionGraphCacheTest.java | 104 ++++-----
.../runtime/rest/util/NoOpExecutionGraphCache.java | 4 +-
.../runtime/scheduler/DefaultSchedulerTest.java | 118 ++++++++--
.../runtime/scheduler/SchedulerTestingUtils.java | 7 +-
.../runtime/scheduler/TestingSchedulerNG.java | 3 +-
.../webmonitor/TestingDispatcherGateway.java | 5 +
.../webmonitor/TestingExecutionGraphCache.java | 12 +-
.../runtime/webmonitor/TestingRestfulGateway.java | 29 +++
.../recovery/ProcessFailureCancelingITCase.java | 4 +-
73 files changed, 921 insertions(+), 611 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java
index 30e872d..e0b2128 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java
@@ -27,8 +27,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
@@ -78,9 +78,9 @@ public class ApplicationClusterEntryPoint extends ClusterEntrypoint {
}
@Override
- protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
+ protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
final Configuration configuration, final ScheduledExecutor scheduledExecutor) {
- return new MemoryArchivedExecutionGraphStore();
+ return new MemoryExecutionGraphInfoStore();
}
protected static void configureExecution(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 1a4befe..63ea5b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.PermanentlyFencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
@@ -121,7 +122,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
private final DispatcherBootstrapFactory dispatcherBootstrapFactory;
- private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
+ private final ExecutionGraphInfoStore executionGraphInfoStore;
private final JobManagerRunnerFactory jobManagerRunnerFactory;
@@ -177,7 +178,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
this.historyServerArchivist = dispatcherServices.getHistoryServerArchivist();
- this.archivedExecutionGraphStore = dispatcherServices.getArchivedExecutionGraphStore();
+ this.executionGraphInfoStore = dispatcherServices.getArchivedExecutionGraphStore();
this.jobManagerRunnerFactory = dispatcherServices.getJobManagerRunnerFactory();
@@ -441,7 +442,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
&& executionType == ExecutionType.RECOVERY) {
return dispatcherJobFailed(jobId, dispatcherJobResult.getInitializationFailure());
} else {
- return jobReachedGloballyTerminalState(dispatcherJobResult.getArchivedExecutionGraph());
+ return jobReachedGloballyTerminalState(dispatcherJobResult.getExecutionGraphInfo());
}
}
@@ -557,8 +558,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
CompletableFuture<Collection<JobStatus>> allJobsFuture =
allOptionalJobsFuture.thenApply(this::flattenOptionalCollection);
- final JobsOverview completedJobsOverview =
- archivedExecutionGraphStore.getStoredJobsOverview();
+ final JobsOverview completedJobsOverview = executionGraphInfoStore.getStoredJobsOverview();
return allJobsFuture.thenCombine(
taskManagerOverviewFuture,
@@ -581,7 +581,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
optionalCombinedJobDetails.thenApply(this::flattenOptionalCollection);
final Collection<JobDetails> completedJobDetails =
- archivedExecutionGraphStore.getAvailableJobDetails();
+ executionGraphInfoStore.getAvailableJobDetails();
return combinedJobDetails.thenApply(
(Collection<JobDetails> runningJobDetails) -> {
@@ -603,7 +603,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
() -> {
// is it a completed job?
final JobDetails jobDetails =
- archivedExecutionGraphStore.getAvailableJobDetails(jobId);
+ executionGraphInfoStore.getAvailableJobDetails(jobId);
if (jobDetails == null) {
return FutureUtils.completedExceptionally(
new FlinkJobNotFoundException(jobId));
@@ -614,17 +614,18 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
}
@Override
- public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
- Function<Throwable, ArchivedExecutionGraph> checkExecutionGraphStoreOnException =
+ public CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(
+ JobID jobId, Time timeout) {
+ Function<Throwable, ExecutionGraphInfo> checkExecutionGraphStoreOnException =
throwable -> {
// check whether it is a completed job
- final ArchivedExecutionGraph archivedExecutionGraph =
- archivedExecutionGraphStore.get(jobId);
- if (archivedExecutionGraph == null) {
+ final ExecutionGraphInfo executionGraphInfo =
+ executionGraphInfoStore.get(jobId);
+ if (executionGraphInfo == null) {
throw new CompletionException(
ExceptionUtils.stripCompletionException(throwable));
} else {
- return archivedExecutionGraph;
+ return executionGraphInfo;
}
};
Optional<DispatcherJob> maybeJob = getDispatcherJob(jobId);
@@ -638,21 +639,22 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
DispatcherJob job = runningJobs.get(jobId);
if (job == null) {
- final ArchivedExecutionGraph archivedExecutionGraph =
- archivedExecutionGraphStore.get(jobId);
+ final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId);
- if (archivedExecutionGraph == null) {
+ if (executionGraphInfo == null) {
return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
} else {
return CompletableFuture.completedFuture(
- JobResult.createFrom(archivedExecutionGraph));
+ JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph()));
}
} else {
return job.getResultFuture()
.thenApply(
dispatcherJobResult ->
JobResult.createFrom(
- dispatcherJobResult.getArchivedExecutionGraph()));
+ dispatcherJobResult
+ .getExecutionGraphInfo()
+ .getArchivedExecutionGraph()));
}
}
@@ -827,7 +829,9 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
}
protected CleanupJobState jobReachedGloballyTerminalState(
- ArchivedExecutionGraph archivedExecutionGraph) {
+ ExecutionGraphInfo executionGraphInfo) {
+ ArchivedExecutionGraph archivedExecutionGraph =
+ executionGraphInfo.getArchivedExecutionGraph();
Preconditions.checkArgument(
archivedExecutionGraph.getState().isGloballyTerminalState(),
"Job %s is in state %s which is not globally terminal.",
@@ -839,32 +843,32 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
archivedExecutionGraph.getJobID(),
archivedExecutionGraph.getState());
- archiveExecutionGraph(archivedExecutionGraph);
+ archiveExecutionGraph(executionGraphInfo);
return CleanupJobState.GLOBAL;
}
- private void archiveExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) {
+ private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
try {
- archivedExecutionGraphStore.put(archivedExecutionGraph);
+ executionGraphInfoStore.put(executionGraphInfo);
} catch (IOException e) {
log.info(
"Could not store completed job {}({}).",
- archivedExecutionGraph.getJobName(),
- archivedExecutionGraph.getJobID(),
+ executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+ executionGraphInfo.getArchivedExecutionGraph().getJobID(),
e);
}
final CompletableFuture<Acknowledge> executionGraphFuture =
- historyServerArchivist.archiveExecutionGraph(archivedExecutionGraph);
+ historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
executionGraphFuture.whenComplete(
(Acknowledge ignored, Throwable throwable) -> {
if (throwable != null) {
log.info(
"Could not archive completed job {}({}) to the history server.",
- archivedExecutionGraph.getJobName(),
- archivedExecutionGraph.getJobID(),
+ executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+ executionGraphInfo.getArchivedExecutionGraph().getJobID(),
throwable);
}
});
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
index 9e253f2..7243303 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
@@ -137,8 +138,7 @@ public final class DispatcherJob implements AutoCloseableAsync {
private void handleJobManagerRunnerResult(JobManagerRunnerResult jobManagerRunnerResult) {
if (jobManagerRunnerResult.isSuccess()) {
jobResultFuture.complete(
- DispatcherJobResult.forSuccess(
- jobManagerRunnerResult.getArchivedExecutionGraph()));
+ DispatcherJobResult.forSuccess(jobManagerRunnerResult.getExecutionGraphInfo()));
} else if (jobManagerRunnerResult.isJobNotFinished()) {
jobResultFuture.completeExceptionally(new JobNotFinishedException(jobId));
} else if (jobManagerRunnerResult.isInitializationFailure()) {
@@ -156,7 +156,7 @@ public final class DispatcherJob implements AutoCloseableAsync {
initializationTimestamp);
jobResultFuture.complete(
DispatcherJobResult.forInitializationFailure(
- archivedExecutionGraph, initializationFailure));
+ new ExecutionGraphInfo(archivedExecutionGraph), initializationFailure));
}
public CompletableFuture<DispatcherJobResult> getResultFuture() {
@@ -166,9 +166,10 @@ public final class DispatcherJob implements AutoCloseableAsync {
public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
return requestJob(timeout)
.thenApply(
- executionGraph -> {
+ executionGraphInfo -> {
synchronized (lock) {
- return JobDetails.createDetailsForJob(executionGraph);
+ return JobDetails.createDetailsForJob(
+ executionGraphInfo.getArchivedExecutionGraph());
}
});
}
@@ -205,16 +206,18 @@ public final class DispatcherJob implements AutoCloseableAsync {
}
public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
- return requestJob(timeout).thenApply(ArchivedExecutionGraph::getState);
+ return requestJob(timeout)
+ .thenApply(
+ executionGraphInfo ->
+ executionGraphInfo.getArchivedExecutionGraph().getState());
}
- /** Returns a future completing to the ArchivedExecutionGraph of the job. */
- public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+ /** Returns a future completing to the ExecutionGraphInfo of the job. */
+ public CompletableFuture<ExecutionGraphInfo> requestJob(Time timeout) {
synchronized (lock) {
if (isInitialized()) {
if (jobResultFuture.isDone()) { // job is not running anymore
- return jobResultFuture.thenApply(
- DispatcherJobResult::getArchivedExecutionGraph);
+ return jobResultFuture.thenApply(DispatcherJobResult::getExecutionGraphInfo);
}
// job is still running
return getJobMasterGateway()
@@ -224,12 +227,13 @@ public final class DispatcherJob implements AutoCloseableAsync {
this.jobStatus == DispatcherJobStatus.INITIALIZING
|| jobStatus == DispatcherJobStatus.CANCELLING);
return CompletableFuture.completedFuture(
- ArchivedExecutionGraph.createFromInitializingJob(
- jobId,
- jobName,
- jobStatus.asJobStatus(),
- null,
- initializationTimestamp));
+ new ExecutionGraphInfo(
+ ArchivedExecutionGraph.createFromInitializingJob(
+ jobId,
+ jobName,
+ jobStatus.asJobStatus(),
+ null,
+ initializationTimestamp)));
}
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
index ad4f5ed..a5d93da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
@@ -19,25 +19,26 @@
package org.apache.flink.runtime.dispatcher;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
/**
- * Container for returning the {@link ArchivedExecutionGraph} and a flag whether the initialization
- * has failed. For initialization failures, the throwable is also attached, to avoid deserializing
- * it from the ArchivedExecutionGraph.
+ * Container for returning the {@link ExecutionGraphInfo} and a flag whether the initialization has
+ * failed. For initialization failures, the throwable is also attached, to avoid deserializing it
+ * from the {@link ArchivedExecutionGraph}.
*/
final class DispatcherJobResult {
- private final ArchivedExecutionGraph archivedExecutionGraph;
+ private final ExecutionGraphInfo executionGraphInfo;
// if the throwable field is set, the job failed during initialization.
@Nullable private final Throwable initializationFailure;
private DispatcherJobResult(
- ArchivedExecutionGraph archivedExecutionGraph, @Nullable Throwable throwable) {
- this.archivedExecutionGraph = archivedExecutionGraph;
+ ExecutionGraphInfo executionGraphInfo, @Nullable Throwable throwable) {
+ this.executionGraphInfo = executionGraphInfo;
this.initializationFailure = throwable;
}
@@ -45,8 +46,8 @@ final class DispatcherJobResult {
return initializationFailure != null;
}
- public ArchivedExecutionGraph getArchivedExecutionGraph() {
- return archivedExecutionGraph;
+ public ExecutionGraphInfo getExecutionGraphInfo() {
+ return executionGraphInfo;
}
/** @throws IllegalStateException if this DispatcherJobResult is a successful initialization. */
@@ -58,11 +59,11 @@ final class DispatcherJobResult {
}
public static DispatcherJobResult forInitializationFailure(
- ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) {
- return new DispatcherJobResult(archivedExecutionGraph, throwable);
+ ExecutionGraphInfo executionGraphInfo, Throwable throwable) {
+ return new DispatcherJobResult(executionGraphInfo, throwable);
}
- public static DispatcherJobResult forSuccess(ArchivedExecutionGraph archivedExecutionGraph) {
- return new DispatcherJobResult(archivedExecutionGraph, null);
+ public static DispatcherJobResult forSuccess(ExecutionGraphInfo executionGraphInfo) {
+ return new DispatcherJobResult(executionGraphInfo, null);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
index 79a837a..d4ecaf2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
@@ -48,7 +48,7 @@ public class DispatcherServices {
@Nonnull private final JobManagerMetricGroup jobManagerMetricGroup;
- @Nonnull private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
+ @Nonnull private final ExecutionGraphInfoStore executionGraphInfoStore;
@Nonnull private final FatalErrorHandler fatalErrorHandler;
@@ -68,7 +68,7 @@ public class DispatcherServices {
@Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
@Nonnull BlobServer blobServer,
@Nonnull HeartbeatServices heartbeatServices,
- @Nonnull ArchivedExecutionGraphStore archivedExecutionGraphStore,
+ @Nonnull ExecutionGraphInfoStore executionGraphInfoStore,
@Nonnull FatalErrorHandler fatalErrorHandler,
@Nonnull HistoryServerArchivist historyServerArchivist,
@Nullable String metricQueryServiceAddress,
@@ -81,7 +81,7 @@ public class DispatcherServices {
this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
this.blobServer = blobServer;
this.heartbeatServices = heartbeatServices;
- this.archivedExecutionGraphStore = archivedExecutionGraphStore;
+ this.executionGraphInfoStore = executionGraphInfoStore;
this.fatalErrorHandler = fatalErrorHandler;
this.historyServerArchivist = historyServerArchivist;
this.metricQueryServiceAddress = metricQueryServiceAddress;
@@ -122,8 +122,8 @@ public class DispatcherServices {
}
@Nonnull
- public ArchivedExecutionGraphStore getArchivedExecutionGraphStore() {
- return archivedExecutionGraphStore;
+ public ExecutionGraphInfoStore getArchivedExecutionGraphStore() {
+ return executionGraphInfoStore;
}
@Nonnull
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStore.java
similarity index 76%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStore.java
index d38d804..1d63567 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStore.java
@@ -19,9 +19,9 @@
package org.apache.flink.runtime.dispatcher;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import javax.annotation.Nullable;
@@ -29,32 +29,32 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
-/** Interface for a {@link ArchivedExecutionGraph} store. */
-public interface ArchivedExecutionGraphStore extends Closeable {
+/** Interface for a {@link ExecutionGraphInfo} store. */
+public interface ExecutionGraphInfoStore extends Closeable {
/**
- * Returns the current number of stored {@link ArchivedExecutionGraph}.
+ * Returns the current number of stored {@link ExecutionGraphInfo} instances.
*
- * @return Current number of stored {@link ArchivedExecutionGraph}
+ * @return Current number of stored {@link ExecutionGraphInfo} instances
*/
int size();
/**
- * Get the {@link ArchivedExecutionGraph} for the given job id. Null if it isn't stored.
+ * Get the {@link ExecutionGraphInfo} for the given job id. Null if it isn't stored.
*
* @param jobId identifying the serializable execution graph to retrieve
* @return The stored serializable execution graph or null
*/
@Nullable
- ArchivedExecutionGraph get(JobID jobId);
+ ExecutionGraphInfo get(JobID jobId);
/**
- * Store the given {@link ArchivedExecutionGraph} in the store.
+ * Store the given {@link ExecutionGraphInfo} in the store.
*
- * @param archivedExecutionGraph to store
+ * @param executionGraphInfo to store
* @throws IOException if the serializable execution graph could not be stored in the store
*/
- void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException;
+ void put(ExecutionGraphInfo executionGraphInfo) throws IOException;
/**
* Return the {@link JobsOverview} for all stored/past jobs.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStore.java
similarity index 78%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStore.java
index f08c996..b08d426 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStore.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
@@ -55,20 +56,19 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
- * Store for {@link ArchivedExecutionGraph}. The store writes the archived execution graph to disk
- * and keeps the most recently used execution graphs in a memory cache for faster serving. Moreover,
- * the stored execution graphs are periodically cleaned up.
+ * Store for {@link ExecutionGraphInfo} instances. The store writes the archived execution graph
+ * information to disk and keeps the most recently used execution graphs in a memory cache for
+ * faster serving. Moreover, the stored execution graphs are periodically cleaned up.
*/
-public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore {
+public class FileExecutionGraphInfoStore implements ExecutionGraphInfoStore {
- private static final Logger LOG =
- LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);
+ private static final Logger LOG = LoggerFactory.getLogger(FileExecutionGraphInfoStore.class);
private final File storageDir;
private final Cache<JobID, JobDetails> jobDetailsCache;
- private final LoadingCache<JobID, ArchivedExecutionGraph> archivedExecutionGraphCache;
+ private final LoadingCache<JobID, ExecutionGraphInfo> executionGraphInfoCache;
private final ScheduledFuture<?> cleanupFuture;
@@ -80,7 +80,7 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
private int numCanceledJobs;
- public FileArchivedExecutionGraphStore(
+ public FileExecutionGraphInfoStore(
File rootDir,
Time expirationTime,
int maximumCapacity,
@@ -93,7 +93,7 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
LOG.info(
"Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.",
- FileArchivedExecutionGraphStore.class.getSimpleName(),
+ FileExecutionGraphInfoStore.class.getSimpleName(),
storageDirectory,
expirationTime.toMilliseconds(),
maximumCacheSizeBytes);
@@ -113,15 +113,14 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
.ticker(ticker)
.build();
- this.archivedExecutionGraphCache =
+ this.executionGraphInfoCache =
CacheBuilder.newBuilder()
.maximumWeight(maximumCacheSizeBytes)
.weigher(this::calculateSize)
.build(
- new CacheLoader<JobID, ArchivedExecutionGraph>() {
+ new CacheLoader<JobID, ExecutionGraphInfo>() {
@Override
- public ArchivedExecutionGraph load(JobID jobId)
- throws Exception {
+ public ExecutionGraphInfo load(JobID jobId) throws Exception {
return loadExecutionGraph(jobId);
}
});
@@ -147,19 +146,23 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
@Override
@Nullable
- public ArchivedExecutionGraph get(JobID jobId) {
+ public ExecutionGraphInfo get(JobID jobId) {
try {
- return archivedExecutionGraphCache.get(jobId);
+ return executionGraphInfoCache.get(jobId);
} catch (ExecutionException e) {
- LOG.debug("Could not load archived execution graph for job id {}.", jobId, e);
+ LOG.debug(
+ "Could not load archived execution graph information for job id {}.", jobId, e);
return null;
}
}
@Override
- public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
+ public void put(ExecutionGraphInfo executionGraphInfo) throws IOException {
+ final JobID jobId = executionGraphInfo.getJobId();
+
+ final ArchivedExecutionGraph archivedExecutionGraph =
+ executionGraphInfo.getArchivedExecutionGraph();
final JobStatus jobStatus = archivedExecutionGraph.getState();
- final JobID jobId = archivedExecutionGraph.getJobID();
final String jobName = archivedExecutionGraph.getJobName();
Preconditions.checkArgument(
@@ -195,12 +198,12 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
}
// write the ArchivedExecutionGraph to disk
- storeArchivedExecutionGraph(archivedExecutionGraph);
+ storeExecutionGraphInfo(executionGraphInfo);
final JobDetails detailsForJob = JobDetails.createDetailsForJob(archivedExecutionGraph);
jobDetailsCache.put(jobId, detailsForJob);
- archivedExecutionGraphCache.put(jobId, archivedExecutionGraph);
+ executionGraphInfoCache.put(jobId, executionGraphInfo);
}
@Override
@@ -236,27 +239,28 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
// Internal methods
// --------------------------------------------------------------
- private int calculateSize(JobID jobId, ArchivedExecutionGraph serializableExecutionGraph) {
- final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
+ private int calculateSize(JobID jobId, ExecutionGraphInfo serializableExecutionGraphInfo) {
+ final File executionGraphInfoFile = getExecutionGraphFile(jobId);
- if (archivedExecutionGraphFile.exists()) {
- return Math.toIntExact(archivedExecutionGraphFile.length());
+ if (executionGraphInfoFile.exists()) {
+ return Math.toIntExact(executionGraphInfoFile.length());
} else {
LOG.debug(
- "Could not find archived execution graph file for {}. Estimating the size instead.",
+ "Could not find execution graph information file for {}. Estimating the size instead.",
jobId);
+ final ArchivedExecutionGraph serializableExecutionGraph =
+ serializableExecutionGraphInfo.getArchivedExecutionGraph();
return serializableExecutionGraph.getAllVertices().size() * 1000
+ serializableExecutionGraph.getAccumulatorsSerialized().size() * 1000;
}
}
- private ArchivedExecutionGraph loadExecutionGraph(JobID jobId)
+ private ExecutionGraphInfo loadExecutionGraph(JobID jobId)
throws IOException, ClassNotFoundException {
- final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
+ final File executionGraphInfoFile = getExecutionGraphFile(jobId);
- if (archivedExecutionGraphFile.exists()) {
- try (FileInputStream fileInputStream =
- new FileInputStream(archivedExecutionGraphFile)) {
+ if (executionGraphInfoFile.exists()) {
+ try (FileInputStream fileInputStream = new FileInputStream(executionGraphInfoFile)) {
return InstantiationUtil.deserializeObject(
fileInputStream, getClass().getClassLoader());
}
@@ -268,13 +272,12 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
}
}
- private void storeArchivedExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph)
- throws IOException {
+ private void storeExecutionGraphInfo(ExecutionGraphInfo executionGraphInfo) throws IOException {
final File archivedExecutionGraphFile =
- getExecutionGraphFile(archivedExecutionGraph.getJobID());
+ getExecutionGraphFile(executionGraphInfo.getJobId());
try (FileOutputStream fileOutputStream = new FileOutputStream(archivedExecutionGraphFile)) {
- InstantiationUtil.serializeObject(fileOutputStream, archivedExecutionGraph);
+ InstantiationUtil.serializeObject(fileOutputStream, executionGraphInfo);
}
}
@@ -293,7 +296,7 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
LOG.debug("Could not delete file {}.", archivedExecutionGraphFile, e);
}
- archivedExecutionGraphCache.invalidate(jobId);
+ executionGraphInfoCache.invalidate(jobId);
jobDetailsCache.invalidate(jobId);
}
@@ -323,7 +326,7 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
}
@VisibleForTesting
- LoadingCache<JobID, ArchivedExecutionGraph> getArchivedExecutionGraphCache() {
- return archivedExecutionGraphCache;
+ LoadingCache<JobID, ExecutionGraphInfo> getExecutionGraphInfoCache() {
+ return executionGraphInfoCache;
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
index 2a571d2..f40af97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
@@ -21,23 +21,23 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-/** Writer for an {@link AccessExecutionGraph}. */
+/** Writer for an {@link ExecutionGraphInfo}. */
public interface HistoryServerArchivist {
/**
- * Archives the given {@link AccessExecutionGraph} on the history server.
+ * Archives the given {@link ExecutionGraphInfo} on the history server.
*
- * @param executionGraph to store on the history server
+ * @param executionGraphInfo to store on the history server
* @return Future which is completed once the archiving has been completed.
*/
- CompletableFuture<Acknowledge> archiveExecutionGraph(AccessExecutionGraph executionGraph);
+ CompletableFuture<Acknowledge> archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo);
static HistoryServerArchivist createHistoryServerArchivist(
Configuration configuration, JsonArchivist jsonArchivist, Executor ioExecutor) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
index 991ff67..e68f3b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.history.FsJobArchivist;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
@@ -50,14 +51,15 @@ class JsonResponseHistoryServerArchivist implements HistoryServerArchivist {
@Override
public CompletableFuture<Acknowledge> archiveExecutionGraph(
- AccessExecutionGraph executionGraph) {
+ ExecutionGraphInfo executionGraphInfo) {
return CompletableFuture.runAsync(
ThrowingRunnable.unchecked(
() ->
FsJobArchivist.archiveJob(
archivePath,
- executionGraph.getJobID(),
- jsonArchivist.archiveJsonWithPath(executionGraph))),
+ executionGraphInfo.getJobId(),
+ jsonArchivist.archiveJsonWithPath(
+ executionGraphInfo))),
ioExecutor)
.thenApply(ignored -> Acknowledge.get());
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
similarity index 60%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
index 53df4a7..35b57da3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import javax.annotation.Nullable;
@@ -33,34 +34,35 @@ import java.util.Map;
import java.util.stream.Collectors;
/**
- * {@link ArchivedExecutionGraphStore} implementation which stores the {@link
- * ArchivedExecutionGraph} in memory.
+ * {@link ExecutionGraphInfoStore} implementation which stores the {@link ArchivedExecutionGraph} in
+ * memory.
*/
-public class MemoryArchivedExecutionGraphStore implements ArchivedExecutionGraphStore {
+public class MemoryExecutionGraphInfoStore implements ExecutionGraphInfoStore {
- private final Map<JobID, ArchivedExecutionGraph> serializableExecutionGraphs = new HashMap<>(4);
+ private final Map<JobID, ExecutionGraphInfo> serializableExecutionGraphInfos = new HashMap<>(4);
@Override
public int size() {
- return serializableExecutionGraphs.size();
+ return serializableExecutionGraphInfos.size();
}
@Nullable
@Override
- public ArchivedExecutionGraph get(JobID jobId) {
- return serializableExecutionGraphs.get(jobId);
+ public ExecutionGraphInfo get(JobID jobId) {
+ return serializableExecutionGraphInfos.get(jobId);
}
@Override
- public void put(ArchivedExecutionGraph serializableExecutionGraph) throws IOException {
- serializableExecutionGraphs.put(
- serializableExecutionGraph.getJobID(), serializableExecutionGraph);
+ public void put(ExecutionGraphInfo serializableExecutionGraphInfo) throws IOException {
+ serializableExecutionGraphInfos.put(
+ serializableExecutionGraphInfo.getJobId(), serializableExecutionGraphInfo);
}
@Override
public JobsOverview getStoredJobsOverview() {
Collection<JobStatus> allJobStatus =
- serializableExecutionGraphs.values().stream()
+ serializableExecutionGraphInfos.values().stream()
+ .map(ExecutionGraphInfo::getArchivedExecutionGraph)
.map(ArchivedExecutionGraph::getState)
.collect(Collectors.toList());
@@ -69,7 +71,8 @@ public class MemoryArchivedExecutionGraphStore implements ArchivedExecutionGraph
@Override
public Collection<JobDetails> getAvailableJobDetails() {
- return serializableExecutionGraphs.values().stream()
+ return serializableExecutionGraphInfos.values().stream()
+ .map(ExecutionGraphInfo::getArchivedExecutionGraph)
.map(JobDetails::createDetailsForJob)
.collect(Collectors.toList());
}
@@ -77,11 +80,12 @@ public class MemoryArchivedExecutionGraphStore implements ArchivedExecutionGraph
@Nullable
@Override
public JobDetails getAvailableJobDetails(JobID jobId) {
- final ArchivedExecutionGraph archivedExecutionGraph =
- serializableExecutionGraphs.get(jobId);
+ final ExecutionGraphInfo archivedExecutionGraphInfo =
+ serializableExecutionGraphInfos.get(jobId);
- if (archivedExecutionGraph != null) {
- return JobDetails.createDetailsForJob(archivedExecutionGraph);
+ if (archivedExecutionGraphInfo != null) {
+ return JobDetails.createDetailsForJob(
+ archivedExecutionGraphInfo.getArchivedExecutionGraph());
} else {
return null;
}
@@ -89,6 +93,6 @@ public class MemoryArchivedExecutionGraphStore implements ArchivedExecutionGraph
@Override
public void close() throws IOException {
- serializableExecutionGraphs.clear();
+ serializableExecutionGraphInfos.clear();
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 553cc76..4b9b4b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.FlinkException;
import java.util.Collections;
@@ -119,9 +120,11 @@ public class MiniDispatcher extends Dispatcher {
@Override
protected CleanupJobState jobReachedGloballyTerminalState(
- ArchivedExecutionGraph archivedExecutionGraph) {
+ ExecutionGraphInfo executionGraphInfo) {
+ final ArchivedExecutionGraph archivedExecutionGraph =
+ executionGraphInfo.getArchivedExecutionGraph();
final CleanupJobState cleanupHAState =
- super.jobReachedGloballyTerminalState(archivedExecutionGraph);
+ super.jobReachedGloballyTerminalState(executionGraphInfo);
if (jobCancelled || executionMode == ClusterEntrypoint.ExecutionMode.DETACHED) {
// shut down if job is cancelled or we don't have to wait for the execution result
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
index 81bd0d1..6fc8d69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
@@ -49,7 +49,7 @@ public class PartialDispatcherServices {
@Nonnull private final JobManagerMetricGroupFactory jobManagerMetricGroupFactory;
- @Nonnull private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
+ @Nonnull private final ExecutionGraphInfoStore executionGraphInfoStore;
@Nonnull private final FatalErrorHandler fatalErrorHandler;
@@ -66,7 +66,7 @@ public class PartialDispatcherServices {
@Nonnull BlobServer blobServer,
@Nonnull HeartbeatServices heartbeatServices,
@Nonnull JobManagerMetricGroupFactory jobManagerMetricGroupFactory,
- @Nonnull ArchivedExecutionGraphStore archivedExecutionGraphStore,
+ @Nonnull ExecutionGraphInfoStore executionGraphInfoStore,
@Nonnull FatalErrorHandler fatalErrorHandler,
@Nonnull HistoryServerArchivist historyServerArchivist,
@Nullable String metricQueryServiceAddress,
@@ -77,7 +77,7 @@ public class PartialDispatcherServices {
this.blobServer = blobServer;
this.heartbeatServices = heartbeatServices;
this.jobManagerMetricGroupFactory = jobManagerMetricGroupFactory;
- this.archivedExecutionGraphStore = archivedExecutionGraphStore;
+ this.executionGraphInfoStore = executionGraphInfoStore;
this.fatalErrorHandler = fatalErrorHandler;
this.historyServerArchivist = historyServerArchivist;
this.metricQueryServiceAddress = metricQueryServiceAddress;
@@ -115,8 +115,8 @@ public class PartialDispatcherServices {
}
@Nonnull
- public ArchivedExecutionGraphStore getArchivedExecutionGraphStore() {
- return archivedExecutionGraphStore;
+ public ExecutionGraphInfoStore getArchivedExecutionGraphStore() {
+ return executionGraphInfoStore;
}
@Nonnull
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobGraphStore.java
index ac69737..f4a4716 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobGraphStore.java
@@ -44,7 +44,7 @@ public class PartialDispatcherServicesWithJobGraphStore extends PartialDispatche
@Nonnull BlobServer blobServer,
@Nonnull HeartbeatServices heartbeatServices,
@Nonnull JobManagerMetricGroupFactory jobManagerMetricGroupFactory,
- @Nonnull ArchivedExecutionGraphStore archivedExecutionGraphStore,
+ @Nonnull ExecutionGraphInfoStore executionGraphInfoStore,
@Nonnull FatalErrorHandler fatalErrorHandler,
@Nonnull HistoryServerArchivist historyServerArchivist,
@Nullable String metricQueryServiceAddress,
@@ -57,7 +57,7 @@ public class PartialDispatcherServicesWithJobGraphStore extends PartialDispatche
blobServer,
heartbeatServices,
jobManagerMetricGroupFactory,
- archivedExecutionGraphStore,
+ executionGraphInfoStore,
fatalErrorHandler,
historyServerArchivist,
metricQueryServiceAddress,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
index 0ce3ba6..28b4217 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.dispatcher;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import java.util.concurrent.CompletableFuture;
@@ -28,8 +28,7 @@ public enum VoidHistoryServerArchivist implements HistoryServerArchivist {
INSTANCE;
@Override
- public CompletableFuture<Acknowledge> archiveExecutionGraph(
- AccessExecutionGraph executionGraph) {
+ public CompletableFuture<Acknowledge> archiveExecutionGraph(ExecutionGraphInfo executionGraph) {
return CompletableFuture.completedFuture(Acknowledge.get());
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 8caa6a7..04111ac 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -39,7 +39,7 @@ import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.MiniDispatcher;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
@@ -145,7 +145,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
@GuardedBy("lock")
private ExecutorService ioExecutor;
- private ArchivedExecutionGraphStore archivedExecutionGraphStore;
+ private ExecutionGraphInfoStore executionGraphInfoStore;
private final Thread shutDownHook;
@@ -254,7 +254,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
blobServer,
heartbeatServices,
metricRegistry,
- archivedExecutionGraphStore,
+ executionGraphInfoStore,
new RpcMetricQueryServiceRetriever(
metricRegistry.getMetricQueryServiceRpcService()),
this);
@@ -322,7 +322,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
ConfigurationUtils.getSystemResourceMetricsProbingInterval(
configuration));
- archivedExecutionGraphStore =
+ executionGraphInfoStore =
createSerializableExecutionGraphStore(
configuration, commonRpcService.getScheduledExecutor());
}
@@ -399,9 +399,9 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
}
}
- if (archivedExecutionGraphStore != null) {
+ if (executionGraphInfoStore != null) {
try {
- archivedExecutionGraphStore.close();
+ executionGraphInfoStore.close();
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
@@ -538,7 +538,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
createDispatcherResourceManagerComponentFactory(Configuration configuration)
throws IOException;
- protected abstract ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
+ protected abstract ExecutionGraphInfoStore createSerializableExecutionGraphStore(
Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException;
protected static EntrypointClusterConfiguration parseArguments(String[] args)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 2b19f6e..889ea7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.entrypoint;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
/** Base class for per-job cluster entry points. */
public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
@@ -31,8 +31,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
}
@Override
- protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
+ protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
Configuration configuration, ScheduledExecutor scheduledExecutor) {
- return new MemoryArchivedExecutionGraphStore();
+ return new MemoryExecutionGraphInfoStore();
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index fb46589..b455f02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -23,8 +23,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.FileExecutionGraphInfoStore;
import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
@@ -39,7 +39,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
}
@Override
- protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
+ protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException {
final File tmpDir = new File(ConfigurationUtils.parseTempDirectories(configuration)[0]);
@@ -50,7 +50,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
final long maximumCacheSizeBytes =
configuration.getLong(JobManagerOptions.JOB_STORE_CACHE_SIZE);
- return new FileArchivedExecutionGraphStore(
+ return new FileExecutionGraphInfoStore(
tmpDir,
expirationTime,
maximumCapacity,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
index e9b7847..10963ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
@@ -25,9 +25,9 @@ import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ExponentialBackoffRetryStrategy;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
@@ -106,7 +106,7 @@ public class DefaultDispatcherResourceManagerComponentFactory
BlobServer blobServer,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
- ArchivedExecutionGraphStore archivedExecutionGraphStore,
+ ExecutionGraphInfoStore executionGraphInfoStore,
MetricQueryServiceRetriever metricQueryServiceRetriever,
FatalErrorHandler fatalErrorHandler)
throws Exception {
@@ -201,7 +201,7 @@ public class DefaultDispatcherResourceManagerComponentFactory
() ->
MetricUtils.instantiateJobManagerMetricGroup(
metricRegistry, hostname),
- archivedExecutionGraphStore,
+ executionGraphInfoStore,
fatalErrorHandler,
historyServerArchivist,
metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
index 7182f3f..8cddd19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.entrypoint.component;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -41,7 +41,7 @@ public interface DispatcherResourceManagerComponentFactory {
BlobServer blobServer,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
- ArchivedExecutionGraphStore archivedExecutionGraphStore,
+ ExecutionGraphInfoStore executionGraphInfoStore,
MetricQueryServiceRetriever metricQueryServiceRetriever,
FatalErrorHandler fatalErrorHandler)
throws Exception;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
index 01fb137..f6d820f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.jobmanager;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
/** Interface for completion actions once a Flink job has reached a terminal state. */
public interface OnCompletionActions {
@@ -27,9 +27,9 @@ public interface OnCompletionActions {
/**
* Job reached a globally terminal state.
*
- * @param executionGraph serializable execution graph
+ * @param executionGraphInfo contains information about the terminated job
*/
- void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph);
+ void jobReachedGloballyTerminalState(ExecutionGraphInfo executionGraphInfo);
/** Job was finished by another JobMaster. */
void jobFinishedByOther();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
index 629326c..042945d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.client.JobInitializationException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -31,6 +30,7 @@ import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.ThrowingRunnable;
@@ -241,10 +241,10 @@ public class JobManagerRunnerImpl
/** Job completion notification triggered by JobManager. */
@Override
- public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
+ public void jobReachedGloballyTerminalState(ExecutionGraphInfo executionGraphInfo) {
unregisterJobFromHighAvailability();
- // complete the result future with the terminal execution graph
- resultFuture.complete(JobManagerRunnerResult.forSuccess(executionGraph));
+ // complete the result future with the information of the information of the terminated job
+ resultFuture.complete(JobManagerRunnerResult.forSuccess(executionGraphInfo));
}
/** Job completion notification triggered by self. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResult.java
index ffda268..53542ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResult.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.jobmaster;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
@@ -28,37 +28,37 @@ import java.util.Objects;
/** The result of the {@link JobManagerRunner}. */
public final class JobManagerRunnerResult {
- @Nullable private final ArchivedExecutionGraph archivedExecutionGraph;
+ @Nullable private final ExecutionGraphInfo executionGraphInfo;
@Nullable private final Throwable failure;
private JobManagerRunnerResult(
- @Nullable ArchivedExecutionGraph archivedExecutionGraph, @Nullable Throwable failure) {
- this.archivedExecutionGraph = archivedExecutionGraph;
+ @Nullable ExecutionGraphInfo executionGraphInfo, @Nullable Throwable failure) {
+ this.executionGraphInfo = executionGraphInfo;
this.failure = failure;
}
public boolean isSuccess() {
- return archivedExecutionGraph != null && failure == null;
+ return executionGraphInfo != null && failure == null;
}
public boolean isJobNotFinished() {
- return archivedExecutionGraph == null && failure == null;
+ return executionGraphInfo == null && failure == null;
}
public boolean isInitializationFailure() {
- return archivedExecutionGraph == null && failure != null;
+ return executionGraphInfo == null && failure != null;
}
/**
* This method returns the payload of the successful JobManagerRunnerResult.
*
- * @return the successful completed {@link ArchivedExecutionGraph}
+ * @return the @link ExecutionGraphInfo} of a successfully finished job
* @throws IllegalStateException if the result is not a success
*/
- public ArchivedExecutionGraph getArchivedExecutionGraph() {
+ public ExecutionGraphInfo getExecutionGraphInfo() {
Preconditions.checkState(isSuccess());
- return archivedExecutionGraph;
+ return executionGraphInfo;
}
/**
@@ -81,21 +81,21 @@ public final class JobManagerRunnerResult {
return false;
}
JobManagerRunnerResult that = (JobManagerRunnerResult) o;
- return Objects.equals(archivedExecutionGraph, that.archivedExecutionGraph)
+ return Objects.equals(executionGraphInfo, that.executionGraphInfo)
&& Objects.equals(failure, that.failure);
}
@Override
public int hashCode() {
- return Objects.hash(archivedExecutionGraph, failure);
+ return Objects.hash(executionGraphInfo, failure);
}
public static JobManagerRunnerResult forJobNotFinished() {
return new JobManagerRunnerResult(null, null);
}
- public static JobManagerRunnerResult forSuccess(ArchivedExecutionGraph archivedExecutionGraph) {
- return new JobManagerRunnerResult(archivedExecutionGraph, null);
+ public static JobManagerRunnerResult forSuccess(ExecutionGraphInfo executionGraphInfo) {
+ return new JobManagerRunnerResult(executionGraphInfo, null);
}
public static JobManagerRunnerResult forInitializationFailure(Throwable failure) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c353258..483b06a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
@@ -74,6 +73,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.PermanentlyFencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.slots.ResourceRequirement;
@@ -766,7 +766,7 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId>
}
@Override
- public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+ public CompletableFuture<ExecutionGraphInfo> requestJob(Time timeout) {
return CompletableFuture.completedFuture(schedulerNG.requestJob());
}
@@ -975,11 +975,9 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId>
: partitionTracker
::stopTrackingAndReleasePartitionsFor));
- final ArchivedExecutionGraph archivedExecutionGraph = schedulerNG.requestJob();
+ final ExecutionGraphInfo executionGraphInfo = schedulerNG.requestJob();
scheduledExecutorService.execute(
- () ->
- jobCompletionActions.jobReachedGloballyTerminalState(
- archivedExecutionGraph));
+ () -> jobCompletionActions.jobReachedGloballyTerminalState(executionGraphInfo));
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 05a67e6..f742eed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -42,6 +41,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
@@ -209,12 +209,12 @@ public interface JobMasterGateway
CompletableFuture<JobStatus> requestJobStatus(@RpcTimeout Time timeout);
/**
- * Requests the {@link ArchivedExecutionGraph} of the executed job.
+ * Requests the {@link ExecutionGraphInfo} of the executed job.
*
* @param timeout for the rpc call
- * @return Future which is completed with the {@link ArchivedExecutionGraph} of the executed job
+ * @return Future which is completed with the {@link ExecutionGraphInfo} of the executed job
*/
- CompletableFuture<ArchivedExecutionGraph> requestJob(@RpcTimeout Time timeout);
+ CompletableFuture<ExecutionGraphInfo> requestJob(@RpcTimeout Time timeout);
/**
* Triggers taking a savepoint of the executed job.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 41aad53..e1753bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.concurrent.ExponentialBackoffRetryStrategy;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
@@ -468,7 +468,7 @@ public class MiniCluster implements AutoCloseableAsync {
blobServer,
heartbeatServices,
metricRegistry,
- new MemoryArchivedExecutionGraphStore(),
+ new MemoryExecutionGraphInfoStore(),
metricQueryServiceRetriever,
fatalErrorHandler));
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractAccessExecutionGraphHandler.java
similarity index 53%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractAccessExecutionGraphHandler.java
index f423e2e..f8d2aab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractAccessExecutionGraphHandler.java
@@ -1,4 +1,3 @@
-package org.apache.flink.runtime.rest.handler.job;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -17,63 +16,59 @@ package org.apache.flink.runtime.rest.handler.job;
* limitations under the License.
*/
+package org.apache.flink.runtime.rest.handler.job;
+
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
-import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executor;
-/** Handler serving the job execution plan. */
-public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, JobMessageParameters>
- implements JsonArchivist {
+/**
+ * {@code AbstractAccessExecutionGraphHandler} handles requests that require accessing the job's
+ * {@link AccessExecutionGraph}.
+ *
+ * @param <R> the response type
+ * @param <M> the message parameter type
+ */
+public abstract class AbstractAccessExecutionGraphHandler<
+ R extends ResponseBody, M extends JobMessageParameters>
+ extends AbstractExecutionGraphHandler<R, M> {
- public JobPlanHandler(
+ protected AbstractAccessExecutionGraphHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
- Map<String, String> headers,
- MessageHeaders<EmptyRequestBody, JobPlanInfo, JobMessageParameters> messageHeaders,
+ Map<String, String> responseHeaders,
+ MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor) {
-
- super(leaderRetriever, timeout, headers, messageHeaders, executionGraphCache, executor);
+ super(
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ messageHeaders,
+ executionGraphCache,
+ executor);
}
@Override
- protected JobPlanInfo handleRequest(
- HandlerRequest<EmptyRequestBody, JobMessageParameters> request,
- AccessExecutionGraph executionGraph)
+ protected R handleRequest(
+ HandlerRequest<EmptyRequestBody, M> request, ExecutionGraphInfo executionGraphInfo)
throws RestHandlerException {
- return createJobPlanInfo(executionGraph);
+ return handleRequest(request, executionGraphInfo.getArchivedExecutionGraph());
}
- @Override
- public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
- throws IOException {
- ResponseBody json = createJobPlanInfo(graph);
- String path =
- getMessageHeaders()
- .getTargetRestEndpointURL()
- .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
- return Collections.singleton(new ArchivedJson(path, json));
- }
-
- private static JobPlanInfo createJobPlanInfo(AccessExecutionGraph executionGraph) {
- return new JobPlanInfo(executionGraph.getJsonPlan());
- }
+ protected abstract R handleRequest(
+ HandlerRequest<EmptyRequestBody, M> request, AccessExecutionGraph executionGraph)
+ throws RestHandlerException;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
index 8c940ec..e8910a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.rest.handler.job;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.rest.NotFoundException;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -32,6 +31,7 @@ import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
@@ -45,9 +45,10 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
- * Base class for all {@link AccessExecutionGraph} based REST handlers.
+ * Base class for all {@link ExecutionGraphInfo} based REST handlers.
*
* @param <R> response type
+ * @param <M> job message parameter type
*/
public abstract class AbstractExecutionGraphHandler<
R extends ResponseBody, M extends JobMessageParameters>
@@ -76,8 +77,8 @@ public abstract class AbstractExecutionGraphHandler<
throws RestHandlerException {
JobID jobId = request.getPathParameter(JobIDPathParameter.class);
- CompletableFuture<AccessExecutionGraph> executionGraphFuture =
- executionGraphCache.getExecutionGraph(jobId, gateway);
+ CompletableFuture<ExecutionGraphInfo> executionGraphFuture =
+ executionGraphCache.getExecutionGraphInfo(jobId, gateway);
return executionGraphFuture
.thenApplyAsync(
@@ -104,15 +105,15 @@ public abstract class AbstractExecutionGraphHandler<
}
/**
- * Called for each request after the corresponding {@link AccessExecutionGraph} has been
- * retrieved from the {@link ExecutionGraphCache}.
+ * Called for each request after the corresponding {@link ExecutionGraphInfo} has been retrieved
+ * from the {@link ExecutionGraphCache}.
*
* @param request for further information
- * @param executionGraph for which the handler was called
+ * @param executionGraphInfo for which the handler was called
* @return Response
* @throws RestHandlerException if the handler could not process the request
*/
protected abstract R handleRequest(
- HandlerRequest<EmptyRequestBody, M> request, AccessExecutionGraph executionGraph)
+ HandlerRequest<EmptyRequestBody, M> request, ExecutionGraphInfo executionGraphInfo)
throws RestHandlerException;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java
index ee09d02..1e001ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java
@@ -49,7 +49,7 @@ import java.util.concurrent.Executor;
*/
public abstract class AbstractJobVertexHandler<
R extends ResponseBody, M extends JobVertexMessageParameters>
- extends AbstractExecutionGraphHandler<R, M> {
+ extends AbstractAccessExecutionGraphHandler<R, M> {
/**
* Instantiates a new Abstract job vertex handler.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
index 9c41953..70dd6ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
@@ -48,7 +48,8 @@ import java.util.concurrent.Executor;
/** Request handler that returns the aggregated accumulators of a job. */
public class JobAccumulatorsHandler
- extends AbstractExecutionGraphHandler<JobAccumulatorsInfo, JobAccumulatorsMessageParameters>
+ extends AbstractAccessExecutionGraphHandler<
+ JobAccumulatorsInfo, JobAccumulatorsMessageParameters>
implements JsonArchivist {
public JobAccumulatorsHandler(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
index cbdc8b6..53ee2b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
@@ -42,7 +42,7 @@ import java.util.concurrent.Executor;
/** Handler serving the job configuration. */
public class JobConfigHandler
- extends AbstractExecutionGraphHandler<JobConfigInfo, JobMessageParameters>
+ extends AbstractAccessExecutionGraphHandler<JobConfigInfo, JobMessageParameters>
implements JsonArchivist {
public JobConfigHandler(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 9261674..84c6d55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -56,7 +56,7 @@ import java.util.concurrent.Executor;
/** Handler returning the details for the specified job. */
public class JobDetailsHandler
- extends AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters>
+ extends AbstractAccessExecutionGraphHandler<JobDetailsInfo, JobMessageParameters>
implements JsonArchivist {
private final MetricFetcher metricFetcher;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 819f727..1fa496e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -49,7 +49,8 @@ import java.util.concurrent.Executor;
/** Handler serving the job exceptions. */
public class JobExceptionsHandler
- extends AbstractExecutionGraphHandler<JobExceptionsInfo, JobExceptionsMessageParameters>
+ extends AbstractAccessExecutionGraphHandler<
+ JobExceptionsInfo, JobExceptionsMessageParameters>
implements JsonArchivist {
static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
index f423e2e..fcf48ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
@@ -40,7 +40,8 @@ import java.util.Map;
import java.util.concurrent.Executor;
/** Handler serving the job execution plan. */
-public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, JobMessageParameters>
+public class JobPlanHandler
+ extends AbstractAccessExecutionGraphHandler<JobPlanInfo, JobMessageParameters>
implements JsonArchivist {
public JobPlanHandler(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
index 1ab070f..12e8542 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
@@ -53,7 +53,8 @@ import java.util.concurrent.Executor;
/** Request handler for the job vertex details. */
public class JobVertexDetailsHandler
- extends AbstractExecutionGraphHandler<JobVertexDetailsInfo, JobVertexMessageParameters>
+ extends AbstractAccessExecutionGraphHandler<
+ JobVertexDetailsInfo, JobVertexMessageParameters>
implements JsonArchivist {
private final MetricFetcher metricFetcher;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index 0408fa1..1e38e29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -62,7 +62,8 @@ import java.util.concurrent.Executor;
* and metrics of all its subtasks aggregated by TaskManager.
*/
public class JobVertexTaskManagersHandler
- extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters>
+ extends AbstractAccessExecutionGraphHandler<
+ JobVertexTaskManagersInfo, JobVertexMessageParameters>
implements JsonArchivist {
private MetricFetcher metricFetcher;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
index 9f2763c..1e2817e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -47,7 +47,7 @@ import java.util.concurrent.Executor;
*/
public abstract class AbstractCheckpointHandler<
R extends ResponseBody, M extends CheckpointMessageParameters>
- extends AbstractExecutionGraphHandler<R, M> {
+ extends AbstractAccessExecutionGraphHandler<R, M> {
private final CheckpointStatsCache checkpointStatsCache;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
index 597bcb5..7684271 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
@@ -49,7 +49,7 @@ import java.util.concurrent.Executor;
/** Handler which serves the checkpoint configuration. */
public class CheckpointConfigHandler
- extends AbstractExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters>
+ extends AbstractAccessExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters>
implements JsonArchivist {
public CheckpointConfigHandler(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
index 320f7e0..db6f79c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
@@ -56,7 +56,7 @@ import java.util.concurrent.Executor;
/** Handler which serves the checkpoint statistics. */
public class CheckpointingStatisticsHandler
- extends AbstractExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters>
+ extends AbstractAccessExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters>
implements JsonArchivist {
public CheckpointingStatisticsHandler(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java
index 3ed7e4d..de40e14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java
@@ -20,8 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.util.Preconditions;
@@ -63,12 +62,12 @@ public class DefaultExecutionGraphCache implements ExecutionGraphCache {
}
@Override
- public CompletableFuture<AccessExecutionGraph> getExecutionGraph(
+ public CompletableFuture<ExecutionGraphInfo> getExecutionGraphInfo(
JobID jobId, RestfulGateway restfulGateway) {
return getExecutionGraphInternal(jobId, restfulGateway).thenApply(Function.identity());
}
- private CompletableFuture<ArchivedExecutionGraph> getExecutionGraphInternal(
+ private CompletableFuture<ExecutionGraphInfo> getExecutionGraphInternal(
JobID jobId, RestfulGateway restfulGateway) {
Preconditions.checkState(running, "ExecutionGraphCache is no longer running");
@@ -78,10 +77,10 @@ public class DefaultExecutionGraphCache implements ExecutionGraphCache {
final long currentTime = System.currentTimeMillis();
if (oldEntry != null && currentTime < oldEntry.getTTL()) {
- final CompletableFuture<ArchivedExecutionGraph> executionGraphFuture =
- oldEntry.getExecutionGraphFuture();
- if (!executionGraphFuture.isCompletedExceptionally()) {
- return executionGraphFuture;
+ final CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture =
+ oldEntry.getExecutionGraphInfoFuture();
+ if (!executionGraphInfoFuture.isCompletedExceptionally()) {
+ return executionGraphInfoFuture;
}
// otherwise it must be completed exceptionally
}
@@ -96,22 +95,23 @@ public class DefaultExecutionGraphCache implements ExecutionGraphCache {
} else {
successfulUpdate = cachedExecutionGraphs.replace(jobId, oldEntry, newEntry);
// cancel potentially outstanding futures
- oldEntry.getExecutionGraphFuture().cancel(false);
+ oldEntry.getExecutionGraphInfoFuture().cancel(false);
}
if (successfulUpdate) {
- final CompletableFuture<ArchivedExecutionGraph> executionGraphFuture =
- restfulGateway.requestJob(jobId, timeout);
+ final CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture =
+ restfulGateway.requestExecutionGraphInfo(jobId, timeout);
- executionGraphFuture.whenComplete(
- (ArchivedExecutionGraph executionGraph, Throwable throwable) -> {
+ executionGraphInfoFuture.whenComplete(
+ (ExecutionGraphInfo executionGraph, Throwable throwable) -> {
if (throwable != null) {
- newEntry.getExecutionGraphFuture().completeExceptionally(throwable);
+ newEntry.getExecutionGraphInfoFuture()
+ .completeExceptionally(throwable);
// remove exceptionally completed entry because it doesn't help
cachedExecutionGraphs.remove(jobId, newEntry);
} else {
- newEntry.getExecutionGraphFuture().complete(executionGraph);
+ newEntry.getExecutionGraphInfoFuture().complete(executionGraph);
}
});
@@ -120,7 +120,7 @@ public class DefaultExecutionGraphCache implements ExecutionGraphCache {
cachedExecutionGraphs.remove(jobId, newEntry);
}
- return newEntry.getExecutionGraphFuture();
+ return newEntry.getExecutionGraphInfoFuture();
}
}
}
@@ -139,19 +139,19 @@ public class DefaultExecutionGraphCache implements ExecutionGraphCache {
private static final class ExecutionGraphEntry {
private final long ttl;
- private final CompletableFuture<ArchivedExecutionGraph> executionGraphFuture;
+ private final CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture;
ExecutionGraphEntry(long ttl) {
this.ttl = ttl;
- this.executionGraphFuture = new CompletableFuture<>();
+ this.executionGraphInfoFuture = new CompletableFuture<>();
}
public long getTTL() {
return ttl;
}
- public CompletableFuture<ArchivedExecutionGraph> getExecutionGraphFuture() {
- return executionGraphFuture;
+ public CompletableFuture<ExecutionGraphInfo> getExecutionGraphInfoFuture() {
+ return executionGraphInfoFuture;
}
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
index 8309f27..b970399 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
@@ -19,17 +19,16 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
/**
- * Cache for {@link ArchivedExecutionGraph} which are obtained from the Flink cluster. Every cache
- * entry has an associated time to live after which a new request will trigger the reloading of the
- * {@link ArchivedExecutionGraph} from the cluster.
+ * Cache for {@link ExecutionGraphInfo} which are obtained from the Flink cluster. Every cache entry
+ * has an associated time to live after which a new request will trigger the reloading of the {@link
+ * ExecutionGraphInfo} from the cluster.
*/
public interface ExecutionGraphCache extends Closeable {
@@ -37,15 +36,15 @@ public interface ExecutionGraphCache extends Closeable {
int size();
/**
- * Gets the {@link AccessExecutionGraph} for the given {@link JobID} and caches it. The {@link
- * AccessExecutionGraph} will be requested again after the refresh interval has passed or if the
+ * Gets the {@link ExecutionGraphInfo} for the given {@link JobID} and caches it. The {@link
+ * ExecutionGraphInfo} will be requested again after the refresh interval has passed or if the
* graph could not be retrieved from the given gateway.
*
- * @param jobId identifying the {@link ArchivedExecutionGraph} to get
- * @param restfulGateway to request the {@link ArchivedExecutionGraph} from
- * @return Future containing the requested {@link ArchivedExecutionGraph}
+ * @param jobId identifying the {@link ExecutionGraphInfo} to get
+ * @param restfulGateway to request the {@link ExecutionGraphInfo} from
+ * @return Future containing the requested {@link ExecutionGraphInfo}
*/
- CompletableFuture<AccessExecutionGraph> getExecutionGraph(
+ CompletableFuture<ExecutionGraphInfo> getExecutionGraphInfo(
JobID jobId, RestfulGateway restfulGateway);
/** Perform the cleanup of out dated cache entries. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
new file mode 100644
index 0000000..4b7b746
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@code ExecutionGraphInfo} serves as a composite class that provides different {@link
+ * ExecutionGraph}-related information.
+ */
+public class ExecutionGraphInfo implements Serializable {
+
+ private static final long serialVersionUID = -6134203195124124202L;
+
+ private final ArchivedExecutionGraph executionGraph;
+ private final List<ErrorInfo> exceptionHistory;
+
+ public ExecutionGraphInfo(ArchivedExecutionGraph executionGraph) {
+ this(executionGraph, Collections.emptyList());
+ }
+
+ public ExecutionGraphInfo(
+ ArchivedExecutionGraph executionGraph, List<ErrorInfo> exceptionHistory) {
+ this.executionGraph = executionGraph;
+ this.exceptionHistory = exceptionHistory;
+ }
+
+ public JobID getJobId() {
+ return executionGraph.getJobID();
+ }
+
+ public ArchivedExecutionGraph getArchivedExecutionGraph() {
+ return executionGraph;
+ }
+
+ public List<ErrorInfo> getExceptionHistory() {
+ return exceptionHistory;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 64070ed..9b1e83d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -763,9 +763,10 @@ public abstract class SchedulerBase implements SchedulerNG {
}
@Override
- public ArchivedExecutionGraph requestJob() {
+ public ExecutionGraphInfo requestJob() {
mainThreadExecutor.assertRunningInMainThread();
- return ArchivedExecutionGraph.createFrom(executionGraph);
+ return new ExecutionGraphInfo(
+ ArchivedExecutionGraph.createFrom(executionGraph), getExceptionHistory());
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 5db4c5d..c88c8c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -92,7 +91,7 @@ public interface SchedulerNG {
void notifyPartitionDataAvailable(ResultPartitionID partitionID);
- ArchivedExecutionGraph requestJob();
+ ExecutionGraphInfo requestJob();
JobStatus requestJobStatus();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index fdd1a5b..7a332cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -86,6 +86,7 @@ import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerUtils;
@@ -368,8 +369,9 @@ public class AdaptiveScheduler
}
@Override
- public ArchivedExecutionGraph requestJob() {
- return state.getJob();
+ public ExecutionGraphInfo requestJob() {
+ // no exception history support is added for now (see FLINK-21439)
+ return new ExecutionGraphInfo(state.getJob());
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 6a1d317..62b5f76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.SerializedValue;
import java.util.Collection;
@@ -67,7 +68,24 @@ public interface RestfulGateway extends RpcGateway {
* @return Future containing the {@link ArchivedExecutionGraph} for the given jobId, otherwise
* {@link FlinkJobNotFoundException}
*/
- CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout);
+ default CompletableFuture<ArchivedExecutionGraph> requestJob(
+ JobID jobId, @RpcTimeout Time timeout) {
+ return requestExecutionGraphInfo(jobId, timeout)
+ .thenApply(ExecutionGraphInfo::getArchivedExecutionGraph);
+ }
+
+ /**
+ * Requests the {@link ExecutionGraphInfo} containing additional information besides the {@link
+ * ArchivedExecutionGraph}. If there is no such graph, then the future is completed with a
+ * {@link FlinkJobNotFoundException}.
+ *
+ * @param jobId identifying the job whose {@link ExecutionGraphInfo} is requested
+ * @param timeout for the asynchronous operation
+ * @return Future containing the {@link ExecutionGraphInfo} for the given jobId, otherwise
+ * {@link FlinkJobNotFoundException}
+ */
+ CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(
+ JobID jobId, @RpcTimeout Time timeout);
/**
* Requests the {@link JobResult} of a job specified by the given jobId.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
index a2edd76..e1da5d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.history;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import java.io.IOException;
import java.util.Collection;
@@ -43,4 +44,23 @@ public interface JsonArchivist {
* @throws IOException thrown if the JSON generation fails
*/
Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException;
+
+ /**
+ * Returns a {@link Collection} of {@link ArchivedJson}s containing JSON responses and their
+ * respective REST URL for a given job.
+ *
+ * <p>The collection should contain one entry for every response that could be generated for the
+ * given job, for example one entry for each task. The REST URLs should be unique and must not
+ * contain placeholders.
+ *
+ * @param executionGraphInfo {@link AccessExecutionGraph}-related information for which the
+ * responses should be generated
+ * @return Collection containing an ArchivedJson for every response that could be generated for
+ * the given job
+ * @throws IOException thrown if the JSON generation fails
+ */
+ default Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo)
+ throws IOException {
+ return archiveJsonWithPath(executionGraphInfo.getArchivedExecutionGraph());
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
index 9738809..66b8e0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
@@ -90,7 +91,9 @@ public class DispatcherJobTest extends TestLogger {
// assert result future done
DispatcherJobResult result = dispatcherJob.getResultFuture().get();
- assertThat(result.getArchivedExecutionGraph().getState(), is(JobStatus.FINISHED));
+ assertThat(
+ result.getExecutionGraphInfo().getArchivedExecutionGraph().getState(),
+ is(JobStatus.FINISHED));
}
@Test
@@ -116,7 +119,12 @@ public class DispatcherJobTest extends TestLogger {
assertThat(dispatcherJob.isInitialized(), is(true));
// assert that the result future completes
assertThat(
- dispatcherJob.getResultFuture().get().getArchivedExecutionGraph().getState(),
+ dispatcherJob
+ .getResultFuture()
+ .get()
+ .getExecutionGraphInfo()
+ .getArchivedExecutionGraph()
+ .getState(),
is(JobStatus.CANCELED));
}
@@ -134,7 +142,12 @@ public class DispatcherJobTest extends TestLogger {
cancelFuture.get();
assertJobStatus(dispatcherJob, JobStatus.CANCELED);
assertThat(
- dispatcherJob.getResultFuture().get().getArchivedExecutionGraph().getState(),
+ dispatcherJob
+ .getResultFuture()
+ .get()
+ .getExecutionGraphInfo()
+ .getArchivedExecutionGraph()
+ .getState(),
is(JobStatus.CANCELED));
}
@@ -171,7 +184,11 @@ public class DispatcherJobTest extends TestLogger {
assertJobStatus(dispatcherJob, JobStatus.FAILED);
ArchivedExecutionGraph aeg =
- dispatcherJob.getResultFuture().get().getArchivedExecutionGraph();
+ dispatcherJob
+ .getResultFuture()
+ .get()
+ .getExecutionGraphInfo()
+ .getArchivedExecutionGraph();
assertThat(
aeg.getFailureInfo()
.getException()
@@ -188,7 +205,9 @@ public class DispatcherJobTest extends TestLogger {
DispatcherJobResult result = dispatcherJob.getResultFuture().get();
assertThat(result.isInitializationFailure(), is(true));
- assertThat(result.getArchivedExecutionGraph().getState(), is(JobStatus.FAILED));
+ assertThat(
+ result.getExecutionGraphInfo().getArchivedExecutionGraph().getState(),
+ is(JobStatus.FAILED));
assertThat(
result.getInitializationFailure().getMessage(),
containsString("Artificial failure"));
@@ -298,13 +317,14 @@ public class DispatcherJobTest extends TestLogger {
.setRequestJobSupplier(
() ->
CompletableFuture.completedFuture(
- ArchivedExecutionGraph
- .createFromInitializingJob(
- getJobID(),
- "test",
- internalJobStatus,
- null,
- 1337)))
+ new ExecutionGraphInfo(
+ ArchivedExecutionGraph
+ .createFromInitializingJob(
+ getJobID(),
+ "test",
+ internalJobStatus,
+ null,
+ 1337))))
.setRequestJobDetailsSupplier(
() -> {
JobDetails jobDetails =
@@ -359,8 +379,9 @@ public class DispatcherJobTest extends TestLogger {
internalJobStatus = JobStatus.FINISHED;
resultFuture.complete(
JobManagerRunnerResult.forSuccess(
- ArchivedExecutionGraph.createFromInitializingJob(
- getJobID(), "test", JobStatus.FINISHED, null, 1337)));
+ new ExecutionGraphInfo(
+ ArchivedExecutionGraph.createFromInitializingJob(
+ getJobID(), "test", JobStatus.FINISHED, null, 1337))));
}
public void finishCancellation() {
@@ -370,12 +391,14 @@ public class DispatcherJobTest extends TestLogger {
runner.getResultFuture()
.complete(
JobManagerRunnerResult.forSuccess(
- ArchivedExecutionGraph.createFromInitializingJob(
- getJobID(),
- "test",
- JobStatus.CANCELED,
- null,
- 1337)));
+ new ExecutionGraphInfo(
+ ArchivedExecutionGraph
+ .createFromInitializingJob(
+ getJobID(),
+ "test",
+ JobStatus.CANCELED,
+ null,
+ 1337))));
cancellationFuture.complete(Acknowledge.get());
});
}
@@ -384,7 +407,9 @@ public class DispatcherJobTest extends TestLogger {
private void assertJobStatus(DispatcherJob dispatcherJob, JobStatus expectedStatus)
throws Exception {
assertThat(dispatcherJob.requestJobDetails(TIMEOUT).get().getStatus(), is(expectedStatus));
- assertThat(dispatcherJob.requestJob(TIMEOUT).get().getState(), is(expectedStatus));
+ assertThat(
+ dispatcherJob.requestJob(TIMEOUT).get().getArchivedExecutionGraph().getState(),
+ is(expectedStatus));
assertThat(dispatcherJob.requestJobStatus(TIMEOUT).get(), is(expectedStatus));
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index fb02a7b..caa1574 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraph
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
@@ -196,8 +197,8 @@ public class DispatcherResourceCleanupTest extends TestLogger {
private void startDispatcher(JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
- final MemoryArchivedExecutionGraphStore archivedExecutionGraphStore =
- new MemoryArchivedExecutionGraphStore();
+ final MemoryExecutionGraphInfoStore archivedExecutionGraphStore =
+ new MemoryExecutionGraphInfoStore();
dispatcher =
new TestingDispatcher(
rpcService,
@@ -349,10 +350,11 @@ public class DispatcherResourceCleanupTest extends TestLogger {
testingJobManagerRunner.getCloseAsyncCalledLatch().await();
testingJobManagerRunner.completeResultFuture(
- new ArchivedExecutionGraphBuilder()
- .setJobID(jobId)
- .setState(JobStatus.FINISHED)
- .build());
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder()
+ .setJobID(jobId)
+ .setState(JobStatus.FINISHED)
+ .build()));
testingJobManagerRunner.completeTerminationFuture();
@@ -378,10 +380,11 @@ public class DispatcherResourceCleanupTest extends TestLogger {
final TestingJobManagerRunner testingJobManagerRunner =
jobManagerRunnerFactory.takeCreatedJobManagerRunner();
testingJobManagerRunner.completeResultFuture(
- new ArchivedExecutionGraphBuilder()
- .setState(JobStatus.FINISHED)
- .setJobID(jobId)
- .build());
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder()
+ .setState(JobStatus.FINISHED)
+ .setJobID(jobId)
+ .build()));
// wait for the clearing
clearedJobLatch.await();
@@ -459,10 +462,11 @@ public class DispatcherResourceCleanupTest extends TestLogger {
private void finishJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
takeCreatedJobManagerRunner.completeResultFuture(
- new ArchivedExecutionGraphBuilder()
- .setJobID(jobId)
- .setState(JobStatus.FINISHED)
- .build());
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder()
+ .setJobID(jobId)
+ .setState(JobStatus.FINISHED)
+ .build()));
}
private void assertThatHABlobsHaveNotBeenRemoved() {
@@ -578,7 +582,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
final TestingJobManagerRunner testingJobManagerRunner =
jobManagerRunnerFactory.takeCreatedJobManagerRunner();
- testingJobManagerRunner.completeResultFuture(executionGraph);
+ testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph));
assertThat(cleanupJobFuture.get(), equalTo(jobId));
assertThat(deleteAllHABlobsFuture.isDone(), is(false));
@@ -597,7 +601,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
final TestingJobManagerRunner testingJobManagerRunner =
jobManagerRunnerFactory.takeCreatedJobManagerRunner();
- testingJobManagerRunner.completeResultFuture(executionGraph);
+ testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph));
assertThat(cleanupJobFuture.get(), equalTo(jobId));
assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 11e799e..b4c2cd8 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
@@ -264,8 +265,8 @@ public class DispatcherTest extends TestLogger {
TestingResourceManagerGateway resourceManagerGateway =
new TestingResourceManagerGateway();
- final MemoryArchivedExecutionGraphStore archivedExecutionGraphStore =
- new MemoryArchivedExecutionGraphStore();
+ final MemoryExecutionGraphInfoStore executionGraphInfoStore =
+ new MemoryExecutionGraphInfoStore();
return new TestingDispatcher(
rpcService,
@@ -278,7 +279,7 @@ public class DispatcherTest extends TestLogger {
() -> CompletableFuture.completedFuture(resourceManagerGateway),
blobServer,
heartbeatServices,
- archivedExecutionGraphStore,
+ executionGraphInfoStore,
testingFatalErrorHandlerResource.getFatalErrorHandler(),
VoidHistoryServerArchivist.INSTANCE,
null,
@@ -530,21 +531,23 @@ public class DispatcherTest extends TestLogger {
final JobID failedJobId = new JobID();
final JobStatus expectedState = JobStatus.FAILED;
- final ArchivedExecutionGraph failedExecutionGraph =
- new ArchivedExecutionGraphBuilder()
- .setJobID(failedJobId)
- .setState(expectedState)
- .setFailureCause(new ErrorInfo(new RuntimeException("expected"), 1L))
- .build();
+ final ExecutionGraphInfo failedExecutionGraphInfo =
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder()
+ .setJobID(failedJobId)
+ .setState(expectedState)
+ .setFailureCause(
+ new ErrorInfo(new RuntimeException("expected"), 1L))
+ .build());
- dispatcher.completeJobExecution(failedExecutionGraph);
+ dispatcher.completeJobExecution(failedExecutionGraphInfo);
assertThat(
dispatcherGateway.requestJobStatus(failedJobId, TIMEOUT).get(),
equalTo(expectedState));
assertThat(
- dispatcherGateway.requestJob(failedJobId, TIMEOUT).get(),
- equalTo(failedExecutionGraph));
+ dispatcherGateway.requestExecutionGraphInfo(failedJobId, TIMEOUT).get(),
+ equalTo(failedExecutionGraphInfo));
}
@Test
@@ -970,13 +973,14 @@ public class DispatcherTest extends TestLogger {
.setRequestJobSupplier(
() ->
CompletableFuture.completedFuture(
- ArchivedExecutionGraph
- .createFromInitializingJob(
- jobGraph.getJobID(),
- jobGraph.getName(),
- JobStatus.RUNNING,
- null,
- 1337)))
+ new ExecutionGraphInfo(
+ ArchivedExecutionGraph
+ .createFromInitializingJob(
+ jobGraph.getJobID(),
+ jobGraph.getName(),
+ JobStatus.RUNNING,
+ null,
+ 1337))))
.build();
testingRunner.completeJobMasterGatewayFuture(testingJobMasterGateway);
return testingRunner;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStoreTest.java
similarity index 50%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStoreTest.java
index 42e0325..cfd76a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStoreTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.ManualTicker;
import org.apache.flink.util.Preconditions;
@@ -57,8 +58,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-/** Tests for the {@link FileArchivedExecutionGraphStore}. */
-public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+/** Tests for the {@link FileExecutionGraphInfoStore}. */
+public class FileExecutionGraphInfoStoreTest extends TestLogger {
private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS =
Arrays.stream(JobStatus.values())
@@ -68,31 +69,32 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
@ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
/**
- * Tests that we can put {@link ArchivedExecutionGraph} into the {@link
- * FileArchivedExecutionGraphStore} and that the graph is persisted.
+ * Tests that we can put {@link ExecutionGraphInfo} into the {@link FileExecutionGraphInfoStore}
+ * and that the graph is persisted.
*/
@Test
public void testPut() throws IOException {
- final ArchivedExecutionGraph dummyExecutionGraph =
- new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
+ final ExecutionGraphInfo dummyExecutionGraphInfo =
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build());
final File rootDir = temporaryFolder.newFolder();
- try (final FileArchivedExecutionGraphStore executionGraphStore =
- createDefaultExecutionGraphStore(rootDir)) {
+ try (final FileExecutionGraphInfoStore executionGraphStore =
+ createDefaultExecutionGraphInfoStore(rootDir)) {
final File storageDirectory = executionGraphStore.getStorageDir();
// check that the storage directory is empty
assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
- executionGraphStore.put(dummyExecutionGraph);
+ executionGraphStore.put(dummyExecutionGraphInfo);
// check that we have persisted the given execution graph
assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));
assertThat(
- executionGraphStore.get(dummyExecutionGraph.getJobID()),
- new PartialArchivedExecutionGraphMatcher(dummyExecutionGraph));
+ executionGraphStore.get(dummyExecutionGraphInfo.getJobId()),
+ new PartialExecutionGraphInfoMatcher(dummyExecutionGraphInfo));
}
}
@@ -101,8 +103,8 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
public void testUnknownGet() throws IOException {
final File rootDir = temporaryFolder.newFolder();
- try (final FileArchivedExecutionGraphStore executionGraphStore =
- createDefaultExecutionGraphStore(rootDir)) {
+ try (final FileExecutionGraphInfoStore executionGraphStore =
+ createDefaultExecutionGraphInfoStore(rootDir)) {
assertThat(executionGraphStore.get(new JobID()), Matchers.nullValue());
}
}
@@ -111,11 +113,12 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
@Test
public void testStoredJobsOverview() throws IOException {
final int numberExecutionGraphs = 10;
- final Collection<ArchivedExecutionGraph> executionGraphs =
- generateTerminalExecutionGraphs(numberExecutionGraphs);
+ final Collection<ExecutionGraphInfo> executionGraphInfos =
+ generateTerminalExecutionGraphInfos(numberExecutionGraphs);
final List<JobStatus> jobStatuses =
- executionGraphs.stream()
+ executionGraphInfos.stream()
+ .map(ExecutionGraphInfo::getArchivedExecutionGraph)
.map(ArchivedExecutionGraph::getState)
.collect(Collectors.toList());
@@ -123,14 +126,14 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
final File rootDir = temporaryFolder.newFolder();
- try (final FileArchivedExecutionGraphStore executionGraphStore =
- createDefaultExecutionGraphStore(rootDir)) {
- for (ArchivedExecutionGraph executionGraph : executionGraphs) {
- executionGraphStore.put(executionGraph);
+ try (final FileExecutionGraphInfoStore executionGraphInfoStore =
+ createDefaultExecutionGraphInfoStore(rootDir)) {
+ for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) {
+ executionGraphInfoStore.put(executionGraphInfo);
}
assertThat(
- executionGraphStore.getStoredJobsOverview(),
+ executionGraphInfoStore.getStoredJobsOverview(),
Matchers.equalTo(expectedJobsOverview));
}
}
@@ -139,21 +142,21 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
@Test
public void testAvailableJobDetails() throws IOException {
final int numberExecutionGraphs = 10;
- final Collection<ArchivedExecutionGraph> executionGraphs =
- generateTerminalExecutionGraphs(numberExecutionGraphs);
+ final Collection<ExecutionGraphInfo> executionGraphInfos =
+ generateTerminalExecutionGraphInfos(numberExecutionGraphs);
- final Collection<JobDetails> jobDetails = generateJobDetails(executionGraphs);
+ final Collection<JobDetails> jobDetails = generateJobDetails(executionGraphInfos);
final File rootDir = temporaryFolder.newFolder();
- try (final FileArchivedExecutionGraphStore executionGraphStore =
- createDefaultExecutionGraphStore(rootDir)) {
- for (ArchivedExecutionGraph executionGraph : executionGraphs) {
- executionGraphStore.put(executionGraph);
+ try (final FileExecutionGraphInfoStore executionGraphInfoStore =
+ createDefaultExecutionGraphInfoStore(rootDir)) {
+ for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) {
+ executionGraphInfoStore.put(executionGraphInfo);
}
assertThat(
- executionGraphStore.getAvailableJobDetails(),
+ executionGraphInfoStore.getAvailableJobDetails(),
Matchers.containsInAnyOrder(jobDetails.toArray()));
}
}
@@ -170,8 +173,8 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
final ManualTicker manualTicker = new ManualTicker();
- try (final FileArchivedExecutionGraphStore executionGraphStore =
- new FileArchivedExecutionGraphStore(
+ try (final FileExecutionGraphInfoStore executionGraphInfoStore =
+ new FileExecutionGraphInfoStore(
rootDir,
expirationTime,
Integer.MAX_VALUE,
@@ -179,24 +182,29 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
scheduledExecutor,
manualTicker)) {
- final ArchivedExecutionGraph executionGraph =
- new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
+ final ExecutionGraphInfo executionGraphInfo =
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder()
+ .setState(JobStatus.FINISHED)
+ .build());
- executionGraphStore.put(executionGraph);
+ executionGraphInfoStore.put(executionGraphInfo);
// there should one execution graph
- assertThat(executionGraphStore.size(), Matchers.equalTo(1));
+ assertThat(executionGraphInfoStore.size(), Matchers.equalTo(1));
manualTicker.advanceTime(expirationTime.toMilliseconds(), TimeUnit.MILLISECONDS);
// this should trigger the cleanup after expiration
scheduledExecutor.triggerScheduledTasks();
- assertThat(executionGraphStore.size(), Matchers.equalTo(0));
+ assertThat(executionGraphInfoStore.size(), Matchers.equalTo(0));
- assertThat(executionGraphStore.get(executionGraph.getJobID()), Matchers.nullValue());
+ assertThat(
+ executionGraphInfoStore.get(executionGraphInfo.getJobId()),
+ Matchers.nullValue());
- final File storageDirectory = executionGraphStore.getStorageDir();
+ final File storageDirectory = executionGraphInfoStore.getStorageDir();
// check that the persisted file has been deleted
assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
@@ -210,17 +218,20 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
assertThat(rootDir.listFiles().length, Matchers.equalTo(0));
- try (final FileArchivedExecutionGraphStore executionGraphStore =
- createDefaultExecutionGraphStore(rootDir)) {
+ try (final FileExecutionGraphInfoStore executionGraphInfoStore =
+ createDefaultExecutionGraphInfoStore(rootDir)) {
assertThat(rootDir.listFiles().length, Matchers.equalTo(1));
- final File storageDirectory = executionGraphStore.getStorageDir();
+ final File storageDirectory = executionGraphInfoStore.getStorageDir();
assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
- executionGraphStore.put(
- new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build());
+ executionGraphInfoStore.put(
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder()
+ .setState(JobStatus.FINISHED)
+ .build()));
assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));
}
@@ -228,13 +239,13 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
assertThat(rootDir.listFiles().length, Matchers.equalTo(0));
}
- /** Tests that evicted {@link ArchivedExecutionGraph} are loaded from disk again. */
+ /** Tests that evicted {@link ExecutionGraphInfo} are loaded from disk again. */
@Test
public void testCacheLoading() throws IOException {
final File rootDir = temporaryFolder.newFolder();
- try (final FileArchivedExecutionGraphStore executionGraphStore =
- new FileArchivedExecutionGraphStore(
+ try (final FileExecutionGraphInfoStore executionGraphInfoStore =
+ new FileExecutionGraphInfoStore(
rootDir,
Time.hours(1L),
Integer.MAX_VALUE,
@@ -242,43 +253,47 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
TestingUtils.defaultScheduledExecutor(),
Ticker.systemTicker())) {
- final LoadingCache<JobID, ArchivedExecutionGraph> executionGraphCache =
- executionGraphStore.getArchivedExecutionGraphCache();
+ final LoadingCache<JobID, ExecutionGraphInfo> executionGraphInfoCache =
+ executionGraphInfoStore.getExecutionGraphInfoCache();
- Collection<ArchivedExecutionGraph> executionGraphs = new ArrayList<>(64);
+ Collection<ExecutionGraphInfo> executionGraphInfos = new ArrayList<>(64);
boolean continueInserting = true;
// insert execution graphs until the first one got evicted
while (continueInserting) {
// has roughly a size of 1.4 KB
- final ArchivedExecutionGraph executionGraph =
- new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
+ final ExecutionGraphInfo executionGraphInfo =
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder()
+ .setState(JobStatus.FINISHED)
+ .build());
- executionGraphStore.put(executionGraph);
+ executionGraphInfoStore.put(executionGraphInfo);
- executionGraphs.add(executionGraph);
+ executionGraphInfos.add(executionGraphInfo);
- continueInserting = executionGraphCache.size() == executionGraphs.size();
+ continueInserting = executionGraphInfoCache.size() == executionGraphInfos.size();
}
- final File storageDirectory = executionGraphStore.getStorageDir();
+ final File storageDirectory = executionGraphInfoStore.getStorageDir();
assertThat(
- storageDirectory.listFiles().length, Matchers.equalTo(executionGraphs.size()));
+ storageDirectory.listFiles().length,
+ Matchers.equalTo(executionGraphInfos.size()));
- for (ArchivedExecutionGraph executionGraph : executionGraphs) {
+ for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) {
assertThat(
- executionGraphStore.get(executionGraph.getJobID()),
- matchesPartiallyWith(executionGraph));
+ executionGraphInfoStore.get(executionGraphInfo.getJobId()),
+ matchesPartiallyWith(executionGraphInfo));
}
}
}
/**
- * Tests that the size of {@link FileArchivedExecutionGraphStore} is no more than the configured
- * max capacity and the old execution graphs will be purged if the total added number exceeds
- * the max capacity.
+ * Tests that the size of {@link FileExecutionGraphInfoStore} is no more than the configured max
+ * capacity and the old execution graphs will be purged if the total added number exceeds the
+ * max capacity.
*/
@Test
public void testMaximumCapacity() throws IOException {
@@ -287,15 +302,15 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
final int maxCapacity = 10;
final int numberExecutionGraphs = 10;
- final Collection<ArchivedExecutionGraph> oldExecutionGraphs =
- generateTerminalExecutionGraphs(numberExecutionGraphs);
- final Collection<ArchivedExecutionGraph> newExecutionGraphs =
- generateTerminalExecutionGraphs(numberExecutionGraphs);
+ final Collection<ExecutionGraphInfo> oldExecutionGraphInfos =
+ generateTerminalExecutionGraphInfos(numberExecutionGraphs);
+ final Collection<ExecutionGraphInfo> newExecutionGraphInfos =
+ generateTerminalExecutionGraphInfos(numberExecutionGraphs);
- final Collection<JobDetails> jobDetails = generateJobDetails(newExecutionGraphs);
+ final Collection<JobDetails> jobDetails = generateJobDetails(newExecutionGraphInfos);
- try (final FileArchivedExecutionGraphStore executionGraphStore =
- new FileArchivedExecutionGraphStore(
+ try (final FileExecutionGraphInfoStore executionGraphInfoStore =
+ new FileExecutionGraphInfoStore(
rootDir,
Time.hours(1L),
maxCapacity,
@@ -303,42 +318,44 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
TestingUtils.defaultScheduledExecutor(),
Ticker.systemTicker())) {
- for (ArchivedExecutionGraph executionGraph : oldExecutionGraphs) {
- executionGraphStore.put(executionGraph);
+ for (ExecutionGraphInfo executionGraphInfo : oldExecutionGraphInfos) {
+ executionGraphInfoStore.put(executionGraphInfo);
// no more than the configured maximum capacity
- assertTrue(executionGraphStore.size() <= maxCapacity);
+ assertTrue(executionGraphInfoStore.size() <= maxCapacity);
}
- for (ArchivedExecutionGraph executionGraph : newExecutionGraphs) {
- executionGraphStore.put(executionGraph);
+ for (ExecutionGraphInfo executionGraphInfo : newExecutionGraphInfos) {
+ executionGraphInfoStore.put(executionGraphInfo);
// equals to the configured maximum capacity
- assertEquals(maxCapacity, executionGraphStore.size());
+ assertEquals(maxCapacity, executionGraphInfoStore.size());
}
// the older execution graphs are purged
assertThat(
- executionGraphStore.getAvailableJobDetails(),
+ executionGraphInfoStore.getAvailableJobDetails(),
Matchers.containsInAnyOrder(jobDetails.toArray()));
}
}
- private Collection<ArchivedExecutionGraph> generateTerminalExecutionGraphs(int number) {
- final Collection<ArchivedExecutionGraph> executionGraphs = new ArrayList<>(number);
+ private Collection<ExecutionGraphInfo> generateTerminalExecutionGraphInfos(int number) {
+ final Collection<ExecutionGraphInfo> executionGraphInfos = new ArrayList<>(number);
for (int i = 0; i < number; i++) {
final JobStatus state =
GLOBALLY_TERMINAL_JOB_STATUS.get(
ThreadLocalRandom.current()
.nextInt(GLOBALLY_TERMINAL_JOB_STATUS.size()));
- executionGraphs.add(new ArchivedExecutionGraphBuilder().setState(state).build());
+ executionGraphInfos.add(
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder().setState(state).build()));
}
- return executionGraphs;
+ return executionGraphInfos;
}
- private FileArchivedExecutionGraphStore createDefaultExecutionGraphStore(File storageDirectory)
+ private FileExecutionGraphInfoStore createDefaultExecutionGraphInfoStore(File storageDirectory)
throws IOException {
- return new FileArchivedExecutionGraphStore(
+ return new FileExecutionGraphInfoStore(
storageDirectory,
Time.hours(1L),
Integer.MAX_VALUE,
@@ -347,56 +364,65 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
Ticker.systemTicker());
}
- private static final class PartialArchivedExecutionGraphMatcher
- extends BaseMatcher<ArchivedExecutionGraph> {
+ private static final class PartialExecutionGraphInfoMatcher
+ extends BaseMatcher<ExecutionGraphInfo> {
- private final ArchivedExecutionGraph archivedExecutionGraph;
+ private final ExecutionGraphInfo expectedExecutionGraphInfo;
- private PartialArchivedExecutionGraphMatcher(
- ArchivedExecutionGraph expectedArchivedExecutionGraph) {
- this.archivedExecutionGraph =
- Preconditions.checkNotNull(expectedArchivedExecutionGraph);
+ private PartialExecutionGraphInfoMatcher(ExecutionGraphInfo expectedExecutionGraphInfo) {
+ this.expectedExecutionGraphInfo =
+ Preconditions.checkNotNull(expectedExecutionGraphInfo);
}
@Override
public boolean matches(Object o) {
- if (archivedExecutionGraph == o) {
+ if (expectedExecutionGraphInfo == o) {
return true;
}
- if (o == null || archivedExecutionGraph.getClass() != o.getClass()) {
+ if (o == null || expectedExecutionGraphInfo.getClass() != o.getClass()) {
return false;
}
- ArchivedExecutionGraph that = (ArchivedExecutionGraph) o;
- return archivedExecutionGraph.isStoppable() == that.isStoppable()
- && Objects.equals(archivedExecutionGraph.getJobID(), that.getJobID())
- && Objects.equals(archivedExecutionGraph.getJobName(), that.getJobName())
- && archivedExecutionGraph.getState() == that.getState()
- && Objects.equals(archivedExecutionGraph.getJsonPlan(), that.getJsonPlan())
+ ExecutionGraphInfo that = (ExecutionGraphInfo) o;
+
+ ArchivedExecutionGraph thisExecutionGraph =
+ expectedExecutionGraphInfo.getArchivedExecutionGraph();
+ ArchivedExecutionGraph thatExecutionGraph = that.getArchivedExecutionGraph();
+ return thisExecutionGraph.isStoppable() == thatExecutionGraph.isStoppable()
+ && Objects.equals(thisExecutionGraph.getJobID(), thatExecutionGraph.getJobID())
+ && Objects.equals(
+ thisExecutionGraph.getJobName(), thatExecutionGraph.getJobName())
+ && thisExecutionGraph.getState() == thatExecutionGraph.getState()
+ && Objects.equals(
+ thisExecutionGraph.getJsonPlan(), thatExecutionGraph.getJsonPlan())
+ && Objects.equals(
+ thisExecutionGraph.getAccumulatorsSerialized(),
+ thatExecutionGraph.getAccumulatorsSerialized())
&& Objects.equals(
- archivedExecutionGraph.getAccumulatorsSerialized(),
- that.getAccumulatorsSerialized())
+ thisExecutionGraph.getCheckpointCoordinatorConfiguration(),
+ thatExecutionGraph.getCheckpointCoordinatorConfiguration())
+ && thisExecutionGraph.getAllVertices().size()
+ == thatExecutionGraph.getAllVertices().size()
&& Objects.equals(
- archivedExecutionGraph.getCheckpointCoordinatorConfiguration(),
- that.getCheckpointCoordinatorConfiguration())
- && archivedExecutionGraph.getAllVertices().size()
- == that.getAllVertices().size();
+ expectedExecutionGraphInfo.getExceptionHistory(),
+ that.getExceptionHistory());
}
@Override
public void describeTo(Description description) {
description.appendText(
- "Matches against " + ArchivedExecutionGraph.class.getSimpleName() + '.');
+ "Matches against " + ExecutionGraphInfo.class.getSimpleName() + '.');
}
}
- private static Matcher<ArchivedExecutionGraph> matchesPartiallyWith(
- ArchivedExecutionGraph executionGraph) {
- return new PartialArchivedExecutionGraphMatcher(executionGraph);
+ private static Matcher<ExecutionGraphInfo> matchesPartiallyWith(
+ ExecutionGraphInfo executionGraphInfo) {
+ return new PartialExecutionGraphInfoMatcher(executionGraphInfo);
}
private static Collection<JobDetails> generateJobDetails(
- Collection<ArchivedExecutionGraph> executionGraphs) {
- return executionGraphs.stream()
+ Collection<ExecutionGraphInfo> executionGraphInfos) {
+ return executionGraphInfos.stream()
+ .map(ExecutionGraphInfo::getArchivedExecutionGraph)
.map(JobDetails::createDetailsForJob)
.collect(Collectors.toList());
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index e496863..8250d8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
@@ -38,6 +37,7 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.util.TestLogger;
@@ -74,7 +74,7 @@ public class MiniDispatcherTest extends TestLogger {
private static JobGraph jobGraph;
- private static ArchivedExecutionGraph archivedExecutionGraph;
+ private static ExecutionGraphInfo executionGraphInfo;
private static TestingRpcService rpcService;
@@ -87,8 +87,8 @@ public class MiniDispatcherTest extends TestLogger {
private final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
- private final ArchivedExecutionGraphStore archivedExecutionGraphStore =
- new MemoryArchivedExecutionGraphStore();
+ private final ExecutionGraphInfoStore executionGraphInfoStore =
+ new MemoryExecutionGraphInfoStore();
private TestingHighAvailabilityServices highAvailabilityServices;
@@ -98,11 +98,12 @@ public class MiniDispatcherTest extends TestLogger {
public static void setupClass() throws IOException {
jobGraph = new JobGraph();
- archivedExecutionGraph =
- new ArchivedExecutionGraphBuilder()
- .setJobID(jobGraph.getJobID())
- .setState(JobStatus.FINISHED)
- .build();
+ executionGraphInfo =
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder()
+ .setJobID(jobGraph.getJobID())
+ .setState(JobStatus.FINISHED)
+ .build());
rpcService = new TestingRpcService();
configuration = new Configuration();
@@ -166,7 +167,7 @@ public class MiniDispatcherTest extends TestLogger {
final TestingJobManagerRunner testingJobManagerRunner =
testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
- testingJobManagerRunner.completeResultFuture(archivedExecutionGraph);
+ testingJobManagerRunner.completeResultFuture(executionGraphInfo);
// wait until we terminate
miniDispatcher.getShutDownFuture().get();
@@ -192,7 +193,7 @@ public class MiniDispatcherTest extends TestLogger {
final TestingJobManagerRunner testingJobManagerRunner =
testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
- testingJobManagerRunner.completeResultFuture(archivedExecutionGraph);
+ testingJobManagerRunner.completeResultFuture(executionGraphInfo);
assertFalse(miniDispatcher.getTerminationFuture().isDone());
@@ -228,10 +229,11 @@ public class MiniDispatcherTest extends TestLogger {
dispatcherGateway.cancelJob(jobGraph.getJobID(), Time.seconds(10L));
testingJobManagerRunner.completeResultFuture(
- new ArchivedExecutionGraphBuilder()
- .setJobID(jobGraph.getJobID())
- .setState(JobStatus.CANCELED)
- .build());
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder()
+ .setJobID(jobGraph.getJobID())
+ .setState(JobStatus.CANCELED)
+ .build()));
ApplicationStatus applicationStatus = miniDispatcher.getShutDownFuture().get();
assertThat(applicationStatus, is(ApplicationStatus.CANCELED));
@@ -256,7 +258,7 @@ public class MiniDispatcherTest extends TestLogger {
() -> CompletableFuture.completedFuture(resourceManagerGateway),
blobServer,
heartbeatServices,
- archivedExecutionGraphStore,
+ executionGraphInfoStore,
testingFatalErrorHandlerResource.getFatalErrorHandler(),
VoidHistoryServerArchivist.INSTANCE,
null,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index 0e53e0f..a774709 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import javax.annotation.Nonnull;
@@ -64,8 +64,8 @@ class TestingDispatcher extends Dispatcher {
startFuture.complete(null);
}
- void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) {
- runAsync(() -> jobReachedGloballyTerminalState(archivedExecutionGraph));
+ void completeJobExecution(ExecutionGraphInfo executionGraphInfo) {
+ runAsync(() -> jobReachedGloballyTerminalState(executionGraphInfo));
}
CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
index cbd2280..bad26fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherServices;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobGraphStore;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
@@ -116,7 +116,7 @@ public class DefaultDispatcherRunnerITCase extends TestLogger {
blobServerResource.getBlobServer(),
new TestingHeartbeatServices(),
UnregisteredMetricGroups::createUnregisteredJobManagerMetricGroup,
- new MemoryArchivedExecutionGraphStore(),
+ new MemoryExecutionGraphInfoStore(),
fatalErrorHandler,
VoidHistoryServerArchivist.INSTANCE,
null,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
index d38c365..e84e746 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
@@ -161,7 +161,7 @@ public class ZooKeeperDefaultDispatcherRunnerTest extends TestLogger {
blobServer,
new TestingHeartbeatServices(),
UnregisteredMetricGroups::createUnregisteredJobManagerMetricGroup,
- new MemoryArchivedExecutionGraphStore(),
+ new MemoryExecutionGraphInfoStore(),
fatalErrorHandler,
VoidHistoryServerArchivist.INSTANCE,
null,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
index 9328271..12b5b97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.junit.Test;
@@ -57,7 +57,7 @@ public class ClusterEntrypointTest {
}
@Override
- protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
+ protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
Configuration configuration, ScheduledExecutor scheduledExecutor)
throws IOException {
throw new UnsupportedOperationException("Not needed for this test");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java
index 275d799..0b2c9cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -33,6 +32,7 @@ import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFacto
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -67,7 +67,7 @@ public class JobManagerRunnerImplTest extends TestLogger {
private static JobGraph jobGraph;
- private static ArchivedExecutionGraph archivedExecutionGraph;
+ private static ExecutionGraphInfo executionGraphInfo;
private static JobMasterServiceFactory defaultJobMasterServiceFactory;
@@ -85,11 +85,12 @@ public class JobManagerRunnerImplTest extends TestLogger {
jobVertex.setInvokableClass(NoOpInvokable.class);
jobGraph = new JobGraph(jobVertex);
- archivedExecutionGraph =
- new ArchivedExecutionGraphBuilder()
- .setJobID(jobGraph.getJobID())
- .setState(JobStatus.FINISHED)
- .build();
+ executionGraphInfo =
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder()
+ .setJobID(jobGraph.getJobID())
+ .setState(JobStatus.FINISHED)
+ .build());
}
@Before
@@ -120,12 +121,12 @@ public class JobManagerRunnerImplTest extends TestLogger {
assertThat(resultFuture.isDone(), is(false));
- jobManagerRunner.jobReachedGloballyTerminalState(archivedExecutionGraph);
+ jobManagerRunner.jobReachedGloballyTerminalState(executionGraphInfo);
final JobManagerRunnerResult jobManagerRunnerResult = resultFuture.get();
assertThat(
jobManagerRunnerResult,
- is(JobManagerRunnerResult.forSuccess(archivedExecutionGraph)));
+ is(JobManagerRunnerResult.forSuccess(executionGraphInfo)));
} finally {
jobManagerRunner.close();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResultTest.java
index ef170cb..d90d753 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResultTest.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.jobmaster;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
@@ -33,14 +33,14 @@ import static org.junit.Assert.assertTrue;
/** Tests for the {@link JobManagerRunnerResult}. */
public class JobManagerRunnerResultTest extends TestLogger {
- private final ArchivedExecutionGraph archivedExecutionGraph =
- new ArchivedExecutionGraphBuilder().build();
+ private final ExecutionGraphInfo executionGraphInfo =
+ new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().build());
private final FlinkException testException = new FlinkException("test exception");
@Test
public void testSuccessfulJobManagerResult() {
final JobManagerRunnerResult jobManagerRunnerResult =
- JobManagerRunnerResult.forSuccess(archivedExecutionGraph);
+ JobManagerRunnerResult.forSuccess(executionGraphInfo);
assertTrue(jobManagerRunnerResult.isSuccess());
assertFalse(jobManagerRunnerResult.isJobNotFinished());
@@ -70,9 +70,9 @@ public class JobManagerRunnerResultTest extends TestLogger {
@Test
public void testGetArchivedExecutionGraphFromSuccessfulJobManagerResult() {
final JobManagerRunnerResult jobManagerRunnerResult =
- JobManagerRunnerResult.forSuccess(archivedExecutionGraph);
+ JobManagerRunnerResult.forSuccess(executionGraphInfo);
- assertThat(jobManagerRunnerResult.getArchivedExecutionGraph(), is(archivedExecutionGraph));
+ assertThat(jobManagerRunnerResult.getExecutionGraphInfo(), is(executionGraphInfo));
}
@Test(expected = IllegalStateException.class)
@@ -80,7 +80,7 @@ public class JobManagerRunnerResultTest extends TestLogger {
final JobManagerRunnerResult jobManagerRunnerResult =
JobManagerRunnerResult.forJobNotFinished();
- jobManagerRunnerResult.getArchivedExecutionGraph();
+ jobManagerRunnerResult.getExecutionGraphInfo();
}
@Test(expected = IllegalStateException.class)
@@ -88,7 +88,7 @@ public class JobManagerRunnerResultTest extends TestLogger {
final JobManagerRunnerResult jobManagerRunnerResult =
JobManagerRunnerResult.forInitializationFailure(testException);
- jobManagerRunnerResult.getArchivedExecutionGraph();
+ jobManagerRunnerResult.getExecutionGraphInfo();
}
@Test
@@ -110,7 +110,7 @@ public class JobManagerRunnerResultTest extends TestLogger {
@Test(expected = IllegalStateException.class)
public void testGetInitializationFailureFromSuccessfulJobManagerResult() {
final JobManagerRunnerResult jobManagerRunnerResult =
- JobManagerRunnerResult.forSuccess(archivedExecutionGraph);
+ JobManagerRunnerResult.forSuccess(executionGraphInfo);
jobManagerRunnerResult.getInitializationFailure();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java
index 6346172..8b57cfd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java
@@ -133,7 +133,11 @@ public class JobMasterExecutionDeploymentReconciliationTest extends TestLogger {
assertThat(deploymentTrackerWrapper.getStopFuture().get(), is(deployedExecution));
assertThat(
- onCompletionActions.getJobReachedGloballyTerminalStateFuture().get().getState(),
+ onCompletionActions
+ .getJobReachedGloballyTerminalStateFuture()
+ .get()
+ .getArchivedExecutionGraph()
+ .getState(),
is(JobStatus.FAILED));
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index bf78cea..ca04421 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -105,6 +105,7 @@ import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.scheduler.TestingSchedulerNG;
import org.apache.flink.runtime.scheduler.TestingSchedulerNGFactory;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
@@ -1199,7 +1200,7 @@ public class JobMasterTest extends TestLogger {
private static Collection<AccessExecution> getExecutions(
final JobMasterGateway jobMasterGateway) {
final ArchivedExecutionGraph archivedExecutionGraph =
- requestExecutionGraph(jobMasterGateway);
+ requestExecutionGraph(jobMasterGateway).getArchivedExecutionGraph();
return archivedExecutionGraph.getAllVertices().values().stream()
.flatMap(vertex -> Arrays.stream(vertex.getTaskVertices()))
@@ -1210,7 +1211,7 @@ public class JobMasterTest extends TestLogger {
private static List<AccessExecution> getExecutions(
final JobMasterGateway jobMasterGateway, final JobVertexID jobVertexId) {
final ArchivedExecutionGraph archivedExecutionGraph =
- requestExecutionGraph(jobMasterGateway);
+ requestExecutionGraph(jobMasterGateway).getArchivedExecutionGraph();
return Optional.ofNullable(archivedExecutionGraph.getAllVertices().get(jobVertexId))
.map(
@@ -1221,7 +1222,7 @@ public class JobMasterTest extends TestLogger {
.collect(Collectors.toList());
}
- private static ArchivedExecutionGraph requestExecutionGraph(
+ private static ExecutionGraphInfo requestExecutionGraph(
final JobMasterGateway jobMasterGateway) {
try {
return jobMasterGateway.requestJob(testingTimeout).get();
@@ -1821,7 +1822,10 @@ public class JobMasterTest extends TestLogger {
jobReachedRunningState.accept(taskManagerUnresolvedLocation, jobMasterGateway);
final ArchivedExecutionGraph archivedExecutionGraph =
- onCompletionActions.getJobReachedGloballyTerminalStateFuture().get();
+ onCompletionActions
+ .getJobReachedGloballyTerminalStateFuture()
+ .get()
+ .getArchivedExecutionGraph();
assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED));
} finally {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
index 4412d72..1d80e17 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.Preconditions;
import java.util.concurrent.CompletableFuture;
@@ -88,8 +88,8 @@ public class TestingJobManagerRunner implements JobManagerRunner {
return closeAsyncCalledLatch;
}
- public void completeResultFuture(ArchivedExecutionGraph archivedExecutionGraph) {
- resultFuture.complete(JobManagerRunnerResult.forSuccess(archivedExecutionGraph));
+ public void completeResultFuture(ExecutionGraphInfo executionGraphInfo) {
+ resultFuture.complete(JobManagerRunnerResult.forSuccess(executionGraphInfo));
}
public void completeResultFutureExceptionally(Exception e) {
@@ -108,6 +108,7 @@ public class TestingJobManagerRunner implements JobManagerRunner {
this.jobMasterGatewayFuture.complete(testingJobMasterGateway);
}
+ /** {@code Builder} for instantiating {@link TestingJobManagerRunner} instances. */
public static class Builder {
private JobID jobId = null;
private boolean blockingTermination = false;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
index 77de770..d8f2678 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster.utils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -43,6 +42,7 @@ import org.apache.flink.runtime.jobmaster.factories.UnregisteredJobManagerJobMet
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
@@ -207,15 +207,15 @@ public class JobMasterBuilder {
*/
public static final class TestingOnCompletionActions implements OnCompletionActions {
- private final CompletableFuture<ArchivedExecutionGraph>
- jobReachedGloballyTerminalStateFuture = new CompletableFuture<>();
+ private final CompletableFuture<ExecutionGraphInfo> jobReachedGloballyTerminalStateFuture =
+ new CompletableFuture<>();
private final CompletableFuture<Void> jobFinishedByOtherFuture = new CompletableFuture<>();
private final CompletableFuture<Throwable> jobMasterFailedFuture =
new CompletableFuture<>();
@Override
- public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
- jobReachedGloballyTerminalStateFuture.complete(executionGraph);
+ public void jobReachedGloballyTerminalState(ExecutionGraphInfo executionGraphInfo) {
+ jobReachedGloballyTerminalStateFuture.complete(executionGraphInfo);
}
@Override
@@ -228,12 +228,11 @@ public class JobMasterBuilder {
jobMasterFailedFuture.complete(cause);
}
- public CompletableFuture<ArchivedExecutionGraph>
- getJobReachedGloballyTerminalStateFuture() {
+ public CompletableFuture<ExecutionGraphInfo> getJobReachedGloballyTerminalStateFuture() {
return jobReachedGloballyTerminalStateFuture;
}
- public CompletableFuture<ArchivedExecutionGraph> getJobFinishedByOtherFuture() {
+ public CompletableFuture<ExecutionGraphInfo> getJobFinishedByOtherFuture() {
return jobReachedGloballyTerminalStateFuture;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index a6b4155..0464d51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -48,6 +47,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
@@ -123,7 +123,7 @@ public class TestingJobMasterGateway implements JobMasterGateway {
@Nonnull private final Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier;
- @Nonnull private final Supplier<CompletableFuture<ArchivedExecutionGraph>> requestJobSupplier;
+ @Nonnull private final Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier;
@Nonnull
private final BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction;
@@ -221,7 +221,7 @@ public class TestingJobMasterGateway implements JobMasterGateway {
taskManagerHeartbeatConsumer,
@Nonnull Consumer<ResourceID> resourceManagerHeartbeatConsumer,
@Nonnull Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier,
- @Nonnull Supplier<CompletableFuture<ArchivedExecutionGraph>> requestJobSupplier,
+ @Nonnull Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier,
@Nonnull
BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction,
@Nonnull
@@ -388,7 +388,7 @@ public class TestingJobMasterGateway implements JobMasterGateway {
}
@Override
- public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+ public CompletableFuture<ExecutionGraphInfo> requestJob(Time timeout) {
return requestJobSupplier.get();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
index c2dc9ab..e957311 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -48,6 +47,7 @@ import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
@@ -112,7 +112,7 @@ public class TestingJobMasterGatewayBuilder {
private Consumer<ResourceID> resourceManagerHeartbeatConsumer = ignored -> {};
private Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier =
() -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
- private Supplier<CompletableFuture<ArchivedExecutionGraph>> requestJobSupplier =
+ private Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier =
() -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
private BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction =
(targetDirectory, ignoredB) ->
@@ -264,7 +264,7 @@ public class TestingJobMasterGatewayBuilder {
}
public TestingJobMasterGatewayBuilder setRequestJobSupplier(
- Supplier<CompletableFuture<ArchivedExecutionGraph>> requestJobSupplier) {
+ Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier) {
this.requestJobSupplier = requestJobSupplier;
return this;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
index 49d3772..d0767a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.minicluster;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -105,7 +105,7 @@ public class TestingMiniCluster extends MiniCluster {
blobServer,
heartbeatServices,
metricRegistry,
- new MemoryArchivedExecutionGraphStore(),
+ new MemoryExecutionGraphInfoStore(),
metricQueryServiceRetriever,
fatalErrorHandler));
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java
index ace82f9..269041a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java
@@ -21,10 +21,9 @@ package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.apache.flink.util.ExecutorUtils;
@@ -55,12 +54,13 @@ import static org.junit.Assert.fail;
/** Tests for the {@link DefaultExecutionGraphCache}. */
public class DefaultExecutionGraphCacheTest extends TestLogger {
- private static ArchivedExecutionGraph expectedExecutionGraph;
+ private static ExecutionGraphInfo expectedExecutionGraphInfo;
private static final JobID expectedJobId = new JobID();
@BeforeClass
public static void setup() {
- expectedExecutionGraph = new ArchivedExecutionGraphBuilder().build();
+ expectedExecutionGraphInfo =
+ new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().build());
}
/** Tests that we can cache AccessExecutionGraphs over multiple accesses. */
@@ -71,19 +71,20 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
final CountingRestfulGateway restfulGateway =
createCountingRestfulGateway(
- expectedJobId, CompletableFuture.completedFuture(expectedExecutionGraph));
+ expectedJobId,
+ CompletableFuture.completedFuture(expectedExecutionGraphInfo));
try (ExecutionGraphCache executionGraphCache =
new DefaultExecutionGraphCache(timeout, timeToLive)) {
- CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture =
- executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+ CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture =
+ executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
- assertEquals(expectedExecutionGraph, accessExecutionGraphFuture.get());
+ assertEquals(expectedExecutionGraphInfo, executionGraphInfoFuture.get());
- accessExecutionGraphFuture =
- executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+ executionGraphInfoFuture =
+ executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
- assertEquals(expectedExecutionGraph, accessExecutionGraphFuture.get());
+ assertEquals(expectedExecutionGraphInfo, executionGraphInfoFuture.get());
assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(1));
}
@@ -98,23 +99,23 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
final CountingRestfulGateway restfulGateway =
createCountingRestfulGateway(
expectedJobId,
- CompletableFuture.completedFuture(expectedExecutionGraph),
- CompletableFuture.completedFuture(expectedExecutionGraph));
+ CompletableFuture.completedFuture(expectedExecutionGraphInfo),
+ CompletableFuture.completedFuture(expectedExecutionGraphInfo));
try (ExecutionGraphCache executionGraphCache =
new DefaultExecutionGraphCache(timeout, timeToLive)) {
- CompletableFuture<AccessExecutionGraph> executionGraphFuture =
- executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+ CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture =
+ executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
- assertEquals(expectedExecutionGraph, executionGraphFuture.get());
+ assertEquals(expectedExecutionGraphInfo, executionGraphInfoFuture.get());
// sleep for the TTL
Thread.sleep(timeToLive.toMilliseconds() * 5L);
- CompletableFuture<AccessExecutionGraph> executionGraphFuture2 =
- executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+ CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture2 =
+ executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
- assertEquals(expectedExecutionGraph, executionGraphFuture2.get());
+ assertEquals(expectedExecutionGraphInfo, executionGraphInfoFuture2.get());
assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(2));
}
@@ -135,25 +136,26 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
expectedJobId,
FutureUtils.completedExceptionally(
new FlinkJobNotFoundException(expectedJobId)),
- CompletableFuture.completedFuture(expectedExecutionGraph));
+ CompletableFuture.completedFuture(expectedExecutionGraphInfo));
try (ExecutionGraphCache executionGraphCache =
new DefaultExecutionGraphCache(timeout, timeToLive)) {
- CompletableFuture<AccessExecutionGraph> executionGraphFuture =
- executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+ CompletableFuture<ExecutionGraphInfo> executionGraphFuture =
+ executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
try {
executionGraphFuture.get();
fail("The execution graph future should have been completed exceptionally.");
} catch (ExecutionException ee) {
+ ee.printStackTrace();
assertTrue(ee.getCause() instanceof FlinkException);
}
- CompletableFuture<AccessExecutionGraph> executionGraphFuture2 =
- executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+ CompletableFuture<ExecutionGraphInfo> executionGraphFuture2 =
+ executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
- assertEquals(expectedExecutionGraph, executionGraphFuture2.get());
+ assertEquals(expectedExecutionGraphInfo, executionGraphFuture2.get());
}
}
@@ -166,21 +168,21 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
final Time timeout = Time.milliseconds(100L);
final Time timeToLive = Time.milliseconds(1L);
final JobID expectedJobId2 = new JobID();
- final ArchivedExecutionGraph expectedExecutionGraph2 =
- new ArchivedExecutionGraphBuilder().build();
+ final ExecutionGraphInfo expectedExecutionGraphInfo2 =
+ new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().build());
final AtomicInteger requestJobCalls = new AtomicInteger(0);
final TestingRestfulGateway restfulGateway =
new TestingRestfulGateway.Builder()
- .setRequestJobFunction(
+ .setRequestExecutionGraphInfoFunction(
jobId -> {
requestJobCalls.incrementAndGet();
if (jobId.equals(expectedJobId)) {
return CompletableFuture.completedFuture(
- expectedExecutionGraph);
+ expectedExecutionGraphInfo);
} else if (jobId.equals(expectedJobId2)) {
return CompletableFuture.completedFuture(
- expectedExecutionGraph2);
+ expectedExecutionGraphInfo2);
} else {
throw new AssertionError("Invalid job id received.");
}
@@ -190,15 +192,15 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
try (ExecutionGraphCache executionGraphCache =
new DefaultExecutionGraphCache(timeout, timeToLive)) {
- CompletableFuture<AccessExecutionGraph> executionGraph1Future =
- executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+ CompletableFuture<ExecutionGraphInfo> executionGraph1Future =
+ executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
- CompletableFuture<AccessExecutionGraph> executionGraph2Future =
- executionGraphCache.getExecutionGraph(expectedJobId2, restfulGateway);
+ CompletableFuture<ExecutionGraphInfo> executionGraph2Future =
+ executionGraphCache.getExecutionGraphInfo(expectedJobId2, restfulGateway);
- assertEquals(expectedExecutionGraph, executionGraph1Future.get());
+ assertEquals(expectedExecutionGraphInfo, executionGraph1Future.get());
- assertEquals(expectedExecutionGraph2, executionGraph2Future.get());
+ assertEquals(expectedExecutionGraphInfo2, executionGraph2Future.get());
assertThat(requestJobCalls.get(), Matchers.equalTo(2));
@@ -218,11 +220,12 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
final CountingRestfulGateway restfulGateway =
createCountingRestfulGateway(
- expectedJobId, CompletableFuture.completedFuture(expectedExecutionGraph));
+ expectedJobId,
+ CompletableFuture.completedFuture(expectedExecutionGraphInfo));
final int numConcurrentAccesses = 10;
- final ArrayList<CompletableFuture<AccessExecutionGraph>> executionGraphFutures =
+ final ArrayList<CompletableFuture<ExecutionGraphInfo>> executionGraphFutures =
new ArrayList<>(numConcurrentAccesses);
final ExecutorService executor =
@@ -231,10 +234,10 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
try (ExecutionGraphCache executionGraphCache =
new DefaultExecutionGraphCache(timeout, timeToLive)) {
for (int i = 0; i < numConcurrentAccesses; i++) {
- CompletableFuture<AccessExecutionGraph> executionGraphFuture =
+ CompletableFuture<ExecutionGraphInfo> executionGraphFuture =
CompletableFuture.supplyAsync(
() ->
- executionGraphCache.getExecutionGraph(
+ executionGraphCache.getExecutionGraphInfo(
expectedJobId, restfulGateway),
executor)
.thenCompose(Function.identity());
@@ -242,13 +245,13 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
executionGraphFutures.add(executionGraphFuture);
}
- final CompletableFuture<Collection<AccessExecutionGraph>> allExecutionGraphFutures =
+ final CompletableFuture<Collection<ExecutionGraphInfo>> allExecutionGraphFutures =
FutureUtils.combineAll(executionGraphFutures);
- Collection<AccessExecutionGraph> allExecutionGraphs = allExecutionGraphFutures.get();
+ Collection<ExecutionGraphInfo> allExecutionGraphs = allExecutionGraphFutures.get();
- for (AccessExecutionGraph executionGraph : allExecutionGraphs) {
- assertEquals(expectedExecutionGraph, executionGraph);
+ for (ExecutionGraphInfo executionGraph : allExecutionGraphs) {
+ assertEquals(expectedExecutionGraphInfo, executionGraph);
}
assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(1));
@@ -258,8 +261,8 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
}
private CountingRestfulGateway createCountingRestfulGateway(
- JobID jobId, CompletableFuture<ArchivedExecutionGraph>... accessExecutionGraphs) {
- final ConcurrentLinkedQueue<CompletableFuture<ArchivedExecutionGraph>> queue =
+ JobID jobId, CompletableFuture<ExecutionGraphInfo>... accessExecutionGraphs) {
+ final ConcurrentLinkedQueue<CompletableFuture<ExecutionGraphInfo>> queue =
new ConcurrentLinkedQueue<>(Arrays.asList(accessExecutionGraphs));
return new CountingRestfulGateway(jobId, ignored -> queue.poll());
}
@@ -276,16 +279,17 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
private CountingRestfulGateway(
JobID expectedJobId,
- Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction) {
+ Function<JobID, CompletableFuture<ExecutionGraphInfo>> requestJobFunction) {
this.expectedJobId = Preconditions.checkNotNull(expectedJobId);
- this.requestJobFunction = Preconditions.checkNotNull(requestJobFunction);
+ this.requestExecutionGraphInfoFunction = Preconditions.checkNotNull(requestJobFunction);
}
@Override
- public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
+ public CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(
+ JobID jobId, Time timeout) {
assertThat(jobId, Matchers.equalTo(expectedJobId));
numRequestJobCalls.incrementAndGet();
- return super.requestJob(jobId, timeout);
+ return super.requestExecutionGraphInfo(jobId, timeout);
}
public int getNumRequestJobCalls() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java
index 0985a1c..12565f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.rest.util;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import java.util.concurrent.CompletableFuture;
@@ -36,7 +36,7 @@ public enum NoOpExecutionGraphCache implements ExecutionGraphCache {
}
@Override
- public CompletableFuture<AccessExecutionGraph> getExecutionGraph(
+ public CompletableFuture<ExecutionGraphInfo> getExecutionGraphInfo(
JobID jobId, RestfulGateway restfulGateway) {
return FutureUtils.completedExceptionally(
new UnsupportedOperationException(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 737c69b..1e33ae0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -260,7 +260,11 @@ public class DefaultSchedulerTest extends TestLogger {
final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
final ArchivedExecutionVertex archivedExecutionVertex =
- Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+ Iterables.getOnlyElement(
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices());
final ExecutionAttemptID attemptId =
archivedExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
@@ -294,7 +298,11 @@ public class DefaultSchedulerTest extends TestLogger {
final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
final ArchivedExecutionVertex onlyExecutionVertex =
- Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+ Iterables.getOnlyElement(
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices());
final ExecutionAttemptID attemptId =
onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
@@ -325,6 +333,7 @@ public class DefaultSchedulerTest extends TestLogger {
Throwable failureCause =
scheduler
.requestJob()
+ .getArchivedExecutionGraph()
.getFailureInfo()
.getException()
.deserializeError(DefaultSchedulerTest.class.getClassLoader());
@@ -388,7 +397,11 @@ public class DefaultSchedulerTest extends TestLogger {
actionsToTriggerTaskFailure.accept(vid11);
final Iterator<ArchivedExecutionVertex> vertexIterator =
- scheduler.requestJob().getAllExecutionVertices().iterator();
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices()
+ .iterator();
final ArchivedExecutionVertex ev11 = vertexIterator.next();
final ArchivedExecutionVertex ev12 = vertexIterator.next();
final ArchivedExecutionVertex ev21 = vertexIterator.next();
@@ -424,7 +437,12 @@ public class DefaultSchedulerTest extends TestLogger {
testExecutionSlotAllocator.completePendingRequest(sourceExecutionVertexId);
final ArchivedExecutionVertex sourceExecutionVertex =
- scheduler.requestJob().getAllExecutionVertices().iterator().next();
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices()
+ .iterator()
+ .next();
final ExecutionAttemptID attemptId =
sourceExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
scheduler.updateTaskExecutionState(
@@ -437,7 +455,9 @@ public class DefaultSchedulerTest extends TestLogger {
assertThat(
testExecutionVertexOperations.getDeployedVertices(),
containsInAnyOrder(sourceExecutionVertexId, sinkExecutionVertexId));
- assertThat(scheduler.requestJob().getState(), is(equalTo(JobStatus.RUNNING)));
+ assertThat(
+ scheduler.requestJob().getArchivedExecutionGraph().getState(),
+ is(equalTo(JobStatus.RUNNING)));
}
@Test
@@ -478,7 +498,11 @@ public class DefaultSchedulerTest extends TestLogger {
schedulingStrategy.schedule(Collections.singletonList(onlySchedulingVertex.getId()));
final ArchivedExecutionVertex onlyExecutionVertex =
- Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+ Iterables.getOnlyElement(
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices());
final ExecutionAttemptID attemptId =
onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
scheduler.updateTaskExecutionState(
@@ -532,7 +556,11 @@ public class DefaultSchedulerTest extends TestLogger {
scheduler.handleGlobalFailure(new Exception("forced failure"));
final ArchivedExecutionVertex onlyExecutionVertex =
- Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+ Iterables.getOnlyElement(
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices());
final ExecutionAttemptID attemptId =
onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
scheduler.updateTaskExecutionState(
@@ -554,7 +582,11 @@ public class DefaultSchedulerTest extends TestLogger {
final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
final Iterator<ArchivedExecutionVertex> vertexIterator =
- scheduler.requestJob().getAllExecutionVertices().iterator();
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices()
+ .iterator();
final ArchivedExecutionVertex v1 = vertexIterator.next();
final ArchivedExecutionVertex v2 = vertexIterator.next();
@@ -588,7 +620,11 @@ public class DefaultSchedulerTest extends TestLogger {
final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
final ArchivedExecutionVertex onlyExecutionVertex =
- Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+ Iterables.getOnlyElement(
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices());
final ExecutionAttemptID attemptId =
onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
scheduler.updateTaskExecutionState(
@@ -616,7 +652,11 @@ public class DefaultSchedulerTest extends TestLogger {
final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
final ArchivedExecutionVertex onlyExecutionVertex =
- Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+ Iterables.getOnlyElement(
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices());
final ExecutionAttemptID attemptId =
onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
scheduler.updateTaskExecutionState(
@@ -652,7 +692,11 @@ public class DefaultSchedulerTest extends TestLogger {
final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
final ArchivedExecutionVertex onlyExecutionVertex =
- Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+ Iterables.getOnlyElement(
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices());
final ExecutionAttemptID attemptId =
onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
scheduler.updateTaskExecutionState(
@@ -743,7 +787,11 @@ public class DefaultSchedulerTest extends TestLogger {
final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
final Iterator<ArchivedExecutionVertex> vertexIterator =
- scheduler.requestJob().getAllExecutionVertices().iterator();
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices()
+ .iterator();
final ExecutionAttemptID attemptId1 =
vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
final ExecutionAttemptID attemptId2 =
@@ -781,7 +829,11 @@ public class DefaultSchedulerTest extends TestLogger {
final SchedulingTopology topology = scheduler.getSchedulingTopology();
final Iterator<ArchivedExecutionVertex> vertexIterator =
- scheduler.requestJob().getAllExecutionVertices().iterator();
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices()
+ .iterator();
final ExecutionAttemptID attemptId1 =
vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
final ExecutionAttemptID attemptId2 =
@@ -818,7 +870,11 @@ public class DefaultSchedulerTest extends TestLogger {
final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
final ArchivedExecutionVertex onlyExecutionVertex =
- Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+ Iterables.getOnlyElement(
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices());
final ExecutionAttemptID attemptId =
onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
@@ -830,7 +886,8 @@ public class DefaultSchedulerTest extends TestLogger {
ExecutionState.FAILED,
new RuntimeException(exceptionMessage)));
- final ErrorInfo failureInfo = scheduler.requestJob().getFailureInfo();
+ final ErrorInfo failureInfo =
+ scheduler.requestJob().getArchivedExecutionGraph().getFailureInfo();
assertThat(failureInfo, is(notNullValue()));
assertThat(failureInfo.getExceptionAsString(), containsString(exceptionMessage));
}
@@ -850,7 +907,11 @@ public class DefaultSchedulerTest extends TestLogger {
scheduler.startScheduling();
Iterator<ArchivedExecutionVertex> vertexIterator =
- scheduler.requestJob().getAllExecutionVertices().iterator();
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices()
+ .iterator();
ArchivedExecutionVertex v1 = vertexIterator.next();
assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(2));
@@ -863,7 +924,12 @@ public class DefaultSchedulerTest extends TestLogger {
ExecutionState.FAILED,
new RuntimeException(exceptionMessage)));
- vertexIterator = scheduler.requestJob().getAllExecutionVertices().iterator();
+ vertexIterator =
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices()
+ .iterator();
v1 = vertexIterator.next();
ArchivedExecutionVertex v2 = vertexIterator.next();
assertThat(v1.getExecutionState(), is(ExecutionState.FAILED));
@@ -877,7 +943,11 @@ public class DefaultSchedulerTest extends TestLogger {
final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
final ExecutionAttemptID attemptId =
- Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+ Iterables.getOnlyElement(
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices())
.getCurrentExecutionAttempt()
.getAttemptId();
@@ -917,7 +987,11 @@ public class DefaultSchedulerTest extends TestLogger {
// initiate restartable failure
final ExecutionAttemptID restartableAttemptId =
- Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+ Iterables.getOnlyElement(
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices())
.getCurrentExecutionAttempt()
.getAttemptId();
final RuntimeException restartableException = new RuntimeException("restartable exception");
@@ -930,7 +1004,11 @@ public class DefaultSchedulerTest extends TestLogger {
testRestartBackoffTimeStrategy.setCanRestart(false);
final ExecutionAttemptID failingAttemptId =
- Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+ Iterables.getOnlyElement(
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices())
.getCurrentExecutionAttempt()
.getAttemptId();
final RuntimeException failingException = new RuntimeException("failing exception");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 9ff306d..ad3e98f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -190,7 +190,12 @@ public class SchedulerTestingUtils {
public static Collection<ExecutionAttemptID> getAllCurrentExecutionAttempts(
DefaultScheduler scheduler) {
return StreamSupport.stream(
- scheduler.requestJob().getAllExecutionVertices().spliterator(), false)
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices()
+ .spliterator(),
+ false)
.map((vertex) -> vertex.getCurrentExecutionAttempt().getAttemptId())
.collect(Collectors.toList());
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
index 71a990f..b64af0e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -120,7 +119,7 @@ public class TestingSchedulerNG implements SchedulerNG {
}
@Override
- public ArchivedExecutionGraph requestJob() {
+ public ExecutionGraphInfo requestJob() {
failOperation();
return null;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
index efe8bfc..eccf446 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.function.TriFunction;
@@ -86,6 +87,8 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway
String hostname,
Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction,
Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction,
+ Function<JobID, CompletableFuture<ExecutionGraphInfo>>
+ requestExecutionGraphInfoFunction,
Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction,
Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction,
Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier,
@@ -115,6 +118,7 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway
hostname,
cancelJobFunction,
requestJobFunction,
+ requestExecutionGraphInfoFunction,
requestJobResultFunction,
requestJobStatusFunction,
requestMultipleJobDetailsSupplier,
@@ -228,6 +232,7 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway
hostname,
cancelJobFunction,
requestJobFunction,
+ requestExecutionGraphInfoFunction,
requestJobResultFunction,
requestJobStatusFunction,
requestMultipleJobDetailsSupplier,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
index bd3e06a..69c6d30 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
@@ -31,7 +31,7 @@ import java.util.function.IntSupplier;
public class TestingExecutionGraphCache implements ExecutionGraphCache {
private final IntSupplier sizeSupplier;
- private final BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>>
+ private final BiFunction<JobID, RestfulGateway, CompletableFuture<ExecutionGraphInfo>>
getExecutionGraphFunction;
private final Runnable cleanupRunnable;
@@ -40,7 +40,7 @@ public class TestingExecutionGraphCache implements ExecutionGraphCache {
private TestingExecutionGraphCache(
IntSupplier sizeSupplier,
- BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>>
+ BiFunction<JobID, RestfulGateway, CompletableFuture<ExecutionGraphInfo>>
getExecutionGraphFunction,
Runnable cleanupRunnable,
Runnable closeRunnable) {
@@ -56,7 +56,7 @@ public class TestingExecutionGraphCache implements ExecutionGraphCache {
}
@Override
- public CompletableFuture<AccessExecutionGraph> getExecutionGraph(
+ public CompletableFuture<ExecutionGraphInfo> getExecutionGraphInfo(
JobID jobId, RestfulGateway restfulGateway) {
return getExecutionGraphFunction.apply(jobId, restfulGateway);
}
@@ -79,7 +79,7 @@ public class TestingExecutionGraphCache implements ExecutionGraphCache {
public static final class Builder {
private IntSupplier sizeSupplier = () -> 0;
- private BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>>
+ private BiFunction<JobID, RestfulGateway, CompletableFuture<ExecutionGraphInfo>>
getExecutionGraphFunction =
(ignoredA, ignoredB) ->
FutureUtils.completedExceptionally(
@@ -95,7 +95,7 @@ public class TestingExecutionGraphCache implements ExecutionGraphCache {
}
public Builder setGetExecutionGraphFunction(
- BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>>
+ BiFunction<JobID, RestfulGateway, CompletableFuture<ExecutionGraphInfo>>
getExecutionGraphFunction) {
this.getExecutionGraphFunction = getExecutionGraphFunction;
return this;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index 3d10329..f36af49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.function.TriFunction;
@@ -54,6 +55,10 @@ public class TestingRestfulGateway implements RestfulGateway {
DEFAULT_REQUEST_JOB_FUNCTION =
jobId ->
FutureUtils.completedExceptionally(new UnsupportedOperationException());
+ static final Function<JobID, CompletableFuture<ExecutionGraphInfo>>
+ DEFAULT_REQUEST_EXECUTION_GRAPH_INFO =
+ jobId ->
+ FutureUtils.completedExceptionally(new UnsupportedOperationException());
static final Function<JobID, CompletableFuture<JobStatus>> DEFAULT_REQUEST_JOB_STATUS_FUNCTION =
jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING);
static final Supplier<CompletableFuture<MultipleJobsDetails>>
@@ -106,6 +111,9 @@ public class TestingRestfulGateway implements RestfulGateway {
protected Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction;
+ protected Function<JobID, CompletableFuture<ExecutionGraphInfo>>
+ requestExecutionGraphInfoFunction;
+
protected Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction;
protected Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction;
@@ -137,6 +145,7 @@ public class TestingRestfulGateway implements RestfulGateway {
LOCALHOST,
DEFAULT_CANCEL_JOB_FUNCTION,
DEFAULT_REQUEST_JOB_FUNCTION,
+ DEFAULT_REQUEST_EXECUTION_GRAPH_INFO,
DEFAULT_REQUEST_JOB_RESULT_FUNCTION,
DEFAULT_REQUEST_JOB_STATUS_FUNCTION,
DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER,
@@ -154,6 +163,8 @@ public class TestingRestfulGateway implements RestfulGateway {
String hostname,
Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction,
Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction,
+ Function<JobID, CompletableFuture<ExecutionGraphInfo>>
+ requestExecutionGraphInfoFunction,
Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction,
Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction,
Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier,
@@ -175,6 +186,7 @@ public class TestingRestfulGateway implements RestfulGateway {
this.hostname = hostname;
this.cancelJobFunction = cancelJobFunction;
this.requestJobFunction = requestJobFunction;
+ this.requestExecutionGraphInfoFunction = requestExecutionGraphInfoFunction;
this.requestJobResultFunction = requestJobResultFunction;
this.requestJobStatusFunction = requestJobStatusFunction;
this.requestMultipleJobDetailsSupplier = requestMultipleJobDetailsSupplier;
@@ -206,6 +218,12 @@ public class TestingRestfulGateway implements RestfulGateway {
}
@Override
+ public CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(
+ JobID jobId, Time timeout) {
+ return requestExecutionGraphInfoFunction.apply(jobId);
+ }
+
+ @Override
public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
return requestJobResultFunction.apply(jobId);
}
@@ -278,6 +296,8 @@ public class TestingRestfulGateway implements RestfulGateway {
protected String hostname = LOCALHOST;
protected Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction;
protected Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction;
+ protected Function<JobID, CompletableFuture<ExecutionGraphInfo>>
+ requestExecutionGraphInfoFunction;
protected Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction;
protected Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction;
protected Supplier<CompletableFuture<MultipleJobsDetails>>
@@ -301,6 +321,7 @@ public class TestingRestfulGateway implements RestfulGateway {
protected AbstractBuilder() {
cancelJobFunction = DEFAULT_CANCEL_JOB_FUNCTION;
requestJobFunction = DEFAULT_REQUEST_JOB_FUNCTION;
+ requestExecutionGraphInfoFunction = DEFAULT_REQUEST_EXECUTION_GRAPH_INFO;
requestJobResultFunction = DEFAULT_REQUEST_JOB_RESULT_FUNCTION;
requestJobStatusFunction = DEFAULT_REQUEST_JOB_STATUS_FUNCTION;
requestMultipleJobDetailsSupplier = DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER;
@@ -338,6 +359,13 @@ public class TestingRestfulGateway implements RestfulGateway {
return self();
}
+ public T setRequestExecutionGraphInfoFunction(
+ Function<JobID, CompletableFuture<ExecutionGraphInfo>>
+ requestExecutionGraphInfoFunction) {
+ this.requestExecutionGraphInfoFunction = requestExecutionGraphInfoFunction;
+ return self();
+ }
+
public T setRequestJobResultFunction(
Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction) {
this.requestJobResultFunction = requestJobResultFunction;
@@ -429,6 +457,7 @@ public class TestingRestfulGateway implements RestfulGateway {
hostname,
cancelJobFunction,
requestJobFunction,
+ requestExecutionGraphInfoFunction,
requestJobResultFunction,
requestJobStatusFunction,
requestMultipleJobDetailsSupplier,
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 942987e..6a8487e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -41,7 +41,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
@@ -159,7 +159,7 @@ public class ProcessFailureCancelingITCase extends TestLogger {
blobServerResource.getBlobServer(),
new HeartbeatServices(100L, 1000L),
NoOpMetricRegistry.INSTANCE,
- new MemoryArchivedExecutionGraphStore(),
+ new MemoryExecutionGraphInfoStore(),
VoidMetricQueryServiceRetriever.INSTANCE,
fatalErrorHandler);