You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/23 16:29:21 UTC

[flink] 03/03: [FLINK-21188][runtime] Introduces ExecutionGraphInfo

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b7e93fd671a0dc0e7ba919a447ca9e51bb451fdd
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Jan 27 15:47:33 2021 +0100

    [FLINK-21188][runtime] Introduces ExecutionGraphInfo
    
    This closes #14804.
---
 .../application/ApplicationClusterEntryPoint.java  |   8 +-
 .../flink/runtime/dispatcher/Dispatcher.java       |  58 ++---
 .../flink/runtime/dispatcher/DispatcherJob.java    |  36 +--
 .../runtime/dispatcher/DispatcherJobResult.java    |  25 ++-
 .../runtime/dispatcher/DispatcherServices.java     |  10 +-
 ...raphStore.java => ExecutionGraphInfoStore.java} |  20 +-
 ...Store.java => FileExecutionGraphInfoStore.java} |  77 +++----
 .../runtime/dispatcher/HistoryServerArchivist.java |  10 +-
 .../JsonResponseHistoryServerArchivist.java        |   8 +-
 ...ore.java => MemoryExecutionGraphInfoStore.java} |  38 ++--
 .../flink/runtime/dispatcher/MiniDispatcher.java   |   7 +-
 .../dispatcher/PartialDispatcherServices.java      |  10 +-
 ...PartialDispatcherServicesWithJobGraphStore.java |   4 +-
 .../dispatcher/VoidHistoryServerArchivist.java     |   5 +-
 .../runtime/entrypoint/ClusterEntrypoint.java      |  14 +-
 .../runtime/entrypoint/JobClusterEntrypoint.java   |   8 +-
 .../entrypoint/SessionClusterEntrypoint.java       |   8 +-
 ...tDispatcherResourceManagerComponentFactory.java |   6 +-
 .../DispatcherResourceManagerComponentFactory.java |   4 +-
 .../runtime/jobmanager/OnCompletionActions.java    |   6 +-
 .../runtime/jobmaster/JobManagerRunnerImpl.java    |   8 +-
 .../runtime/jobmaster/JobManagerRunnerResult.java  |  28 +--
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  10 +-
 .../flink/runtime/jobmaster/JobMasterGateway.java  |   8 +-
 .../flink/runtime/minicluster/MiniCluster.java     |   4 +-
 ...va => AbstractAccessExecutionGraphHandler.java} |  63 +++---
 .../handler/job/AbstractExecutionGraphHandler.java |  17 +-
 .../rest/handler/job/AbstractJobVertexHandler.java |   2 +-
 .../rest/handler/job/JobAccumulatorsHandler.java   |   3 +-
 .../runtime/rest/handler/job/JobConfigHandler.java |   2 +-
 .../rest/handler/job/JobDetailsHandler.java        |   2 +-
 .../rest/handler/job/JobExceptionsHandler.java     |   3 +-
 .../runtime/rest/handler/job/JobPlanHandler.java   |   3 +-
 .../rest/handler/job/JobVertexDetailsHandler.java  |   3 +-
 .../handler/job/JobVertexTaskManagersHandler.java  |   3 +-
 .../job/checkpoints/AbstractCheckpointHandler.java |   4 +-
 .../job/checkpoints/CheckpointConfigHandler.java   |   4 +-
 .../CheckpointingStatisticsHandler.java            |   4 +-
 .../handler/legacy/DefaultExecutionGraphCache.java |  40 ++--
 .../rest/handler/legacy/ExecutionGraphCache.java   |  21 +-
 .../runtime/scheduler/ExecutionGraphInfo.java      |  62 ++++++
 .../flink/runtime/scheduler/SchedulerBase.java     |   5 +-
 .../flink/runtime/scheduler/SchedulerNG.java       |   3 +-
 .../scheduler/adaptive/AdaptiveScheduler.java      |   6 +-
 .../flink/runtime/webmonitor/RestfulGateway.java   |  20 +-
 .../runtime/webmonitor/history/JsonArchivist.java  |  20 ++
 .../runtime/dispatcher/DispatcherJobTest.java      |  67 ++++--
 .../dispatcher/DispatcherResourceCleanupTest.java  |  36 +--
 .../flink/runtime/dispatcher/DispatcherTest.java   |  42 ++--
 ...t.java => FileExecutionGraphInfoStoreTest.java} | 244 ++++++++++++---------
 .../runtime/dispatcher/MiniDispatcherTest.java     |  34 +--
 .../runtime/dispatcher/TestingDispatcher.java      |   6 +-
 .../runner/DefaultDispatcherRunnerITCase.java      |   4 +-
 .../ZooKeeperDefaultDispatcherRunnerTest.java      |   4 +-
 .../runtime/entrypoint/ClusterEntrypointTest.java  |   4 +-
 .../jobmaster/JobManagerRunnerImplTest.java        |  19 +-
 .../jobmaster/JobManagerRunnerResultTest.java      |  18 +-
 ...asterExecutionDeploymentReconciliationTest.java |   6 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |  12 +-
 .../runtime/jobmaster/TestingJobManagerRunner.java |   7 +-
 .../runtime/jobmaster/utils/JobMasterBuilder.java  |  15 +-
 .../jobmaster/utils/TestingJobMasterGateway.java   |   8 +-
 .../utils/TestingJobMasterGatewayBuilder.java      |   6 +-
 .../runtime/minicluster/TestingMiniCluster.java    |   4 +-
 .../legacy/DefaultExecutionGraphCacheTest.java     | 104 ++++-----
 .../runtime/rest/util/NoOpExecutionGraphCache.java |   4 +-
 .../runtime/scheduler/DefaultSchedulerTest.java    | 118 ++++++++--
 .../runtime/scheduler/SchedulerTestingUtils.java   |   7 +-
 .../runtime/scheduler/TestingSchedulerNG.java      |   3 +-
 .../webmonitor/TestingDispatcherGateway.java       |   5 +
 .../webmonitor/TestingExecutionGraphCache.java     |  12 +-
 .../runtime/webmonitor/TestingRestfulGateway.java  |  29 +++
 .../recovery/ProcessFailureCancelingITCase.java    |   4 +-
 73 files changed, 921 insertions(+), 611 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java
index 30e872d..e0b2128 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java
@@ -27,8 +27,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
 import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
 import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
@@ -78,9 +78,9 @@ public class ApplicationClusterEntryPoint extends ClusterEntrypoint {
     }
 
     @Override
-    protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
+    protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
             final Configuration configuration, final ScheduledExecutor scheduledExecutor) {
-        return new MemoryArchivedExecutionGraphStore();
+        return new MemoryExecutionGraphInfoStore();
     }
 
     protected static void configureExecution(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 1a4befe..63ea5b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.PermanentlyFencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -121,7 +122,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
     private final DispatcherBootstrapFactory dispatcherBootstrapFactory;
 
-    private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
+    private final ExecutionGraphInfoStore executionGraphInfoStore;
 
     private final JobManagerRunnerFactory jobManagerRunnerFactory;
 
@@ -177,7 +178,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
         this.historyServerArchivist = dispatcherServices.getHistoryServerArchivist();
 
-        this.archivedExecutionGraphStore = dispatcherServices.getArchivedExecutionGraphStore();
+        this.executionGraphInfoStore = dispatcherServices.getArchivedExecutionGraphStore();
 
         this.jobManagerRunnerFactory = dispatcherServices.getJobManagerRunnerFactory();
 
@@ -441,7 +442,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
                 && executionType == ExecutionType.RECOVERY) {
             return dispatcherJobFailed(jobId, dispatcherJobResult.getInitializationFailure());
         } else {
-            return jobReachedGloballyTerminalState(dispatcherJobResult.getArchivedExecutionGraph());
+            return jobReachedGloballyTerminalState(dispatcherJobResult.getExecutionGraphInfo());
         }
     }
 
@@ -557,8 +558,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
         CompletableFuture<Collection<JobStatus>> allJobsFuture =
                 allOptionalJobsFuture.thenApply(this::flattenOptionalCollection);
 
-        final JobsOverview completedJobsOverview =
-                archivedExecutionGraphStore.getStoredJobsOverview();
+        final JobsOverview completedJobsOverview = executionGraphInfoStore.getStoredJobsOverview();
 
         return allJobsFuture.thenCombine(
                 taskManagerOverviewFuture,
@@ -581,7 +581,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
                 optionalCombinedJobDetails.thenApply(this::flattenOptionalCollection);
 
         final Collection<JobDetails> completedJobDetails =
-                archivedExecutionGraphStore.getAvailableJobDetails();
+                executionGraphInfoStore.getAvailableJobDetails();
 
         return combinedJobDetails.thenApply(
                 (Collection<JobDetails> runningJobDetails) -> {
@@ -603,7 +603,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
                         () -> {
                             // is it a completed job?
                             final JobDetails jobDetails =
-                                    archivedExecutionGraphStore.getAvailableJobDetails(jobId);
+                                    executionGraphInfoStore.getAvailableJobDetails(jobId);
                             if (jobDetails == null) {
                                 return FutureUtils.completedExceptionally(
                                         new FlinkJobNotFoundException(jobId));
@@ -614,17 +614,18 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     }
 
     @Override
-    public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
-        Function<Throwable, ArchivedExecutionGraph> checkExecutionGraphStoreOnException =
+    public CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(
+            JobID jobId, Time timeout) {
+        Function<Throwable, ExecutionGraphInfo> checkExecutionGraphStoreOnException =
                 throwable -> {
                     // check whether it is a completed job
-                    final ArchivedExecutionGraph archivedExecutionGraph =
-                            archivedExecutionGraphStore.get(jobId);
-                    if (archivedExecutionGraph == null) {
+                    final ExecutionGraphInfo executionGraphInfo =
+                            executionGraphInfoStore.get(jobId);
+                    if (executionGraphInfo == null) {
                         throw new CompletionException(
                                 ExceptionUtils.stripCompletionException(throwable));
                     } else {
-                        return archivedExecutionGraph;
+                        return executionGraphInfo;
                     }
                 };
         Optional<DispatcherJob> maybeJob = getDispatcherJob(jobId);
@@ -638,21 +639,22 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
         DispatcherJob job = runningJobs.get(jobId);
 
         if (job == null) {
-            final ArchivedExecutionGraph archivedExecutionGraph =
-                    archivedExecutionGraphStore.get(jobId);
+            final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId);
 
-            if (archivedExecutionGraph == null) {
+            if (executionGraphInfo == null) {
                 return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
             } else {
                 return CompletableFuture.completedFuture(
-                        JobResult.createFrom(archivedExecutionGraph));
+                        JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph()));
             }
         } else {
             return job.getResultFuture()
                     .thenApply(
                             dispatcherJobResult ->
                                     JobResult.createFrom(
-                                            dispatcherJobResult.getArchivedExecutionGraph()));
+                                            dispatcherJobResult
+                                                    .getExecutionGraphInfo()
+                                                    .getArchivedExecutionGraph()));
         }
     }
 
@@ -827,7 +829,9 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     }
 
     protected CleanupJobState jobReachedGloballyTerminalState(
-            ArchivedExecutionGraph archivedExecutionGraph) {
+            ExecutionGraphInfo executionGraphInfo) {
+        ArchivedExecutionGraph archivedExecutionGraph =
+                executionGraphInfo.getArchivedExecutionGraph();
         Preconditions.checkArgument(
                 archivedExecutionGraph.getState().isGloballyTerminalState(),
                 "Job %s is in state %s which is not globally terminal.",
@@ -839,32 +843,32 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
                 archivedExecutionGraph.getJobID(),
                 archivedExecutionGraph.getState());
 
-        archiveExecutionGraph(archivedExecutionGraph);
+        archiveExecutionGraph(executionGraphInfo);
 
         return CleanupJobState.GLOBAL;
     }
 
-    private void archiveExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) {
+    private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
         try {
-            archivedExecutionGraphStore.put(archivedExecutionGraph);
+            executionGraphInfoStore.put(executionGraphInfo);
         } catch (IOException e) {
             log.info(
                     "Could not store completed job {}({}).",
-                    archivedExecutionGraph.getJobName(),
-                    archivedExecutionGraph.getJobID(),
+                    executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+                    executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                     e);
         }
 
         final CompletableFuture<Acknowledge> executionGraphFuture =
-                historyServerArchivist.archiveExecutionGraph(archivedExecutionGraph);
+                historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
 
         executionGraphFuture.whenComplete(
                 (Acknowledge ignored, Throwable throwable) -> {
                     if (throwable != null) {
                         log.info(
                                 "Could not archive completed job {}({}) to the history server.",
-                                archivedExecutionGraph.getJobName(),
-                                archivedExecutionGraph.getJobID(),
+                                executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+                                executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                                 throwable);
                     }
                 });
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
index 9e253f2..7243303 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
@@ -137,8 +138,7 @@ public final class DispatcherJob implements AutoCloseableAsync {
     private void handleJobManagerRunnerResult(JobManagerRunnerResult jobManagerRunnerResult) {
         if (jobManagerRunnerResult.isSuccess()) {
             jobResultFuture.complete(
-                    DispatcherJobResult.forSuccess(
-                            jobManagerRunnerResult.getArchivedExecutionGraph()));
+                    DispatcherJobResult.forSuccess(jobManagerRunnerResult.getExecutionGraphInfo()));
         } else if (jobManagerRunnerResult.isJobNotFinished()) {
             jobResultFuture.completeExceptionally(new JobNotFinishedException(jobId));
         } else if (jobManagerRunnerResult.isInitializationFailure()) {
@@ -156,7 +156,7 @@ public final class DispatcherJob implements AutoCloseableAsync {
                         initializationTimestamp);
         jobResultFuture.complete(
                 DispatcherJobResult.forInitializationFailure(
-                        archivedExecutionGraph, initializationFailure));
+                        new ExecutionGraphInfo(archivedExecutionGraph), initializationFailure));
     }
 
     public CompletableFuture<DispatcherJobResult> getResultFuture() {
@@ -166,9 +166,10 @@ public final class DispatcherJob implements AutoCloseableAsync {
     public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
         return requestJob(timeout)
                 .thenApply(
-                        executionGraph -> {
+                        executionGraphInfo -> {
                             synchronized (lock) {
-                                return JobDetails.createDetailsForJob(executionGraph);
+                                return JobDetails.createDetailsForJob(
+                                        executionGraphInfo.getArchivedExecutionGraph());
                             }
                         });
     }
@@ -205,16 +206,18 @@ public final class DispatcherJob implements AutoCloseableAsync {
     }
 
     public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
-        return requestJob(timeout).thenApply(ArchivedExecutionGraph::getState);
+        return requestJob(timeout)
+                .thenApply(
+                        executionGraphInfo ->
+                                executionGraphInfo.getArchivedExecutionGraph().getState());
     }
 
-    /** Returns a future completing to the ArchivedExecutionGraph of the job. */
-    public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+    /** Returns a future completing to the ExecutionGraphInfo of the job. */
+    public CompletableFuture<ExecutionGraphInfo> requestJob(Time timeout) {
         synchronized (lock) {
             if (isInitialized()) {
                 if (jobResultFuture.isDone()) { // job is not running anymore
-                    return jobResultFuture.thenApply(
-                            DispatcherJobResult::getArchivedExecutionGraph);
+                    return jobResultFuture.thenApply(DispatcherJobResult::getExecutionGraphInfo);
                 }
                 // job is still running
                 return getJobMasterGateway()
@@ -224,12 +227,13 @@ public final class DispatcherJob implements AutoCloseableAsync {
                         this.jobStatus == DispatcherJobStatus.INITIALIZING
                                 || jobStatus == DispatcherJobStatus.CANCELLING);
                 return CompletableFuture.completedFuture(
-                        ArchivedExecutionGraph.createFromInitializingJob(
-                                jobId,
-                                jobName,
-                                jobStatus.asJobStatus(),
-                                null,
-                                initializationTimestamp));
+                        new ExecutionGraphInfo(
+                                ArchivedExecutionGraph.createFromInitializingJob(
+                                        jobId,
+                                        jobName,
+                                        jobStatus.asJobStatus(),
+                                        null,
+                                        initializationTimestamp)));
             }
         }
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
index ad4f5ed..a5d93da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
@@ -19,25 +19,26 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
 /**
- * Container for returning the {@link ArchivedExecutionGraph} and a flag whether the initialization
- * has failed. For initialization failures, the throwable is also attached, to avoid deserializing
- * it from the ArchivedExecutionGraph.
+ * Container for returning the {@link ExecutionGraphInfo} and a flag whether the initialization has
+ * failed. For initialization failures, the throwable is also attached, to avoid deserializing it
+ * from the {@link ArchivedExecutionGraph}.
  */
 final class DispatcherJobResult {
 
-    private final ArchivedExecutionGraph archivedExecutionGraph;
+    private final ExecutionGraphInfo executionGraphInfo;
 
     // if the throwable field is set, the job failed during initialization.
     @Nullable private final Throwable initializationFailure;
 
     private DispatcherJobResult(
-            ArchivedExecutionGraph archivedExecutionGraph, @Nullable Throwable throwable) {
-        this.archivedExecutionGraph = archivedExecutionGraph;
+            ExecutionGraphInfo executionGraphInfo, @Nullable Throwable throwable) {
+        this.executionGraphInfo = executionGraphInfo;
         this.initializationFailure = throwable;
     }
 
@@ -45,8 +46,8 @@ final class DispatcherJobResult {
         return initializationFailure != null;
     }
 
-    public ArchivedExecutionGraph getArchivedExecutionGraph() {
-        return archivedExecutionGraph;
+    public ExecutionGraphInfo getExecutionGraphInfo() {
+        return executionGraphInfo;
     }
 
     /** @throws IllegalStateException if this DispatcherJobResult is a successful initialization. */
@@ -58,11 +59,11 @@ final class DispatcherJobResult {
     }
 
     public static DispatcherJobResult forInitializationFailure(
-            ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) {
-        return new DispatcherJobResult(archivedExecutionGraph, throwable);
+            ExecutionGraphInfo executionGraphInfo, Throwable throwable) {
+        return new DispatcherJobResult(executionGraphInfo, throwable);
     }
 
-    public static DispatcherJobResult forSuccess(ArchivedExecutionGraph archivedExecutionGraph) {
-        return new DispatcherJobResult(archivedExecutionGraph, null);
+    public static DispatcherJobResult forSuccess(ExecutionGraphInfo executionGraphInfo) {
+        return new DispatcherJobResult(executionGraphInfo, null);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
index 79a837a..d4ecaf2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
@@ -48,7 +48,7 @@ public class DispatcherServices {
 
     @Nonnull private final JobManagerMetricGroup jobManagerMetricGroup;
 
-    @Nonnull private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
+    @Nonnull private final ExecutionGraphInfoStore executionGraphInfoStore;
 
     @Nonnull private final FatalErrorHandler fatalErrorHandler;
 
@@ -68,7 +68,7 @@ public class DispatcherServices {
             @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
             @Nonnull BlobServer blobServer,
             @Nonnull HeartbeatServices heartbeatServices,
-            @Nonnull ArchivedExecutionGraphStore archivedExecutionGraphStore,
+            @Nonnull ExecutionGraphInfoStore executionGraphInfoStore,
             @Nonnull FatalErrorHandler fatalErrorHandler,
             @Nonnull HistoryServerArchivist historyServerArchivist,
             @Nullable String metricQueryServiceAddress,
@@ -81,7 +81,7 @@ public class DispatcherServices {
         this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
         this.blobServer = blobServer;
         this.heartbeatServices = heartbeatServices;
-        this.archivedExecutionGraphStore = archivedExecutionGraphStore;
+        this.executionGraphInfoStore = executionGraphInfoStore;
         this.fatalErrorHandler = fatalErrorHandler;
         this.historyServerArchivist = historyServerArchivist;
         this.metricQueryServiceAddress = metricQueryServiceAddress;
@@ -122,8 +122,8 @@ public class DispatcherServices {
     }
 
     @Nonnull
-    public ArchivedExecutionGraphStore getArchivedExecutionGraphStore() {
-        return archivedExecutionGraphStore;
+    public ExecutionGraphInfoStore getArchivedExecutionGraphStore() {
+        return executionGraphInfoStore;
     }
 
     @Nonnull
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStore.java
similarity index 76%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStore.java
index d38d804..1d63567 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStore.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 
 import javax.annotation.Nullable;
 
@@ -29,32 +29,32 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 
-/** Interface for a {@link ArchivedExecutionGraph} store. */
-public interface ArchivedExecutionGraphStore extends Closeable {
+/** Interface for a {@link ExecutionGraphInfo} store. */
+public interface ExecutionGraphInfoStore extends Closeable {
 
     /**
-     * Returns the current number of stored {@link ArchivedExecutionGraph}.
+     * Returns the current number of stored {@link ExecutionGraphInfo} instances.
      *
-     * @return Current number of stored {@link ArchivedExecutionGraph}
+     * @return Current number of stored {@link ExecutionGraphInfo} instances
      */
     int size();
 
     /**
-     * Get the {@link ArchivedExecutionGraph} for the given job id. Null if it isn't stored.
+     * Get the {@link ExecutionGraphInfo} for the given job id. Null if it isn't stored.
      *
      * @param jobId identifying the serializable execution graph to retrieve
      * @return The stored serializable execution graph or null
      */
     @Nullable
-    ArchivedExecutionGraph get(JobID jobId);
+    ExecutionGraphInfo get(JobID jobId);
 
     /**
-     * Store the given {@link ArchivedExecutionGraph} in the store.
+     * Store the given {@link ExecutionGraphInfo} in the store.
      *
-     * @param archivedExecutionGraph to store
+     * @param executionGraphInfo to store
      * @throws IOException if the serializable execution graph could not be stored in the store
      */
-    void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException;
+    void put(ExecutionGraphInfo executionGraphInfo) throws IOException;
 
     /**
      * Return the {@link JobsOverview} for all stored/past jobs.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStore.java
similarity index 78%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStore.java
index f08c996..b08d426 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStore.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
@@ -55,20 +56,19 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Store for {@link ArchivedExecutionGraph}. The store writes the archived execution graph to disk
- * and keeps the most recently used execution graphs in a memory cache for faster serving. Moreover,
- * the stored execution graphs are periodically cleaned up.
+ * Store for {@link ExecutionGraphInfo} instances. The store writes the archived execution graph
+ * information to disk and keeps the most recently used execution graphs in a memory cache for
+ * faster serving. Moreover, the stored execution graphs are periodically cleaned up.
  */
-public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore {
+public class FileExecutionGraphInfoStore implements ExecutionGraphInfoStore {
 
-    private static final Logger LOG =
-            LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);
+    private static final Logger LOG = LoggerFactory.getLogger(FileExecutionGraphInfoStore.class);
 
     private final File storageDir;
 
     private final Cache<JobID, JobDetails> jobDetailsCache;
 
-    private final LoadingCache<JobID, ArchivedExecutionGraph> archivedExecutionGraphCache;
+    private final LoadingCache<JobID, ExecutionGraphInfo> executionGraphInfoCache;
 
     private final ScheduledFuture<?> cleanupFuture;
 
@@ -80,7 +80,7 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
 
     private int numCanceledJobs;
 
-    public FileArchivedExecutionGraphStore(
+    public FileExecutionGraphInfoStore(
             File rootDir,
             Time expirationTime,
             int maximumCapacity,
@@ -93,7 +93,7 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
 
         LOG.info(
                 "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.",
-                FileArchivedExecutionGraphStore.class.getSimpleName(),
+                FileExecutionGraphInfoStore.class.getSimpleName(),
                 storageDirectory,
                 expirationTime.toMilliseconds(),
                 maximumCacheSizeBytes);
@@ -113,15 +113,14 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
                         .ticker(ticker)
                         .build();
 
-        this.archivedExecutionGraphCache =
+        this.executionGraphInfoCache =
                 CacheBuilder.newBuilder()
                         .maximumWeight(maximumCacheSizeBytes)
                         .weigher(this::calculateSize)
                         .build(
-                                new CacheLoader<JobID, ArchivedExecutionGraph>() {
+                                new CacheLoader<JobID, ExecutionGraphInfo>() {
                                     @Override
-                                    public ArchivedExecutionGraph load(JobID jobId)
-                                            throws Exception {
+                                    public ExecutionGraphInfo load(JobID jobId) throws Exception {
                                         return loadExecutionGraph(jobId);
                                     }
                                 });
@@ -147,19 +146,23 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
 
     @Override
     @Nullable
-    public ArchivedExecutionGraph get(JobID jobId) {
+    public ExecutionGraphInfo get(JobID jobId) {
         try {
-            return archivedExecutionGraphCache.get(jobId);
+            return executionGraphInfoCache.get(jobId);
         } catch (ExecutionException e) {
-            LOG.debug("Could not load archived execution graph for job id {}.", jobId, e);
+            LOG.debug(
+                    "Could not load archived execution graph information for job id {}.", jobId, e);
             return null;
         }
     }
 
     @Override
-    public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
+    public void put(ExecutionGraphInfo executionGraphInfo) throws IOException {
+        final JobID jobId = executionGraphInfo.getJobId();
+
+        final ArchivedExecutionGraph archivedExecutionGraph =
+                executionGraphInfo.getArchivedExecutionGraph();
         final JobStatus jobStatus = archivedExecutionGraph.getState();
-        final JobID jobId = archivedExecutionGraph.getJobID();
         final String jobName = archivedExecutionGraph.getJobName();
 
         Preconditions.checkArgument(
@@ -195,12 +198,12 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
         }
 
         // write the ArchivedExecutionGraph to disk
-        storeArchivedExecutionGraph(archivedExecutionGraph);
+        storeExecutionGraphInfo(executionGraphInfo);
 
         final JobDetails detailsForJob = JobDetails.createDetailsForJob(archivedExecutionGraph);
 
         jobDetailsCache.put(jobId, detailsForJob);
-        archivedExecutionGraphCache.put(jobId, archivedExecutionGraph);
+        executionGraphInfoCache.put(jobId, executionGraphInfo);
     }
 
     @Override
@@ -236,27 +239,28 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
     // Internal methods
     // --------------------------------------------------------------
 
-    private int calculateSize(JobID jobId, ArchivedExecutionGraph serializableExecutionGraph) {
-        final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
+    private int calculateSize(JobID jobId, ExecutionGraphInfo serializableExecutionGraphInfo) {
+        final File executionGraphInfoFile = getExecutionGraphFile(jobId);
 
-        if (archivedExecutionGraphFile.exists()) {
-            return Math.toIntExact(archivedExecutionGraphFile.length());
+        if (executionGraphInfoFile.exists()) {
+            return Math.toIntExact(executionGraphInfoFile.length());
         } else {
             LOG.debug(
-                    "Could not find archived execution graph file for {}. Estimating the size instead.",
+                    "Could not find execution graph information file for {}. Estimating the size instead.",
                     jobId);
+            final ArchivedExecutionGraph serializableExecutionGraph =
+                    serializableExecutionGraphInfo.getArchivedExecutionGraph();
             return serializableExecutionGraph.getAllVertices().size() * 1000
                     + serializableExecutionGraph.getAccumulatorsSerialized().size() * 1000;
         }
     }
 
-    private ArchivedExecutionGraph loadExecutionGraph(JobID jobId)
+    private ExecutionGraphInfo loadExecutionGraph(JobID jobId)
             throws IOException, ClassNotFoundException {
-        final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
+        final File executionGraphInfoFile = getExecutionGraphFile(jobId);
 
-        if (archivedExecutionGraphFile.exists()) {
-            try (FileInputStream fileInputStream =
-                    new FileInputStream(archivedExecutionGraphFile)) {
+        if (executionGraphInfoFile.exists()) {
+            try (FileInputStream fileInputStream = new FileInputStream(executionGraphInfoFile)) {
                 return InstantiationUtil.deserializeObject(
                         fileInputStream, getClass().getClassLoader());
             }
@@ -268,13 +272,12 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
         }
     }
 
-    private void storeArchivedExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph)
-            throws IOException {
+    private void storeExecutionGraphInfo(ExecutionGraphInfo executionGraphInfo) throws IOException {
         final File archivedExecutionGraphFile =
-                getExecutionGraphFile(archivedExecutionGraph.getJobID());
+                getExecutionGraphFile(executionGraphInfo.getJobId());
 
         try (FileOutputStream fileOutputStream = new FileOutputStream(archivedExecutionGraphFile)) {
-            InstantiationUtil.serializeObject(fileOutputStream, archivedExecutionGraph);
+            InstantiationUtil.serializeObject(fileOutputStream, executionGraphInfo);
         }
     }
 
@@ -293,7 +296,7 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
             LOG.debug("Could not delete file {}.", archivedExecutionGraphFile, e);
         }
 
-        archivedExecutionGraphCache.invalidate(jobId);
+        executionGraphInfoCache.invalidate(jobId);
         jobDetailsCache.invalidate(jobId);
     }
 
@@ -323,7 +326,7 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
     }
 
     @VisibleForTesting
-    LoadingCache<JobID, ArchivedExecutionGraph> getArchivedExecutionGraphCache() {
-        return archivedExecutionGraphCache;
+    LoadingCache<JobID, ExecutionGraphInfo> getExecutionGraphInfoCache() {
+        return executionGraphInfoCache;
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
index 2a571d2..f40af97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
@@ -21,23 +21,23 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
-/** Writer for an {@link AccessExecutionGraph}. */
+/** Writer for an {@link ExecutionGraphInfo}. */
 public interface HistoryServerArchivist {
 
     /**
-     * Archives the given {@link AccessExecutionGraph} on the history server.
+     * Archives the given {@link ExecutionGraphInfo} on the history server.
      *
-     * @param executionGraph to store on the history server
+     * @param executionGraphInfo to store on the history server
      * @return Future which is completed once the archiving has been completed.
      */
-    CompletableFuture<Acknowledge> archiveExecutionGraph(AccessExecutionGraph executionGraph);
+    CompletableFuture<Acknowledge> archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo);
 
     static HistoryServerArchivist createHistoryServerArchivist(
             Configuration configuration, JsonArchivist jsonArchivist, Executor ioExecutor) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
index 991ff67..e68f3b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingRunnable;
@@ -50,14 +51,15 @@ class JsonResponseHistoryServerArchivist implements HistoryServerArchivist {
 
     @Override
     public CompletableFuture<Acknowledge> archiveExecutionGraph(
-            AccessExecutionGraph executionGraph) {
+            ExecutionGraphInfo executionGraphInfo) {
         return CompletableFuture.runAsync(
                         ThrowingRunnable.unchecked(
                                 () ->
                                         FsJobArchivist.archiveJob(
                                                 archivePath,
-                                                executionGraph.getJobID(),
-                                                jsonArchivist.archiveJsonWithPath(executionGraph))),
+                                                executionGraphInfo.getJobId(),
+                                                jsonArchivist.archiveJsonWithPath(
+                                                        executionGraphInfo))),
                         ioExecutor)
                 .thenApply(ignored -> Acknowledge.get());
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
similarity index 60%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
index 53df4a7..35b57da3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 
 import javax.annotation.Nullable;
 
@@ -33,34 +34,35 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
- * {@link ArchivedExecutionGraphStore} implementation which stores the {@link
- * ArchivedExecutionGraph} in memory.
+ * {@link ExecutionGraphInfoStore} implementation which stores the {@link ExecutionGraphInfo}
+ * instances in memory.
  */
-public class MemoryArchivedExecutionGraphStore implements ArchivedExecutionGraphStore {
+public class MemoryExecutionGraphInfoStore implements ExecutionGraphInfoStore {
 
-    private final Map<JobID, ArchivedExecutionGraph> serializableExecutionGraphs = new HashMap<>(4);
+    private final Map<JobID, ExecutionGraphInfo> serializableExecutionGraphInfos = new HashMap<>(4);
 
     @Override
     public int size() {
-        return serializableExecutionGraphs.size();
+        return serializableExecutionGraphInfos.size();
     }
 
     @Nullable
     @Override
-    public ArchivedExecutionGraph get(JobID jobId) {
-        return serializableExecutionGraphs.get(jobId);
+    public ExecutionGraphInfo get(JobID jobId) {
+        return serializableExecutionGraphInfos.get(jobId);
     }
 
     @Override
-    public void put(ArchivedExecutionGraph serializableExecutionGraph) throws IOException {
-        serializableExecutionGraphs.put(
-                serializableExecutionGraph.getJobID(), serializableExecutionGraph);
+    public void put(ExecutionGraphInfo serializableExecutionGraphInfo) throws IOException {
+        serializableExecutionGraphInfos.put(
+                serializableExecutionGraphInfo.getJobId(), serializableExecutionGraphInfo);
     }
 
     @Override
     public JobsOverview getStoredJobsOverview() {
         Collection<JobStatus> allJobStatus =
-                serializableExecutionGraphs.values().stream()
+                serializableExecutionGraphInfos.values().stream()
+                        .map(ExecutionGraphInfo::getArchivedExecutionGraph)
                         .map(ArchivedExecutionGraph::getState)
                         .collect(Collectors.toList());
 
@@ -69,7 +71,8 @@ public class MemoryArchivedExecutionGraphStore implements ArchivedExecutionGraph
 
     @Override
     public Collection<JobDetails> getAvailableJobDetails() {
-        return serializableExecutionGraphs.values().stream()
+        return serializableExecutionGraphInfos.values().stream()
+                .map(ExecutionGraphInfo::getArchivedExecutionGraph)
                 .map(JobDetails::createDetailsForJob)
                 .collect(Collectors.toList());
     }
@@ -77,11 +80,12 @@ public class MemoryArchivedExecutionGraphStore implements ArchivedExecutionGraph
     @Nullable
     @Override
     public JobDetails getAvailableJobDetails(JobID jobId) {
-        final ArchivedExecutionGraph archivedExecutionGraph =
-                serializableExecutionGraphs.get(jobId);
+        final ExecutionGraphInfo archivedExecutionGraphInfo =
+                serializableExecutionGraphInfos.get(jobId);
 
-        if (archivedExecutionGraph != null) {
-            return JobDetails.createDetailsForJob(archivedExecutionGraph);
+        if (archivedExecutionGraphInfo != null) {
+            return JobDetails.createDetailsForJob(
+                    archivedExecutionGraphInfo.getArchivedExecutionGraph());
         } else {
             return null;
         }
@@ -89,6 +93,6 @@ public class MemoryArchivedExecutionGraphStore implements ArchivedExecutionGraph
 
     @Override
     public void close() throws IOException {
-        serializableExecutionGraphs.clear();
+        serializableExecutionGraphInfos.clear();
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 553cc76..4b9b4b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.FlinkException;
 
 import java.util.Collections;
@@ -119,9 +120,11 @@ public class MiniDispatcher extends Dispatcher {
 
     @Override
     protected CleanupJobState jobReachedGloballyTerminalState(
-            ArchivedExecutionGraph archivedExecutionGraph) {
+            ExecutionGraphInfo executionGraphInfo) {
+        final ArchivedExecutionGraph archivedExecutionGraph =
+                executionGraphInfo.getArchivedExecutionGraph();
         final CleanupJobState cleanupHAState =
-                super.jobReachedGloballyTerminalState(archivedExecutionGraph);
+                super.jobReachedGloballyTerminalState(executionGraphInfo);
 
         if (jobCancelled || executionMode == ClusterEntrypoint.ExecutionMode.DETACHED) {
             // shut down if job is cancelled or we don't have to wait for the execution result
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
index 81bd0d1..6fc8d69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
@@ -49,7 +49,7 @@ public class PartialDispatcherServices {
 
     @Nonnull private final JobManagerMetricGroupFactory jobManagerMetricGroupFactory;
 
-    @Nonnull private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
+    @Nonnull private final ExecutionGraphInfoStore executionGraphInfoStore;
 
     @Nonnull private final FatalErrorHandler fatalErrorHandler;
 
@@ -66,7 +66,7 @@ public class PartialDispatcherServices {
             @Nonnull BlobServer blobServer,
             @Nonnull HeartbeatServices heartbeatServices,
             @Nonnull JobManagerMetricGroupFactory jobManagerMetricGroupFactory,
-            @Nonnull ArchivedExecutionGraphStore archivedExecutionGraphStore,
+            @Nonnull ExecutionGraphInfoStore executionGraphInfoStore,
             @Nonnull FatalErrorHandler fatalErrorHandler,
             @Nonnull HistoryServerArchivist historyServerArchivist,
             @Nullable String metricQueryServiceAddress,
@@ -77,7 +77,7 @@ public class PartialDispatcherServices {
         this.blobServer = blobServer;
         this.heartbeatServices = heartbeatServices;
         this.jobManagerMetricGroupFactory = jobManagerMetricGroupFactory;
-        this.archivedExecutionGraphStore = archivedExecutionGraphStore;
+        this.executionGraphInfoStore = executionGraphInfoStore;
         this.fatalErrorHandler = fatalErrorHandler;
         this.historyServerArchivist = historyServerArchivist;
         this.metricQueryServiceAddress = metricQueryServiceAddress;
@@ -115,8 +115,8 @@ public class PartialDispatcherServices {
     }
 
     @Nonnull
-    public ArchivedExecutionGraphStore getArchivedExecutionGraphStore() {
-        return archivedExecutionGraphStore;
+    public ExecutionGraphInfoStore getArchivedExecutionGraphStore() {
+        return executionGraphInfoStore;
     }
 
     @Nonnull
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobGraphStore.java
index ac69737..f4a4716 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobGraphStore.java
@@ -44,7 +44,7 @@ public class PartialDispatcherServicesWithJobGraphStore extends PartialDispatche
             @Nonnull BlobServer blobServer,
             @Nonnull HeartbeatServices heartbeatServices,
             @Nonnull JobManagerMetricGroupFactory jobManagerMetricGroupFactory,
-            @Nonnull ArchivedExecutionGraphStore archivedExecutionGraphStore,
+            @Nonnull ExecutionGraphInfoStore executionGraphInfoStore,
             @Nonnull FatalErrorHandler fatalErrorHandler,
             @Nonnull HistoryServerArchivist historyServerArchivist,
             @Nullable String metricQueryServiceAddress,
@@ -57,7 +57,7 @@ public class PartialDispatcherServicesWithJobGraphStore extends PartialDispatche
                 blobServer,
                 heartbeatServices,
                 jobManagerMetricGroupFactory,
-                archivedExecutionGraphStore,
+                executionGraphInfoStore,
                 fatalErrorHandler,
                 historyServerArchivist,
                 metricQueryServiceAddress,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
index 0ce3ba6..28b4217 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.dispatcher;
 
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -28,8 +28,7 @@ public enum VoidHistoryServerArchivist implements HistoryServerArchivist {
     INSTANCE;
 
     @Override
-    public CompletableFuture<Acknowledge> archiveExecutionGraph(
-            AccessExecutionGraph executionGraph) {
+    public CompletableFuture<Acknowledge> archiveExecutionGraph(ExecutionGraphInfo executionGraph) {
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 8caa6a7..04111ac 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -39,7 +39,7 @@ import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
@@ -145,7 +145,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
     @GuardedBy("lock")
     private ExecutorService ioExecutor;
 
-    private ArchivedExecutionGraphStore archivedExecutionGraphStore;
+    private ExecutionGraphInfoStore executionGraphInfoStore;
 
     private final Thread shutDownHook;
 
@@ -254,7 +254,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
                             blobServer,
                             heartbeatServices,
                             metricRegistry,
-                            archivedExecutionGraphStore,
+                            executionGraphInfoStore,
                             new RpcMetricQueryServiceRetriever(
                                     metricRegistry.getMetricQueryServiceRpcService()),
                             this);
@@ -322,7 +322,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
                             ConfigurationUtils.getSystemResourceMetricsProbingInterval(
                                     configuration));
 
-            archivedExecutionGraphStore =
+            executionGraphInfoStore =
                     createSerializableExecutionGraphStore(
                             configuration, commonRpcService.getScheduledExecutor());
         }
@@ -399,9 +399,9 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
                 }
             }
 
-            if (archivedExecutionGraphStore != null) {
+            if (executionGraphInfoStore != null) {
                 try {
-                    archivedExecutionGraphStore.close();
+                    executionGraphInfoStore.close();
                 } catch (Throwable t) {
                     exception = ExceptionUtils.firstOrSuppressed(t, exception);
                 }
@@ -538,7 +538,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
             createDispatcherResourceManagerComponentFactory(Configuration configuration)
                     throws IOException;
 
-    protected abstract ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
+    protected abstract ExecutionGraphInfoStore createSerializableExecutionGraphStore(
             Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException;
 
     protected static EntrypointClusterConfiguration parseArguments(String[] args)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 2b19f6e..889ea7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
 
 /** Base class for per-job cluster entry points. */
 public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
@@ -31,8 +31,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
     }
 
     @Override
-    protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
+    protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
             Configuration configuration, ScheduledExecutor scheduledExecutor) {
-        return new MemoryArchivedExecutionGraphStore();
+        return new MemoryExecutionGraphInfoStore();
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index fb46589..b455f02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -23,8 +23,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.FileExecutionGraphInfoStore;
 
 import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
 
@@ -39,7 +39,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
     }
 
     @Override
-    protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
+    protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
             Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException {
         final File tmpDir = new File(ConfigurationUtils.parseTempDirectories(configuration)[0]);
 
@@ -50,7 +50,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
         final long maximumCacheSizeBytes =
                 configuration.getLong(JobManagerOptions.JOB_STORE_CACHE_SIZE);
 
-        return new FileArchivedExecutionGraphStore(
+        return new FileExecutionGraphInfoStore(
                 tmpDir,
                 expirationTime,
                 maximumCapacity,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
index e9b7847..10963ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
@@ -25,9 +25,9 @@ import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ExponentialBackoffRetryStrategy;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
 import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
 import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
@@ -106,7 +106,7 @@ public class DefaultDispatcherResourceManagerComponentFactory
             BlobServer blobServer,
             HeartbeatServices heartbeatServices,
             MetricRegistry metricRegistry,
-            ArchivedExecutionGraphStore archivedExecutionGraphStore,
+            ExecutionGraphInfoStore executionGraphInfoStore,
             MetricQueryServiceRetriever metricQueryServiceRetriever,
             FatalErrorHandler fatalErrorHandler)
             throws Exception {
@@ -201,7 +201,7 @@ public class DefaultDispatcherResourceManagerComponentFactory
                             () ->
                                     MetricUtils.instantiateJobManagerMetricGroup(
                                             metricRegistry, hostname),
-                            archivedExecutionGraphStore,
+                            executionGraphInfoStore,
                             fatalErrorHandler,
                             historyServerArchivist,
                             metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
index 7182f3f..8cddd19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.entrypoint.component;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -41,7 +41,7 @@ public interface DispatcherResourceManagerComponentFactory {
             BlobServer blobServer,
             HeartbeatServices heartbeatServices,
             MetricRegistry metricRegistry,
-            ArchivedExecutionGraphStore archivedExecutionGraphStore,
+            ExecutionGraphInfoStore executionGraphInfoStore,
             MetricQueryServiceRetriever metricQueryServiceRetriever,
             FatalErrorHandler fatalErrorHandler)
             throws Exception;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
index 01fb137..f6d820f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 
 /** Interface for completion actions once a Flink job has reached a terminal state. */
 public interface OnCompletionActions {
@@ -27,9 +27,9 @@ public interface OnCompletionActions {
     /**
      * Job reached a globally terminal state.
      *
-     * @param executionGraph serializable execution graph
+     * @param executionGraphInfo contains information about the terminated job
      */
-    void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph);
+    void jobReachedGloballyTerminalState(ExecutionGraphInfo executionGraphInfo);
 
     /** Job was finished by another JobMaster. */
     void jobFinishedByOther();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
index 629326c..042945d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.client.JobInitializationException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -31,6 +30,7 @@ import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.function.ThrowingRunnable;
@@ -241,10 +241,10 @@ public class JobManagerRunnerImpl
 
     /** Job completion notification triggered by JobManager. */
     @Override
-    public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
+    public void jobReachedGloballyTerminalState(ExecutionGraphInfo executionGraphInfo) {
         unregisterJobFromHighAvailability();
-        // complete the result future with the terminal execution graph
-        resultFuture.complete(JobManagerRunnerResult.forSuccess(executionGraph));
+        // complete the result future with the information of the terminated job
+        resultFuture.complete(JobManagerRunnerResult.forSuccess(executionGraphInfo));
     }
 
     /** Job completion notification triggered by self. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResult.java
index ffda268..53542ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResult.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
@@ -28,37 +28,37 @@ import java.util.Objects;
 /** The result of the {@link JobManagerRunner}. */
 public final class JobManagerRunnerResult {
 
-    @Nullable private final ArchivedExecutionGraph archivedExecutionGraph;
+    @Nullable private final ExecutionGraphInfo executionGraphInfo;
 
     @Nullable private final Throwable failure;
 
     private JobManagerRunnerResult(
-            @Nullable ArchivedExecutionGraph archivedExecutionGraph, @Nullable Throwable failure) {
-        this.archivedExecutionGraph = archivedExecutionGraph;
+            @Nullable ExecutionGraphInfo executionGraphInfo, @Nullable Throwable failure) {
+        this.executionGraphInfo = executionGraphInfo;
         this.failure = failure;
     }
 
     public boolean isSuccess() {
-        return archivedExecutionGraph != null && failure == null;
+        return executionGraphInfo != null && failure == null;
     }
 
     public boolean isJobNotFinished() {
-        return archivedExecutionGraph == null && failure == null;
+        return executionGraphInfo == null && failure == null;
     }
 
     public boolean isInitializationFailure() {
-        return archivedExecutionGraph == null && failure != null;
+        return executionGraphInfo == null && failure != null;
     }
 
     /**
      * This method returns the payload of the successful JobManagerRunnerResult.
      *
-     * @return the successful completed {@link ArchivedExecutionGraph}
+     * @return the {@link ExecutionGraphInfo} of a successfully finished job
      * @throws IllegalStateException if the result is not a success
      */
-    public ArchivedExecutionGraph getArchivedExecutionGraph() {
+    public ExecutionGraphInfo getExecutionGraphInfo() {
         Preconditions.checkState(isSuccess());
-        return archivedExecutionGraph;
+        return executionGraphInfo;
     }
 
     /**
@@ -81,21 +81,21 @@ public final class JobManagerRunnerResult {
             return false;
         }
         JobManagerRunnerResult that = (JobManagerRunnerResult) o;
-        return Objects.equals(archivedExecutionGraph, that.archivedExecutionGraph)
+        return Objects.equals(executionGraphInfo, that.executionGraphInfo)
                 && Objects.equals(failure, that.failure);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(archivedExecutionGraph, failure);
+        return Objects.hash(executionGraphInfo, failure);
     }
 
     public static JobManagerRunnerResult forJobNotFinished() {
         return new JobManagerRunnerResult(null, null);
     }
 
-    public static JobManagerRunnerResult forSuccess(ArchivedExecutionGraph archivedExecutionGraph) {
-        return new JobManagerRunnerResult(archivedExecutionGraph, null);
+    public static JobManagerRunnerResult forSuccess(ExecutionGraphInfo executionGraphInfo) {
+        return new JobManagerRunnerResult(executionGraphInfo, null);
     }
 
     public static JobManagerRunnerResult forInitializationFailure(Throwable failure) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c353258..483b06a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.heartbeat.HeartbeatListener;
@@ -74,6 +73,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.PermanentlyFencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.slots.ResourceRequirement;
@@ -766,7 +766,7 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId>
     }
 
     @Override
-    public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+    public CompletableFuture<ExecutionGraphInfo> requestJob(Time timeout) {
         return CompletableFuture.completedFuture(schedulerNG.requestJob());
     }
 
@@ -975,11 +975,9 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId>
                                                     : partitionTracker
                                                             ::stopTrackingAndReleasePartitionsFor));
 
-            final ArchivedExecutionGraph archivedExecutionGraph = schedulerNG.requestJob();
+            final ExecutionGraphInfo executionGraphInfo = schedulerNG.requestJob();
             scheduledExecutorService.execute(
-                    () ->
-                            jobCompletionActions.jobReachedGloballyTerminalState(
-                                    archivedExecutionGraph));
+                    () -> jobCompletionActions.jobReachedGloballyTerminalState(executionGraphInfo));
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 05a67e6..f742eed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -42,6 +41,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
@@ -209,12 +209,12 @@ public interface JobMasterGateway
     CompletableFuture<JobStatus> requestJobStatus(@RpcTimeout Time timeout);
 
     /**
-     * Requests the {@link ArchivedExecutionGraph} of the executed job.
+     * Requests the {@link ExecutionGraphInfo} of the executed job.
      *
      * @param timeout for the rpc call
-     * @return Future which is completed with the {@link ArchivedExecutionGraph} of the executed job
+     * @return Future which is completed with the {@link ExecutionGraphInfo} of the executed job
      */
-    CompletableFuture<ArchivedExecutionGraph> requestJob(@RpcTimeout Time timeout);
+    CompletableFuture<ExecutionGraphInfo> requestJob(@RpcTimeout Time timeout);
 
     /**
      * Triggers taking a savepoint of the executed job.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 41aad53..e1753bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.concurrent.ExponentialBackoffRetryStrategy;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
@@ -468,7 +468,7 @@ public class MiniCluster implements AutoCloseableAsync {
                         blobServer,
                         heartbeatServices,
                         metricRegistry,
-                        new MemoryArchivedExecutionGraphStore(),
+                        new MemoryExecutionGraphInfoStore(),
                         metricQueryServiceRetriever,
                         fatalErrorHandler));
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractAccessExecutionGraphHandler.java
similarity index 53%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractAccessExecutionGraphHandler.java
index f423e2e..f8d2aab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractAccessExecutionGraphHandler.java
@@ -1,4 +1,3 @@
-package org.apache.flink.runtime.rest.handler.job;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -17,63 +16,59 @@ package org.apache.flink.runtime.rest.handler.job;
  * limitations under the License.
  */
 
+package org.apache.flink.runtime.rest.handler.job;
+
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
-import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Executor;
 
-/** Handler serving the job execution plan. */
-public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, JobMessageParameters>
-        implements JsonArchivist {
+/**
+ * {@code AbstractAccessExecutionGraphHandler} handles requests that require accessing the job's
+ * {@link AccessExecutionGraph}.
+ *
+ * @param <R> the response type
+ * @param <M> the message parameter type
+ */
+public abstract class AbstractAccessExecutionGraphHandler<
+                R extends ResponseBody, M extends JobMessageParameters>
+        extends AbstractExecutionGraphHandler<R, M> {
 
-    public JobPlanHandler(
+    protected AbstractAccessExecutionGraphHandler(
             GatewayRetriever<? extends RestfulGateway> leaderRetriever,
             Time timeout,
-            Map<String, String> headers,
-            MessageHeaders<EmptyRequestBody, JobPlanInfo, JobMessageParameters> messageHeaders,
+            Map<String, String> responseHeaders,
+            MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
             ExecutionGraphCache executionGraphCache,
             Executor executor) {
-
-        super(leaderRetriever, timeout, headers, messageHeaders, executionGraphCache, executor);
+        super(
+                leaderRetriever,
+                timeout,
+                responseHeaders,
+                messageHeaders,
+                executionGraphCache,
+                executor);
     }
 
     @Override
-    protected JobPlanInfo handleRequest(
-            HandlerRequest<EmptyRequestBody, JobMessageParameters> request,
-            AccessExecutionGraph executionGraph)
+    protected R handleRequest(
+            HandlerRequest<EmptyRequestBody, M> request, ExecutionGraphInfo executionGraphInfo)
             throws RestHandlerException {
-        return createJobPlanInfo(executionGraph);
+        return handleRequest(request, executionGraphInfo.getArchivedExecutionGraph());
     }
 
-    @Override
-    public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
-            throws IOException {
-        ResponseBody json = createJobPlanInfo(graph);
-        String path =
-                getMessageHeaders()
-                        .getTargetRestEndpointURL()
-                        .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
-        return Collections.singleton(new ArchivedJson(path, json));
-    }
-
-    private static JobPlanInfo createJobPlanInfo(AccessExecutionGraph executionGraph) {
-        return new JobPlanInfo(executionGraph.getJsonPlan());
-    }
+    protected abstract R handleRequest(
+            HandlerRequest<EmptyRequestBody, M> request, AccessExecutionGraph executionGraph)
+            throws RestHandlerException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
index 8c940ec..e8910a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -32,6 +31,7 @@ import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
@@ -45,9 +45,10 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
- * Base class for all {@link AccessExecutionGraph} based REST handlers.
+ * Base class for all {@link ExecutionGraphInfo} based REST handlers.
  *
  * @param <R> response type
+ * @param <M> job message parameter type
  */
 public abstract class AbstractExecutionGraphHandler<
                 R extends ResponseBody, M extends JobMessageParameters>
@@ -76,8 +77,8 @@ public abstract class AbstractExecutionGraphHandler<
             throws RestHandlerException {
         JobID jobId = request.getPathParameter(JobIDPathParameter.class);
 
-        CompletableFuture<AccessExecutionGraph> executionGraphFuture =
-                executionGraphCache.getExecutionGraph(jobId, gateway);
+        CompletableFuture<ExecutionGraphInfo> executionGraphFuture =
+                executionGraphCache.getExecutionGraphInfo(jobId, gateway);
 
         return executionGraphFuture
                 .thenApplyAsync(
@@ -104,15 +105,15 @@ public abstract class AbstractExecutionGraphHandler<
     }
 
     /**
-     * Called for each request after the corresponding {@link AccessExecutionGraph} has been
-     * retrieved from the {@link ExecutionGraphCache}.
+     * Called for each request after the corresponding {@link ExecutionGraphInfo} has been retrieved
+     * from the {@link ExecutionGraphCache}.
      *
      * @param request for further information
-     * @param executionGraph for which the handler was called
+     * @param executionGraphInfo for which the handler was called
      * @return Response
      * @throws RestHandlerException if the handler could not process the request
      */
     protected abstract R handleRequest(
-            HandlerRequest<EmptyRequestBody, M> request, AccessExecutionGraph executionGraph)
+            HandlerRequest<EmptyRequestBody, M> request, ExecutionGraphInfo executionGraphInfo)
             throws RestHandlerException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java
index ee09d02..1e001ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java
@@ -49,7 +49,7 @@ import java.util.concurrent.Executor;
  */
 public abstract class AbstractJobVertexHandler<
                 R extends ResponseBody, M extends JobVertexMessageParameters>
-        extends AbstractExecutionGraphHandler<R, M> {
+        extends AbstractAccessExecutionGraphHandler<R, M> {
 
     /**
      * Instantiates a new Abstract job vertex handler.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
index 9c41953..70dd6ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
@@ -48,7 +48,8 @@ import java.util.concurrent.Executor;
 
 /** Request handler that returns the aggregated accumulators of a job. */
 public class JobAccumulatorsHandler
-        extends AbstractExecutionGraphHandler<JobAccumulatorsInfo, JobAccumulatorsMessageParameters>
+        extends AbstractAccessExecutionGraphHandler<
+                JobAccumulatorsInfo, JobAccumulatorsMessageParameters>
         implements JsonArchivist {
 
     public JobAccumulatorsHandler(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
index cbdc8b6..53ee2b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
@@ -42,7 +42,7 @@ import java.util.concurrent.Executor;
 
 /** Handler serving the job configuration. */
 public class JobConfigHandler
-        extends AbstractExecutionGraphHandler<JobConfigInfo, JobMessageParameters>
+        extends AbstractAccessExecutionGraphHandler<JobConfigInfo, JobMessageParameters>
         implements JsonArchivist {
 
     public JobConfigHandler(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 9261674..84c6d55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -56,7 +56,7 @@ import java.util.concurrent.Executor;
 
 /** Handler returning the details for the specified job. */
 public class JobDetailsHandler
-        extends AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters>
+        extends AbstractAccessExecutionGraphHandler<JobDetailsInfo, JobMessageParameters>
         implements JsonArchivist {
 
     private final MetricFetcher metricFetcher;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 819f727..1fa496e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -49,7 +49,8 @@ import java.util.concurrent.Executor;
 
 /** Handler serving the job exceptions. */
 public class JobExceptionsHandler
-        extends AbstractExecutionGraphHandler<JobExceptionsInfo, JobExceptionsMessageParameters>
+        extends AbstractAccessExecutionGraphHandler<
+                JobExceptionsInfo, JobExceptionsMessageParameters>
         implements JsonArchivist {
 
     static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
index f423e2e..fcf48ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
@@ -40,7 +40,8 @@ import java.util.Map;
 import java.util.concurrent.Executor;
 
 /** Handler serving the job execution plan. */
-public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, JobMessageParameters>
+public class JobPlanHandler
+        extends AbstractAccessExecutionGraphHandler<JobPlanInfo, JobMessageParameters>
         implements JsonArchivist {
 
     public JobPlanHandler(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
index 1ab070f..12e8542 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
@@ -53,7 +53,8 @@ import java.util.concurrent.Executor;
 
 /** Request handler for the job vertex details. */
 public class JobVertexDetailsHandler
-        extends AbstractExecutionGraphHandler<JobVertexDetailsInfo, JobVertexMessageParameters>
+        extends AbstractAccessExecutionGraphHandler<
+                JobVertexDetailsInfo, JobVertexMessageParameters>
         implements JsonArchivist {
     private final MetricFetcher metricFetcher;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index 0408fa1..1e38e29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -62,7 +62,8 @@ import java.util.concurrent.Executor;
  * and metrics of all its subtasks aggregated by TaskManager.
  */
 public class JobVertexTaskManagersHandler
-        extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters>
+        extends AbstractAccessExecutionGraphHandler<
+                JobVertexTaskManagersInfo, JobVertexMessageParameters>
         implements JsonArchivist {
     private MetricFetcher metricFetcher;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
index 9f2763c..1e2817e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -47,7 +47,7 @@ import java.util.concurrent.Executor;
  */
 public abstract class AbstractCheckpointHandler<
                 R extends ResponseBody, M extends CheckpointMessageParameters>
-        extends AbstractExecutionGraphHandler<R, M> {
+        extends AbstractAccessExecutionGraphHandler<R, M> {
 
     private final CheckpointStatsCache checkpointStatsCache;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
index 597bcb5..7684271 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
@@ -49,7 +49,7 @@ import java.util.concurrent.Executor;
 
 /** Handler which serves the checkpoint configuration. */
 public class CheckpointConfigHandler
-        extends AbstractExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters>
+        extends AbstractAccessExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters>
         implements JsonArchivist {
 
     public CheckpointConfigHandler(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
index 320f7e0..db6f79c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
@@ -56,7 +56,7 @@ import java.util.concurrent.Executor;
 
 /** Handler which serves the checkpoint statistics. */
 public class CheckpointingStatisticsHandler
-        extends AbstractExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters>
+        extends AbstractAccessExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters>
         implements JsonArchivist {
 
     public CheckpointingStatisticsHandler(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java
index 3ed7e4d..de40e14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java
@@ -20,8 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.util.Preconditions;
 
@@ -63,12 +62,12 @@ public class DefaultExecutionGraphCache implements ExecutionGraphCache {
     }
 
     @Override
-    public CompletableFuture<AccessExecutionGraph> getExecutionGraph(
+    public CompletableFuture<ExecutionGraphInfo> getExecutionGraphInfo(
             JobID jobId, RestfulGateway restfulGateway) {
         return getExecutionGraphInternal(jobId, restfulGateway).thenApply(Function.identity());
     }
 
-    private CompletableFuture<ArchivedExecutionGraph> getExecutionGraphInternal(
+    private CompletableFuture<ExecutionGraphInfo> getExecutionGraphInternal(
             JobID jobId, RestfulGateway restfulGateway) {
         Preconditions.checkState(running, "ExecutionGraphCache is no longer running");
 
@@ -78,10 +77,10 @@ public class DefaultExecutionGraphCache implements ExecutionGraphCache {
             final long currentTime = System.currentTimeMillis();
 
             if (oldEntry != null && currentTime < oldEntry.getTTL()) {
-                final CompletableFuture<ArchivedExecutionGraph> executionGraphFuture =
-                        oldEntry.getExecutionGraphFuture();
-                if (!executionGraphFuture.isCompletedExceptionally()) {
-                    return executionGraphFuture;
+                final CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture =
+                        oldEntry.getExecutionGraphInfoFuture();
+                if (!executionGraphInfoFuture.isCompletedExceptionally()) {
+                    return executionGraphInfoFuture;
                 }
                 // otherwise it must be completed exceptionally
             }
@@ -96,22 +95,23 @@ public class DefaultExecutionGraphCache implements ExecutionGraphCache {
             } else {
                 successfulUpdate = cachedExecutionGraphs.replace(jobId, oldEntry, newEntry);
                 // cancel potentially outstanding futures
-                oldEntry.getExecutionGraphFuture().cancel(false);
+                oldEntry.getExecutionGraphInfoFuture().cancel(false);
             }
 
             if (successfulUpdate) {
-                final CompletableFuture<ArchivedExecutionGraph> executionGraphFuture =
-                        restfulGateway.requestJob(jobId, timeout);
+                final CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture =
+                        restfulGateway.requestExecutionGraphInfo(jobId, timeout);
 
-                executionGraphFuture.whenComplete(
-                        (ArchivedExecutionGraph executionGraph, Throwable throwable) -> {
+                executionGraphInfoFuture.whenComplete(
+                        (ExecutionGraphInfo executionGraph, Throwable throwable) -> {
                             if (throwable != null) {
-                                newEntry.getExecutionGraphFuture().completeExceptionally(throwable);
+                                newEntry.getExecutionGraphInfoFuture()
+                                        .completeExceptionally(throwable);
 
                                 // remove exceptionally completed entry because it doesn't help
                                 cachedExecutionGraphs.remove(jobId, newEntry);
                             } else {
-                                newEntry.getExecutionGraphFuture().complete(executionGraph);
+                                newEntry.getExecutionGraphInfoFuture().complete(executionGraph);
                             }
                         });
 
@@ -120,7 +120,7 @@ public class DefaultExecutionGraphCache implements ExecutionGraphCache {
                     cachedExecutionGraphs.remove(jobId, newEntry);
                 }
 
-                return newEntry.getExecutionGraphFuture();
+                return newEntry.getExecutionGraphInfoFuture();
             }
         }
     }
@@ -139,19 +139,19 @@ public class DefaultExecutionGraphCache implements ExecutionGraphCache {
     private static final class ExecutionGraphEntry {
         private final long ttl;
 
-        private final CompletableFuture<ArchivedExecutionGraph> executionGraphFuture;
+        private final CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture;
 
         ExecutionGraphEntry(long ttl) {
             this.ttl = ttl;
-            this.executionGraphFuture = new CompletableFuture<>();
+            this.executionGraphInfoFuture = new CompletableFuture<>();
         }
 
         public long getTTL() {
             return ttl;
         }
 
-        public CompletableFuture<ArchivedExecutionGraph> getExecutionGraphFuture() {
-            return executionGraphFuture;
+        public CompletableFuture<ExecutionGraphInfo> getExecutionGraphInfoFuture() {
+            return executionGraphInfoFuture;
         }
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
index 8309f27..b970399 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
@@ -19,17 +19,16 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 
 import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
 
 /**
- * Cache for {@link ArchivedExecutionGraph} which are obtained from the Flink cluster. Every cache
- * entry has an associated time to live after which a new request will trigger the reloading of the
- * {@link ArchivedExecutionGraph} from the cluster.
+ * Cache for {@link ExecutionGraphInfo} instances obtained from the Flink cluster. Every cache
+ * entry has an associated time to live, after which a new request triggers reloading of the
+ * {@link ExecutionGraphInfo} from the cluster.
  */
 public interface ExecutionGraphCache extends Closeable {
 
@@ -37,15 +36,15 @@ public interface ExecutionGraphCache extends Closeable {
     int size();
 
     /**
-     * Gets the {@link AccessExecutionGraph} for the given {@link JobID} and caches it. The {@link
-     * AccessExecutionGraph} will be requested again after the refresh interval has passed or if the
+     * Gets the {@link ExecutionGraphInfo} for the given {@link JobID} and caches it. The {@link
+     * ExecutionGraphInfo} will be requested again after the refresh interval has passed or if the
      * graph could not be retrieved from the given gateway.
      *
-     * @param jobId identifying the {@link ArchivedExecutionGraph} to get
-     * @param restfulGateway to request the {@link ArchivedExecutionGraph} from
-     * @return Future containing the requested {@link ArchivedExecutionGraph}
+     * @param jobId identifying the {@link ExecutionGraphInfo} to get
+     * @param restfulGateway to request the {@link ExecutionGraphInfo} from
+     * @return Future containing the requested {@link ExecutionGraphInfo}
      */
-    CompletableFuture<AccessExecutionGraph> getExecutionGraph(
+    CompletableFuture<ExecutionGraphInfo> getExecutionGraphInfo(
             JobID jobId, RestfulGateway restfulGateway);
 
     /** Perform the cleanup of out dated cache entries. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
new file mode 100644
index 0000000..4b7b746
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@code ExecutionGraphInfo} serves as a composite class that provides different {@link
+ * ExecutionGraph}-related information.
+ */
+public class ExecutionGraphInfo implements Serializable {
+
+    private static final long serialVersionUID = -6134203195124124202L;
+
+    private final ArchivedExecutionGraph executionGraph;
+    private final List<ErrorInfo> exceptionHistory;
+
+    public ExecutionGraphInfo(ArchivedExecutionGraph executionGraph) {
+        this(executionGraph, Collections.emptyList());
+    }
+
+    public ExecutionGraphInfo(
+            ArchivedExecutionGraph executionGraph, List<ErrorInfo> exceptionHistory) {
+        this.executionGraph = executionGraph;
+        this.exceptionHistory = exceptionHistory;
+    }
+
+    public JobID getJobId() {
+        return executionGraph.getJobID();
+    }
+
+    public ArchivedExecutionGraph getArchivedExecutionGraph() {
+        return executionGraph;
+    }
+
+    public List<ErrorInfo> getExceptionHistory() {
+        return exceptionHistory;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 64070ed..9b1e83d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -763,9 +763,10 @@ public abstract class SchedulerBase implements SchedulerNG {
     }
 
     @Override
-    public ArchivedExecutionGraph requestJob() {
+    public ExecutionGraphInfo requestJob() {
         mainThreadExecutor.assertRunningInMainThread();
-        return ArchivedExecutionGraph.createFrom(executionGraph);
+        return new ExecutionGraphInfo(
+                ArchivedExecutionGraph.createFrom(executionGraph), getExceptionHistory());
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 5db4c5d..c88c8c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -92,7 +91,7 @@ public interface SchedulerNG {
 
     void notifyPartitionDataAvailable(ResultPartitionID partitionID);
 
-    ArchivedExecutionGraph requestJob();
+    ExecutionGraphInfo requestJob();
 
     JobStatus requestJobStatus();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index fdd1a5b..7a332cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -86,6 +86,7 @@ import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.scheduler.SchedulerUtils;
@@ -368,8 +369,9 @@ public class AdaptiveScheduler
     }
 
     @Override
-    public ArchivedExecutionGraph requestJob() {
-        return state.getJob();
+    public ExecutionGraphInfo requestJob() {
+        // no exception history support is added for now (see FLINK-21439)
+        return new ExecutionGraphInfo(state.getJob());
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 6a1d317..62b5f76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.SerializedValue;
 
 import java.util.Collection;
@@ -67,7 +68,24 @@ public interface RestfulGateway extends RpcGateway {
      * @return Future containing the {@link ArchivedExecutionGraph} for the given jobId, otherwise
      *     {@link FlinkJobNotFoundException}
      */
-    CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout);
+    default CompletableFuture<ArchivedExecutionGraph> requestJob(
+            JobID jobId, @RpcTimeout Time timeout) {
+        return requestExecutionGraphInfo(jobId, timeout)
+                .thenApply(ExecutionGraphInfo::getArchivedExecutionGraph);
+    }
+
+    /**
+     * Requests the {@link ExecutionGraphInfo} containing additional information besides the {@link
+     * ArchivedExecutionGraph}. If there is no such graph, then the future is completed with a
+     * {@link FlinkJobNotFoundException}.
+     *
+     * @param jobId identifying the job whose {@link ExecutionGraphInfo} is requested
+     * @param timeout for the asynchronous operation
+     * @return Future containing the {@link ExecutionGraphInfo} for the given jobId, otherwise
+     *     {@link FlinkJobNotFoundException}
+     */
+    CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(
+            JobID jobId, @RpcTimeout Time timeout);
 
     /**
      * Requests the {@link JobResult} of a job specified by the given jobId.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
index a2edd76..e1da5d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.history;
 
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -43,4 +44,23 @@ public interface JsonArchivist {
      * @throws IOException thrown if the JSON generation fails
      */
     Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException;
+
+    /**
+     * Returns a {@link Collection} of {@link ArchivedJson}s containing JSON responses and their
+     * respective REST URL for a given job.
+     *
+     * <p>The collection should contain one entry for every response that could be generated for the
+     * given job, for example one entry for each task. The REST URLs should be unique and must not
+     * contain placeholders.
+     *
+     * @param executionGraphInfo {@link AccessExecutionGraph}-related information for which the
+     *     responses should be generated
+     * @return Collection containing an ArchivedJson for every response that could be generated for
+     *     the given job
+     * @throws IOException thrown if the JSON generation fails
+     */
+    default Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo)
+            throws IOException {
+        return archiveJsonWithPath(executionGraphInfo.getArchivedExecutionGraph());
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
index 9738809..66b8e0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.TestLogger;
 
@@ -90,7 +91,9 @@ public class DispatcherJobTest extends TestLogger {
         // assert result future done
         DispatcherJobResult result = dispatcherJob.getResultFuture().get();
 
-        assertThat(result.getArchivedExecutionGraph().getState(), is(JobStatus.FINISHED));
+        assertThat(
+                result.getExecutionGraphInfo().getArchivedExecutionGraph().getState(),
+                is(JobStatus.FINISHED));
     }
 
     @Test
@@ -116,7 +119,12 @@ public class DispatcherJobTest extends TestLogger {
         assertThat(dispatcherJob.isInitialized(), is(true));
         // assert that the result future completes
         assertThat(
-                dispatcherJob.getResultFuture().get().getArchivedExecutionGraph().getState(),
+                dispatcherJob
+                        .getResultFuture()
+                        .get()
+                        .getExecutionGraphInfo()
+                        .getArchivedExecutionGraph()
+                        .getState(),
                 is(JobStatus.CANCELED));
     }
 
@@ -134,7 +142,12 @@ public class DispatcherJobTest extends TestLogger {
         cancelFuture.get();
         assertJobStatus(dispatcherJob, JobStatus.CANCELED);
         assertThat(
-                dispatcherJob.getResultFuture().get().getArchivedExecutionGraph().getState(),
+                dispatcherJob
+                        .getResultFuture()
+                        .get()
+                        .getExecutionGraphInfo()
+                        .getArchivedExecutionGraph()
+                        .getState(),
                 is(JobStatus.CANCELED));
     }
 
@@ -171,7 +184,11 @@ public class DispatcherJobTest extends TestLogger {
         assertJobStatus(dispatcherJob, JobStatus.FAILED);
 
         ArchivedExecutionGraph aeg =
-                dispatcherJob.getResultFuture().get().getArchivedExecutionGraph();
+                dispatcherJob
+                        .getResultFuture()
+                        .get()
+                        .getExecutionGraphInfo()
+                        .getArchivedExecutionGraph();
         assertThat(
                 aeg.getFailureInfo()
                         .getException()
@@ -188,7 +205,9 @@ public class DispatcherJobTest extends TestLogger {
 
         DispatcherJobResult result = dispatcherJob.getResultFuture().get();
         assertThat(result.isInitializationFailure(), is(true));
-        assertThat(result.getArchivedExecutionGraph().getState(), is(JobStatus.FAILED));
+        assertThat(
+                result.getExecutionGraphInfo().getArchivedExecutionGraph().getState(),
+                is(JobStatus.FAILED));
         assertThat(
                 result.getInitializationFailure().getMessage(),
                 containsString("Artificial failure"));
@@ -298,13 +317,14 @@ public class DispatcherJobTest extends TestLogger {
                             .setRequestJobSupplier(
                                     () ->
                                             CompletableFuture.completedFuture(
-                                                    ArchivedExecutionGraph
-                                                            .createFromInitializingJob(
-                                                                    getJobID(),
-                                                                    "test",
-                                                                    internalJobStatus,
-                                                                    null,
-                                                                    1337)))
+                                                    new ExecutionGraphInfo(
+                                                            ArchivedExecutionGraph
+                                                                    .createFromInitializingJob(
+                                                                            getJobID(),
+                                                                            "test",
+                                                                            internalJobStatus,
+                                                                            null,
+                                                                            1337))))
                             .setRequestJobDetailsSupplier(
                                     () -> {
                                         JobDetails jobDetails =
@@ -359,8 +379,9 @@ public class DispatcherJobTest extends TestLogger {
             internalJobStatus = JobStatus.FINISHED;
             resultFuture.complete(
                     JobManagerRunnerResult.forSuccess(
-                            ArchivedExecutionGraph.createFromInitializingJob(
-                                    getJobID(), "test", JobStatus.FINISHED, null, 1337)));
+                            new ExecutionGraphInfo(
+                                    ArchivedExecutionGraph.createFromInitializingJob(
+                                            getJobID(), "test", JobStatus.FINISHED, null, 1337))));
         }
 
         public void finishCancellation() {
@@ -370,12 +391,14 @@ public class DispatcherJobTest extends TestLogger {
                         runner.getResultFuture()
                                 .complete(
                                         JobManagerRunnerResult.forSuccess(
-                                                ArchivedExecutionGraph.createFromInitializingJob(
-                                                        getJobID(),
-                                                        "test",
-                                                        JobStatus.CANCELED,
-                                                        null,
-                                                        1337)));
+                                                new ExecutionGraphInfo(
+                                                        ArchivedExecutionGraph
+                                                                .createFromInitializingJob(
+                                                                        getJobID(),
+                                                                        "test",
+                                                                        JobStatus.CANCELED,
+                                                                        null,
+                                                                        1337))));
                         cancellationFuture.complete(Acknowledge.get());
                     });
         }
@@ -384,7 +407,9 @@ public class DispatcherJobTest extends TestLogger {
     private void assertJobStatus(DispatcherJob dispatcherJob, JobStatus expectedStatus)
             throws Exception {
         assertThat(dispatcherJob.requestJobDetails(TIMEOUT).get().getStatus(), is(expectedStatus));
-        assertThat(dispatcherJob.requestJob(TIMEOUT).get().getState(), is(expectedStatus));
+        assertThat(
+                dispatcherJob.requestJob(TIMEOUT).get().getArchivedExecutionGraph().getState(),
+                is(expectedStatus));
         assertThat(dispatcherJob.requestJobStatus(TIMEOUT).get(), is(expectedStatus));
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index fb02a7b..caa1574 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraph
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.TestingJobGraphStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
@@ -196,8 +197,8 @@ public class DispatcherResourceCleanupTest extends TestLogger {
     private void startDispatcher(JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
         TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
         final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
-        final MemoryArchivedExecutionGraphStore archivedExecutionGraphStore =
-                new MemoryArchivedExecutionGraphStore();
+        final MemoryExecutionGraphInfoStore archivedExecutionGraphStore =
+                new MemoryExecutionGraphInfoStore();
         dispatcher =
                 new TestingDispatcher(
                         rpcService,
@@ -349,10 +350,11 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
         testingJobManagerRunner.getCloseAsyncCalledLatch().await();
         testingJobManagerRunner.completeResultFuture(
-                new ArchivedExecutionGraphBuilder()
-                        .setJobID(jobId)
-                        .setState(JobStatus.FINISHED)
-                        .build());
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder()
+                                .setJobID(jobId)
+                                .setState(JobStatus.FINISHED)
+                                .build()));
 
         testingJobManagerRunner.completeTerminationFuture();
 
@@ -378,10 +380,11 @@ public class DispatcherResourceCleanupTest extends TestLogger {
         final TestingJobManagerRunner testingJobManagerRunner =
                 jobManagerRunnerFactory.takeCreatedJobManagerRunner();
         testingJobManagerRunner.completeResultFuture(
-                new ArchivedExecutionGraphBuilder()
-                        .setState(JobStatus.FINISHED)
-                        .setJobID(jobId)
-                        .build());
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder()
+                                .setState(JobStatus.FINISHED)
+                                .setJobID(jobId)
+                                .build()));
 
         // wait for the clearing
         clearedJobLatch.await();
@@ -459,10 +462,11 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
     private void finishJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
         takeCreatedJobManagerRunner.completeResultFuture(
-                new ArchivedExecutionGraphBuilder()
-                        .setJobID(jobId)
-                        .setState(JobStatus.FINISHED)
-                        .build());
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder()
+                                .setJobID(jobId)
+                                .setState(JobStatus.FINISHED)
+                                .build()));
     }
 
     private void assertThatHABlobsHaveNotBeenRemoved() {
@@ -578,7 +582,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
         final TestingJobManagerRunner testingJobManagerRunner =
                 jobManagerRunnerFactory.takeCreatedJobManagerRunner();
-        testingJobManagerRunner.completeResultFuture(executionGraph);
+        testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph));
 
         assertThat(cleanupJobFuture.get(), equalTo(jobId));
         assertThat(deleteAllHABlobsFuture.isDone(), is(false));
@@ -597,7 +601,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
         final TestingJobManagerRunner testingJobManagerRunner =
                 jobManagerRunnerFactory.takeCreatedJobManagerRunner();
-        testingJobManagerRunner.completeResultFuture(executionGraph);
+        testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph));
 
         assertThat(cleanupJobFuture.get(), equalTo(jobId));
         assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 11e799e..b4c2cd8 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
@@ -264,8 +265,8 @@ public class DispatcherTest extends TestLogger {
             TestingResourceManagerGateway resourceManagerGateway =
                     new TestingResourceManagerGateway();
 
-            final MemoryArchivedExecutionGraphStore archivedExecutionGraphStore =
-                    new MemoryArchivedExecutionGraphStore();
+            final MemoryExecutionGraphInfoStore executionGraphInfoStore =
+                    new MemoryExecutionGraphInfoStore();
 
             return new TestingDispatcher(
                     rpcService,
@@ -278,7 +279,7 @@ public class DispatcherTest extends TestLogger {
                             () -> CompletableFuture.completedFuture(resourceManagerGateway),
                             blobServer,
                             heartbeatServices,
-                            archivedExecutionGraphStore,
+                            executionGraphInfoStore,
                             testingFatalErrorHandlerResource.getFatalErrorHandler(),
                             VoidHistoryServerArchivist.INSTANCE,
                             null,
@@ -530,21 +531,23 @@ public class DispatcherTest extends TestLogger {
         final JobID failedJobId = new JobID();
 
         final JobStatus expectedState = JobStatus.FAILED;
-        final ArchivedExecutionGraph failedExecutionGraph =
-                new ArchivedExecutionGraphBuilder()
-                        .setJobID(failedJobId)
-                        .setState(expectedState)
-                        .setFailureCause(new ErrorInfo(new RuntimeException("expected"), 1L))
-                        .build();
+        final ExecutionGraphInfo failedExecutionGraphInfo =
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder()
+                                .setJobID(failedJobId)
+                                .setState(expectedState)
+                                .setFailureCause(
+                                        new ErrorInfo(new RuntimeException("expected"), 1L))
+                                .build());
 
-        dispatcher.completeJobExecution(failedExecutionGraph);
+        dispatcher.completeJobExecution(failedExecutionGraphInfo);
 
         assertThat(
                 dispatcherGateway.requestJobStatus(failedJobId, TIMEOUT).get(),
                 equalTo(expectedState));
         assertThat(
-                dispatcherGateway.requestJob(failedJobId, TIMEOUT).get(),
-                equalTo(failedExecutionGraph));
+                dispatcherGateway.requestExecutionGraphInfo(failedJobId, TIMEOUT).get(),
+                equalTo(failedExecutionGraphInfo));
     }
 
     @Test
@@ -970,13 +973,14 @@ public class DispatcherTest extends TestLogger {
                             .setRequestJobSupplier(
                                     () ->
                                             CompletableFuture.completedFuture(
-                                                    ArchivedExecutionGraph
-                                                            .createFromInitializingJob(
-                                                                    jobGraph.getJobID(),
-                                                                    jobGraph.getName(),
-                                                                    JobStatus.RUNNING,
-                                                                    null,
-                                                                    1337)))
+                                                    new ExecutionGraphInfo(
+                                                            ArchivedExecutionGraph
+                                                                    .createFromInitializingJob(
+                                                                            jobGraph.getJobID(),
+                                                                            jobGraph.getName(),
+                                                                            JobStatus.RUNNING,
+                                                                            null,
+                                                                            1337))))
                             .build();
             testingRunner.completeJobMasterGatewayFuture(testingJobMasterGateway);
             return testingRunner;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStoreTest.java
similarity index 50%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStoreTest.java
index 42e0325..cfd76a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStoreTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.ManualTicker;
 import org.apache.flink.util.Preconditions;
@@ -57,8 +58,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
-/** Tests for the {@link FileArchivedExecutionGraphStore}. */
-public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+/** Tests for the {@link FileExecutionGraphInfoStore}. */
+public class FileExecutionGraphInfoStoreTest extends TestLogger {
 
     private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS =
             Arrays.stream(JobStatus.values())
@@ -68,31 +69,32 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
     @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
     /**
-     * Tests that we can put {@link ArchivedExecutionGraph} into the {@link
-     * FileArchivedExecutionGraphStore} and that the graph is persisted.
+     * Tests that we can put {@link ExecutionGraphInfo} into the {@link FileExecutionGraphInfoStore}
+     * and that the graph is persisted.
      */
     @Test
     public void testPut() throws IOException {
-        final ArchivedExecutionGraph dummyExecutionGraph =
-                new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
+        final ExecutionGraphInfo dummyExecutionGraphInfo =
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build());
         final File rootDir = temporaryFolder.newFolder();
 
-        try (final FileArchivedExecutionGraphStore executionGraphStore =
-                createDefaultExecutionGraphStore(rootDir)) {
+        try (final FileExecutionGraphInfoStore executionGraphStore =
+                createDefaultExecutionGraphInfoStore(rootDir)) {
 
             final File storageDirectory = executionGraphStore.getStorageDir();
 
             // check that the storage directory is empty
             assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
 
-            executionGraphStore.put(dummyExecutionGraph);
+            executionGraphStore.put(dummyExecutionGraphInfo);
 
             // check that we have persisted the given execution graph
             assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));
 
             assertThat(
-                    executionGraphStore.get(dummyExecutionGraph.getJobID()),
-                    new PartialArchivedExecutionGraphMatcher(dummyExecutionGraph));
+                    executionGraphStore.get(dummyExecutionGraphInfo.getJobId()),
+                    new PartialExecutionGraphInfoMatcher(dummyExecutionGraphInfo));
         }
     }
 
@@ -101,8 +103,8 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
     public void testUnknownGet() throws IOException {
         final File rootDir = temporaryFolder.newFolder();
 
-        try (final FileArchivedExecutionGraphStore executionGraphStore =
-                createDefaultExecutionGraphStore(rootDir)) {
+        try (final FileExecutionGraphInfoStore executionGraphStore =
+                createDefaultExecutionGraphInfoStore(rootDir)) {
             assertThat(executionGraphStore.get(new JobID()), Matchers.nullValue());
         }
     }
@@ -111,11 +113,12 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
     @Test
     public void testStoredJobsOverview() throws IOException {
         final int numberExecutionGraphs = 10;
-        final Collection<ArchivedExecutionGraph> executionGraphs =
-                generateTerminalExecutionGraphs(numberExecutionGraphs);
+        final Collection<ExecutionGraphInfo> executionGraphInfos =
+                generateTerminalExecutionGraphInfos(numberExecutionGraphs);
 
         final List<JobStatus> jobStatuses =
-                executionGraphs.stream()
+                executionGraphInfos.stream()
+                        .map(ExecutionGraphInfo::getArchivedExecutionGraph)
                         .map(ArchivedExecutionGraph::getState)
                         .collect(Collectors.toList());
 
@@ -123,14 +126,14 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
 
         final File rootDir = temporaryFolder.newFolder();
 
-        try (final FileArchivedExecutionGraphStore executionGraphStore =
-                createDefaultExecutionGraphStore(rootDir)) {
-            for (ArchivedExecutionGraph executionGraph : executionGraphs) {
-                executionGraphStore.put(executionGraph);
+        try (final FileExecutionGraphInfoStore executionGraphInfoStore =
+                createDefaultExecutionGraphInfoStore(rootDir)) {
+            for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) {
+                executionGraphInfoStore.put(executionGraphInfo);
             }
 
             assertThat(
-                    executionGraphStore.getStoredJobsOverview(),
+                    executionGraphInfoStore.getStoredJobsOverview(),
                     Matchers.equalTo(expectedJobsOverview));
         }
     }
@@ -139,21 +142,21 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
     @Test
     public void testAvailableJobDetails() throws IOException {
         final int numberExecutionGraphs = 10;
-        final Collection<ArchivedExecutionGraph> executionGraphs =
-                generateTerminalExecutionGraphs(numberExecutionGraphs);
+        final Collection<ExecutionGraphInfo> executionGraphInfos =
+                generateTerminalExecutionGraphInfos(numberExecutionGraphs);
 
-        final Collection<JobDetails> jobDetails = generateJobDetails(executionGraphs);
+        final Collection<JobDetails> jobDetails = generateJobDetails(executionGraphInfos);
 
         final File rootDir = temporaryFolder.newFolder();
 
-        try (final FileArchivedExecutionGraphStore executionGraphStore =
-                createDefaultExecutionGraphStore(rootDir)) {
-            for (ArchivedExecutionGraph executionGraph : executionGraphs) {
-                executionGraphStore.put(executionGraph);
+        try (final FileExecutionGraphInfoStore executionGraphInfoStore =
+                createDefaultExecutionGraphInfoStore(rootDir)) {
+            for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) {
+                executionGraphInfoStore.put(executionGraphInfo);
             }
 
             assertThat(
-                    executionGraphStore.getAvailableJobDetails(),
+                    executionGraphInfoStore.getAvailableJobDetails(),
                     Matchers.containsInAnyOrder(jobDetails.toArray()));
         }
     }
@@ -170,8 +173,8 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
 
         final ManualTicker manualTicker = new ManualTicker();
 
-        try (final FileArchivedExecutionGraphStore executionGraphStore =
-                new FileArchivedExecutionGraphStore(
+        try (final FileExecutionGraphInfoStore executionGraphInfoStore =
+                new FileExecutionGraphInfoStore(
                         rootDir,
                         expirationTime,
                         Integer.MAX_VALUE,
@@ -179,24 +182,29 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
                         scheduledExecutor,
                         manualTicker)) {
 
-            final ArchivedExecutionGraph executionGraph =
-                    new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
+            final ExecutionGraphInfo executionGraphInfo =
+                    new ExecutionGraphInfo(
+                            new ArchivedExecutionGraphBuilder()
+                                    .setState(JobStatus.FINISHED)
+                                    .build());
 
-            executionGraphStore.put(executionGraph);
+            executionGraphInfoStore.put(executionGraphInfo);
 
             // there should one execution graph
-            assertThat(executionGraphStore.size(), Matchers.equalTo(1));
+            assertThat(executionGraphInfoStore.size(), Matchers.equalTo(1));
 
             manualTicker.advanceTime(expirationTime.toMilliseconds(), TimeUnit.MILLISECONDS);
 
             // this should trigger the cleanup after expiration
             scheduledExecutor.triggerScheduledTasks();
 
-            assertThat(executionGraphStore.size(), Matchers.equalTo(0));
+            assertThat(executionGraphInfoStore.size(), Matchers.equalTo(0));
 
-            assertThat(executionGraphStore.get(executionGraph.getJobID()), Matchers.nullValue());
+            assertThat(
+                    executionGraphInfoStore.get(executionGraphInfo.getJobId()),
+                    Matchers.nullValue());
 
-            final File storageDirectory = executionGraphStore.getStorageDir();
+            final File storageDirectory = executionGraphInfoStore.getStorageDir();
 
             // check that the persisted file has been deleted
             assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
@@ -210,17 +218,20 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
 
         assertThat(rootDir.listFiles().length, Matchers.equalTo(0));
 
-        try (final FileArchivedExecutionGraphStore executionGraphStore =
-                createDefaultExecutionGraphStore(rootDir)) {
+        try (final FileExecutionGraphInfoStore executionGraphInfoStore =
+                createDefaultExecutionGraphInfoStore(rootDir)) {
 
             assertThat(rootDir.listFiles().length, Matchers.equalTo(1));
 
-            final File storageDirectory = executionGraphStore.getStorageDir();
+            final File storageDirectory = executionGraphInfoStore.getStorageDir();
 
             assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
 
-            executionGraphStore.put(
-                    new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build());
+            executionGraphInfoStore.put(
+                    new ExecutionGraphInfo(
+                            new ArchivedExecutionGraphBuilder()
+                                    .setState(JobStatus.FINISHED)
+                                    .build()));
 
             assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));
         }
@@ -228,13 +239,13 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
         assertThat(rootDir.listFiles().length, Matchers.equalTo(0));
     }
 
-    /** Tests that evicted {@link ArchivedExecutionGraph} are loaded from disk again. */
+    /** Tests that evicted {@link ExecutionGraphInfo} are loaded from disk again. */
     @Test
     public void testCacheLoading() throws IOException {
         final File rootDir = temporaryFolder.newFolder();
 
-        try (final FileArchivedExecutionGraphStore executionGraphStore =
-                new FileArchivedExecutionGraphStore(
+        try (final FileExecutionGraphInfoStore executionGraphInfoStore =
+                new FileExecutionGraphInfoStore(
                         rootDir,
                         Time.hours(1L),
                         Integer.MAX_VALUE,
@@ -242,43 +253,47 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
                         TestingUtils.defaultScheduledExecutor(),
                         Ticker.systemTicker())) {
 
-            final LoadingCache<JobID, ArchivedExecutionGraph> executionGraphCache =
-                    executionGraphStore.getArchivedExecutionGraphCache();
+            final LoadingCache<JobID, ExecutionGraphInfo> executionGraphInfoCache =
+                    executionGraphInfoStore.getExecutionGraphInfoCache();
 
-            Collection<ArchivedExecutionGraph> executionGraphs = new ArrayList<>(64);
+            Collection<ExecutionGraphInfo> executionGraphInfos = new ArrayList<>(64);
 
             boolean continueInserting = true;
 
             // insert execution graphs until the first one got evicted
             while (continueInserting) {
                 // has roughly a size of 1.4 KB
-                final ArchivedExecutionGraph executionGraph =
-                        new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
+                final ExecutionGraphInfo executionGraphInfo =
+                        new ExecutionGraphInfo(
+                                new ArchivedExecutionGraphBuilder()
+                                        .setState(JobStatus.FINISHED)
+                                        .build());
 
-                executionGraphStore.put(executionGraph);
+                executionGraphInfoStore.put(executionGraphInfo);
 
-                executionGraphs.add(executionGraph);
+                executionGraphInfos.add(executionGraphInfo);
 
-                continueInserting = executionGraphCache.size() == executionGraphs.size();
+                continueInserting = executionGraphInfoCache.size() == executionGraphInfos.size();
             }
 
-            final File storageDirectory = executionGraphStore.getStorageDir();
+            final File storageDirectory = executionGraphInfoStore.getStorageDir();
 
             assertThat(
-                    storageDirectory.listFiles().length, Matchers.equalTo(executionGraphs.size()));
+                    storageDirectory.listFiles().length,
+                    Matchers.equalTo(executionGraphInfos.size()));
 
-            for (ArchivedExecutionGraph executionGraph : executionGraphs) {
+            for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) {
                 assertThat(
-                        executionGraphStore.get(executionGraph.getJobID()),
-                        matchesPartiallyWith(executionGraph));
+                        executionGraphInfoStore.get(executionGraphInfo.getJobId()),
+                        matchesPartiallyWith(executionGraphInfo));
             }
         }
     }
 
     /**
-     * Tests that the size of {@link FileArchivedExecutionGraphStore} is no more than the configured
-     * max capacity and the old execution graphs will be purged if the total added number exceeds
-     * the max capacity.
+     * Tests that the size of {@link FileExecutionGraphInfoStore} is no more than the configured max
+     * capacity and the old execution graphs will be purged if the total added number exceeds the
+     * max capacity.
      */
     @Test
     public void testMaximumCapacity() throws IOException {
@@ -287,15 +302,15 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
         final int maxCapacity = 10;
         final int numberExecutionGraphs = 10;
 
-        final Collection<ArchivedExecutionGraph> oldExecutionGraphs =
-                generateTerminalExecutionGraphs(numberExecutionGraphs);
-        final Collection<ArchivedExecutionGraph> newExecutionGraphs =
-                generateTerminalExecutionGraphs(numberExecutionGraphs);
+        final Collection<ExecutionGraphInfo> oldExecutionGraphInfos =
+                generateTerminalExecutionGraphInfos(numberExecutionGraphs);
+        final Collection<ExecutionGraphInfo> newExecutionGraphInfos =
+                generateTerminalExecutionGraphInfos(numberExecutionGraphs);
 
-        final Collection<JobDetails> jobDetails = generateJobDetails(newExecutionGraphs);
+        final Collection<JobDetails> jobDetails = generateJobDetails(newExecutionGraphInfos);
 
-        try (final FileArchivedExecutionGraphStore executionGraphStore =
-                new FileArchivedExecutionGraphStore(
+        try (final FileExecutionGraphInfoStore executionGraphInfoStore =
+                new FileExecutionGraphInfoStore(
                         rootDir,
                         Time.hours(1L),
                         maxCapacity,
@@ -303,42 +318,44 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
                         TestingUtils.defaultScheduledExecutor(),
                         Ticker.systemTicker())) {
 
-            for (ArchivedExecutionGraph executionGraph : oldExecutionGraphs) {
-                executionGraphStore.put(executionGraph);
+            for (ExecutionGraphInfo executionGraphInfo : oldExecutionGraphInfos) {
+                executionGraphInfoStore.put(executionGraphInfo);
                 // no more than the configured maximum capacity
-                assertTrue(executionGraphStore.size() <= maxCapacity);
+                assertTrue(executionGraphInfoStore.size() <= maxCapacity);
             }
 
-            for (ArchivedExecutionGraph executionGraph : newExecutionGraphs) {
-                executionGraphStore.put(executionGraph);
+            for (ExecutionGraphInfo executionGraphInfo : newExecutionGraphInfos) {
+                executionGraphInfoStore.put(executionGraphInfo);
                 // equals to the configured maximum capacity
-                assertEquals(maxCapacity, executionGraphStore.size());
+                assertEquals(maxCapacity, executionGraphInfoStore.size());
             }
 
             // the older execution graphs are purged
             assertThat(
-                    executionGraphStore.getAvailableJobDetails(),
+                    executionGraphInfoStore.getAvailableJobDetails(),
                     Matchers.containsInAnyOrder(jobDetails.toArray()));
         }
     }
 
-    private Collection<ArchivedExecutionGraph> generateTerminalExecutionGraphs(int number) {
-        final Collection<ArchivedExecutionGraph> executionGraphs = new ArrayList<>(number);
+    private Collection<ExecutionGraphInfo> generateTerminalExecutionGraphInfos(int number) {
+        final Collection<ExecutionGraphInfo> executionGraphInfos = new ArrayList<>(number);
 
         for (int i = 0; i < number; i++) {
             final JobStatus state =
                     GLOBALLY_TERMINAL_JOB_STATUS.get(
                             ThreadLocalRandom.current()
                                     .nextInt(GLOBALLY_TERMINAL_JOB_STATUS.size()));
-            executionGraphs.add(new ArchivedExecutionGraphBuilder().setState(state).build());
+            executionGraphInfos.add(
+                    new ExecutionGraphInfo(
+                            new ArchivedExecutionGraphBuilder().setState(state).build()));
         }
 
-        return executionGraphs;
+        return executionGraphInfos;
     }
 
-    private FileArchivedExecutionGraphStore createDefaultExecutionGraphStore(File storageDirectory)
+    private FileExecutionGraphInfoStore createDefaultExecutionGraphInfoStore(File storageDirectory)
             throws IOException {
-        return new FileArchivedExecutionGraphStore(
+        return new FileExecutionGraphInfoStore(
                 storageDirectory,
                 Time.hours(1L),
                 Integer.MAX_VALUE,
@@ -347,56 +364,65 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
                 Ticker.systemTicker());
     }
 
-    private static final class PartialArchivedExecutionGraphMatcher
-            extends BaseMatcher<ArchivedExecutionGraph> {
+    private static final class PartialExecutionGraphInfoMatcher
+            extends BaseMatcher<ExecutionGraphInfo> {
 
-        private final ArchivedExecutionGraph archivedExecutionGraph;
+        private final ExecutionGraphInfo expectedExecutionGraphInfo;
 
-        private PartialArchivedExecutionGraphMatcher(
-                ArchivedExecutionGraph expectedArchivedExecutionGraph) {
-            this.archivedExecutionGraph =
-                    Preconditions.checkNotNull(expectedArchivedExecutionGraph);
+        private PartialExecutionGraphInfoMatcher(ExecutionGraphInfo expectedExecutionGraphInfo) {
+            this.expectedExecutionGraphInfo =
+                    Preconditions.checkNotNull(expectedExecutionGraphInfo);
         }
 
         @Override
         public boolean matches(Object o) {
-            if (archivedExecutionGraph == o) {
+            if (expectedExecutionGraphInfo == o) {
                 return true;
             }
-            if (o == null || archivedExecutionGraph.getClass() != o.getClass()) {
+            if (o == null || expectedExecutionGraphInfo.getClass() != o.getClass()) {
                 return false;
             }
-            ArchivedExecutionGraph that = (ArchivedExecutionGraph) o;
-            return archivedExecutionGraph.isStoppable() == that.isStoppable()
-                    && Objects.equals(archivedExecutionGraph.getJobID(), that.getJobID())
-                    && Objects.equals(archivedExecutionGraph.getJobName(), that.getJobName())
-                    && archivedExecutionGraph.getState() == that.getState()
-                    && Objects.equals(archivedExecutionGraph.getJsonPlan(), that.getJsonPlan())
+            ExecutionGraphInfo that = (ExecutionGraphInfo) o;
+
+            ArchivedExecutionGraph thisExecutionGraph =
+                    expectedExecutionGraphInfo.getArchivedExecutionGraph();
+            ArchivedExecutionGraph thatExecutionGraph = that.getArchivedExecutionGraph();
+            return thisExecutionGraph.isStoppable() == thatExecutionGraph.isStoppable()
+                    && Objects.equals(thisExecutionGraph.getJobID(), thatExecutionGraph.getJobID())
+                    && Objects.equals(
+                            thisExecutionGraph.getJobName(), thatExecutionGraph.getJobName())
+                    && thisExecutionGraph.getState() == thatExecutionGraph.getState()
+                    && Objects.equals(
+                            thisExecutionGraph.getJsonPlan(), thatExecutionGraph.getJsonPlan())
+                    && Objects.equals(
+                            thisExecutionGraph.getAccumulatorsSerialized(),
+                            thatExecutionGraph.getAccumulatorsSerialized())
                     && Objects.equals(
-                            archivedExecutionGraph.getAccumulatorsSerialized(),
-                            that.getAccumulatorsSerialized())
+                            thisExecutionGraph.getCheckpointCoordinatorConfiguration(),
+                            thatExecutionGraph.getCheckpointCoordinatorConfiguration())
+                    && thisExecutionGraph.getAllVertices().size()
+                            == thatExecutionGraph.getAllVertices().size()
                     && Objects.equals(
-                            archivedExecutionGraph.getCheckpointCoordinatorConfiguration(),
-                            that.getCheckpointCoordinatorConfiguration())
-                    && archivedExecutionGraph.getAllVertices().size()
-                            == that.getAllVertices().size();
+                            expectedExecutionGraphInfo.getExceptionHistory(),
+                            that.getExceptionHistory());
         }
 
         @Override
         public void describeTo(Description description) {
             description.appendText(
-                    "Matches against " + ArchivedExecutionGraph.class.getSimpleName() + '.');
+                    "Matches against " + ExecutionGraphInfo.class.getSimpleName() + '.');
         }
     }
 
-    private static Matcher<ArchivedExecutionGraph> matchesPartiallyWith(
-            ArchivedExecutionGraph executionGraph) {
-        return new PartialArchivedExecutionGraphMatcher(executionGraph);
+    private static Matcher<ExecutionGraphInfo> matchesPartiallyWith(
+            ExecutionGraphInfo executionGraphInfo) {
+        return new PartialExecutionGraphInfoMatcher(executionGraphInfo);
     }
 
     private static Collection<JobDetails> generateJobDetails(
-            Collection<ArchivedExecutionGraph> executionGraphs) {
-        return executionGraphs.stream()
+            Collection<ExecutionGraphInfo> executionGraphInfos) {
+        return executionGraphInfos.stream()
+                .map(ExecutionGraphInfo::getArchivedExecutionGraph)
                 .map(JobDetails::createDetailsForJob)
                 .collect(Collectors.toList());
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index e496863..8250d8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
@@ -38,6 +37,7 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
 import org.apache.flink.util.TestLogger;
 
@@ -74,7 +74,7 @@ public class MiniDispatcherTest extends TestLogger {
 
     private static JobGraph jobGraph;
 
-    private static ArchivedExecutionGraph archivedExecutionGraph;
+    private static ExecutionGraphInfo executionGraphInfo;
 
     private static TestingRpcService rpcService;
 
@@ -87,8 +87,8 @@ public class MiniDispatcherTest extends TestLogger {
 
     private final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
 
-    private final ArchivedExecutionGraphStore archivedExecutionGraphStore =
-            new MemoryArchivedExecutionGraphStore();
+    private final ExecutionGraphInfoStore executionGraphInfoStore =
+            new MemoryExecutionGraphInfoStore();
 
     private TestingHighAvailabilityServices highAvailabilityServices;
 
@@ -98,11 +98,12 @@ public class MiniDispatcherTest extends TestLogger {
     public static void setupClass() throws IOException {
         jobGraph = new JobGraph();
 
-        archivedExecutionGraph =
-                new ArchivedExecutionGraphBuilder()
-                        .setJobID(jobGraph.getJobID())
-                        .setState(JobStatus.FINISHED)
-                        .build();
+        executionGraphInfo =
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder()
+                                .setJobID(jobGraph.getJobID())
+                                .setState(JobStatus.FINISHED)
+                                .build());
 
         rpcService = new TestingRpcService();
         configuration = new Configuration();
@@ -166,7 +167,7 @@ public class MiniDispatcherTest extends TestLogger {
             final TestingJobManagerRunner testingJobManagerRunner =
                     testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
 
-            testingJobManagerRunner.completeResultFuture(archivedExecutionGraph);
+            testingJobManagerRunner.completeResultFuture(executionGraphInfo);
 
             // wait until we terminate
             miniDispatcher.getShutDownFuture().get();
@@ -192,7 +193,7 @@ public class MiniDispatcherTest extends TestLogger {
             final TestingJobManagerRunner testingJobManagerRunner =
                     testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
 
-            testingJobManagerRunner.completeResultFuture(archivedExecutionGraph);
+            testingJobManagerRunner.completeResultFuture(executionGraphInfo);
 
             assertFalse(miniDispatcher.getTerminationFuture().isDone());
 
@@ -228,10 +229,11 @@ public class MiniDispatcherTest extends TestLogger {
 
             dispatcherGateway.cancelJob(jobGraph.getJobID(), Time.seconds(10L));
             testingJobManagerRunner.completeResultFuture(
-                    new ArchivedExecutionGraphBuilder()
-                            .setJobID(jobGraph.getJobID())
-                            .setState(JobStatus.CANCELED)
-                            .build());
+                    new ExecutionGraphInfo(
+                            new ArchivedExecutionGraphBuilder()
+                                    .setJobID(jobGraph.getJobID())
+                                    .setState(JobStatus.CANCELED)
+                                    .build()));
 
             ApplicationStatus applicationStatus = miniDispatcher.getShutDownFuture().get();
             assertThat(applicationStatus, is(ApplicationStatus.CANCELED));
@@ -256,7 +258,7 @@ public class MiniDispatcherTest extends TestLogger {
                         () -> CompletableFuture.completedFuture(resourceManagerGateway),
                         blobServer,
                         heartbeatServices,
-                        archivedExecutionGraphStore,
+                        executionGraphInfoStore,
                         testingFatalErrorHandlerResource.getFatalErrorHandler(),
                         VoidHistoryServerArchivist.INSTANCE,
                         null,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index 0e53e0f..a774709 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 
 import javax.annotation.Nonnull;
 
@@ -64,8 +64,8 @@ class TestingDispatcher extends Dispatcher {
         startFuture.complete(null);
     }
 
-    void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) {
-        runAsync(() -> jobReachedGloballyTerminalState(archivedExecutionGraph));
+    void completeJobExecution(ExecutionGraphInfo executionGraphInfo) {
+        runAsync(() -> jobReachedGloballyTerminalState(executionGraphInfo));
     }
 
     CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
index cbd2280..bad26fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.dispatcher.DispatcherServices;
 import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
 import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
 import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobGraphStore;
 import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
@@ -116,7 +116,7 @@ public class DefaultDispatcherRunnerITCase extends TestLogger {
                         blobServerResource.getBlobServer(),
                         new TestingHeartbeatServices(),
                         UnregisteredMetricGroups::createUnregisteredJobManagerMetricGroup,
-                        new MemoryArchivedExecutionGraphStore(),
+                        new MemoryExecutionGraphInfoStore(),
                         fatalErrorHandler,
                         VoidHistoryServerArchivist.INSTANCE,
                         null,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
index d38c365..e84e746 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
 import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
 import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
 import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
@@ -161,7 +161,7 @@ public class ZooKeeperDefaultDispatcherRunnerTest extends TestLogger {
                             blobServer,
                             new TestingHeartbeatServices(),
                             UnregisteredMetricGroups::createUnregisteredJobManagerMetricGroup,
-                            new MemoryArchivedExecutionGraphStore(),
+                            new MemoryExecutionGraphInfoStore(),
                             fatalErrorHandler,
                             VoidHistoryServerArchivist.INSTANCE,
                             null,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
index 9328271..12b5b97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
 
 import org.junit.Test;
@@ -57,7 +57,7 @@ public class ClusterEntrypointTest {
         }
 
         @Override
-        protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
+        protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
                 Configuration configuration, ScheduledExecutor scheduledExecutor)
                 throws IOException {
             throw new UnsupportedOperationException("Not needed for this test");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java
index 275d799..0b2c9cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -33,6 +32,7 @@ import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFacto
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -67,7 +67,7 @@ public class JobManagerRunnerImplTest extends TestLogger {
 
     private static JobGraph jobGraph;
 
-    private static ArchivedExecutionGraph archivedExecutionGraph;
+    private static ExecutionGraphInfo executionGraphInfo;
 
     private static JobMasterServiceFactory defaultJobMasterServiceFactory;
 
@@ -85,11 +85,12 @@ public class JobManagerRunnerImplTest extends TestLogger {
         jobVertex.setInvokableClass(NoOpInvokable.class);
         jobGraph = new JobGraph(jobVertex);
 
-        archivedExecutionGraph =
-                new ArchivedExecutionGraphBuilder()
-                        .setJobID(jobGraph.getJobID())
-                        .setState(JobStatus.FINISHED)
-                        .build();
+        executionGraphInfo =
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder()
+                                .setJobID(jobGraph.getJobID())
+                                .setState(JobStatus.FINISHED)
+                                .build());
     }
 
     @Before
@@ -120,12 +121,12 @@ public class JobManagerRunnerImplTest extends TestLogger {
 
             assertThat(resultFuture.isDone(), is(false));
 
-            jobManagerRunner.jobReachedGloballyTerminalState(archivedExecutionGraph);
+            jobManagerRunner.jobReachedGloballyTerminalState(executionGraphInfo);
 
             final JobManagerRunnerResult jobManagerRunnerResult = resultFuture.get();
             assertThat(
                     jobManagerRunnerResult,
-                    is(JobManagerRunnerResult.forSuccess(archivedExecutionGraph)));
+                    is(JobManagerRunnerResult.forSuccess(executionGraphInfo)));
         } finally {
             jobManagerRunner.close();
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResultTest.java
index ef170cb..d90d753 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResultTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
@@ -33,14 +33,14 @@ import static org.junit.Assert.assertTrue;
 /** Tests for the {@link JobManagerRunnerResult}. */
 public class JobManagerRunnerResultTest extends TestLogger {
 
-    private final ArchivedExecutionGraph archivedExecutionGraph =
-            new ArchivedExecutionGraphBuilder().build();
+    private final ExecutionGraphInfo executionGraphInfo =
+            new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().build());
     private final FlinkException testException = new FlinkException("test exception");
 
     @Test
     public void testSuccessfulJobManagerResult() {
         final JobManagerRunnerResult jobManagerRunnerResult =
-                JobManagerRunnerResult.forSuccess(archivedExecutionGraph);
+                JobManagerRunnerResult.forSuccess(executionGraphInfo);
 
         assertTrue(jobManagerRunnerResult.isSuccess());
         assertFalse(jobManagerRunnerResult.isJobNotFinished());
@@ -70,9 +70,9 @@ public class JobManagerRunnerResultTest extends TestLogger {
     @Test
     public void testGetArchivedExecutionGraphFromSuccessfulJobManagerResult() {
         final JobManagerRunnerResult jobManagerRunnerResult =
-                JobManagerRunnerResult.forSuccess(archivedExecutionGraph);
+                JobManagerRunnerResult.forSuccess(executionGraphInfo);
 
-        assertThat(jobManagerRunnerResult.getArchivedExecutionGraph(), is(archivedExecutionGraph));
+        assertThat(jobManagerRunnerResult.getExecutionGraphInfo(), is(executionGraphInfo));
     }
 
     @Test(expected = IllegalStateException.class)
@@ -80,7 +80,7 @@ public class JobManagerRunnerResultTest extends TestLogger {
         final JobManagerRunnerResult jobManagerRunnerResult =
                 JobManagerRunnerResult.forJobNotFinished();
 
-        jobManagerRunnerResult.getArchivedExecutionGraph();
+        jobManagerRunnerResult.getExecutionGraphInfo();
     }
 
     @Test(expected = IllegalStateException.class)
@@ -88,7 +88,7 @@ public class JobManagerRunnerResultTest extends TestLogger {
         final JobManagerRunnerResult jobManagerRunnerResult =
                 JobManagerRunnerResult.forInitializationFailure(testException);
 
-        jobManagerRunnerResult.getArchivedExecutionGraph();
+        jobManagerRunnerResult.getExecutionGraphInfo();
     }
 
     @Test
@@ -110,7 +110,7 @@ public class JobManagerRunnerResultTest extends TestLogger {
     @Test(expected = IllegalStateException.class)
     public void testGetInitializationFailureFromSuccessfulJobManagerResult() {
         final JobManagerRunnerResult jobManagerRunnerResult =
-                JobManagerRunnerResult.forSuccess(archivedExecutionGraph);
+                JobManagerRunnerResult.forSuccess(executionGraphInfo);
 
         jobManagerRunnerResult.getInitializationFailure();
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java
index 6346172..8b57cfd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java
@@ -133,7 +133,11 @@ public class JobMasterExecutionDeploymentReconciliationTest extends TestLogger {
         assertThat(deploymentTrackerWrapper.getStopFuture().get(), is(deployedExecution));
 
         assertThat(
-                onCompletionActions.getJobReachedGloballyTerminalStateFuture().get().getState(),
+                onCompletionActions
+                        .getJobReachedGloballyTerminalStateFuture()
+                        .get()
+                        .getArchivedExecutionGraph()
+                        .getState(),
                 is(JobStatus.FAILED));
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index bf78cea..ca04421 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -105,6 +105,7 @@ import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
 import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.scheduler.TestingSchedulerNG;
 import org.apache.flink.runtime.scheduler.TestingSchedulerNGFactory;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
@@ -1199,7 +1200,7 @@ public class JobMasterTest extends TestLogger {
     private static Collection<AccessExecution> getExecutions(
             final JobMasterGateway jobMasterGateway) {
         final ArchivedExecutionGraph archivedExecutionGraph =
-                requestExecutionGraph(jobMasterGateway);
+                requestExecutionGraph(jobMasterGateway).getArchivedExecutionGraph();
 
         return archivedExecutionGraph.getAllVertices().values().stream()
                 .flatMap(vertex -> Arrays.stream(vertex.getTaskVertices()))
@@ -1210,7 +1211,7 @@ public class JobMasterTest extends TestLogger {
     private static List<AccessExecution> getExecutions(
             final JobMasterGateway jobMasterGateway, final JobVertexID jobVertexId) {
         final ArchivedExecutionGraph archivedExecutionGraph =
-                requestExecutionGraph(jobMasterGateway);
+                requestExecutionGraph(jobMasterGateway).getArchivedExecutionGraph();
 
         return Optional.ofNullable(archivedExecutionGraph.getAllVertices().get(jobVertexId))
                 .map(
@@ -1221,7 +1222,7 @@ public class JobMasterTest extends TestLogger {
                 .collect(Collectors.toList());
     }
 
-    private static ArchivedExecutionGraph requestExecutionGraph(
+    private static ExecutionGraphInfo requestExecutionGraph(
             final JobMasterGateway jobMasterGateway) {
         try {
             return jobMasterGateway.requestJob(testingTimeout).get();
@@ -1821,7 +1822,10 @@ public class JobMasterTest extends TestLogger {
             jobReachedRunningState.accept(taskManagerUnresolvedLocation, jobMasterGateway);
 
             final ArchivedExecutionGraph archivedExecutionGraph =
-                    onCompletionActions.getJobReachedGloballyTerminalStateFuture().get();
+                    onCompletionActions
+                            .getJobReachedGloballyTerminalStateFuture()
+                            .get()
+                            .getArchivedExecutionGraph();
 
             assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED));
         } finally {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
index 4412d72..1d80e17 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.CompletableFuture;
@@ -88,8 +88,8 @@ public class TestingJobManagerRunner implements JobManagerRunner {
         return closeAsyncCalledLatch;
     }
 
-    public void completeResultFuture(ArchivedExecutionGraph archivedExecutionGraph) {
-        resultFuture.complete(JobManagerRunnerResult.forSuccess(archivedExecutionGraph));
+    public void completeResultFuture(ExecutionGraphInfo executionGraphInfo) {
+        resultFuture.complete(JobManagerRunnerResult.forSuccess(executionGraphInfo));
     }
 
     public void completeResultFutureExceptionally(Exception e) {
@@ -108,6 +108,7 @@ public class TestingJobManagerRunner implements JobManagerRunner {
         this.jobMasterGatewayFuture.complete(testingJobMasterGateway);
     }
 
+    /** {@code Builder} for instantiating {@link TestingJobManagerRunner} instances. */
     public static class Builder {
         private JobID jobId = null;
         private boolean blockingTermination = false;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
index 77de770..d8f2678 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster.utils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -43,6 +42,7 @@ import org.apache.flink.runtime.jobmaster.factories.UnregisteredJobManagerJobMet
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
@@ -207,15 +207,15 @@ public class JobMasterBuilder {
      */
     public static final class TestingOnCompletionActions implements OnCompletionActions {
 
-        private final CompletableFuture<ArchivedExecutionGraph>
-                jobReachedGloballyTerminalStateFuture = new CompletableFuture<>();
+        private final CompletableFuture<ExecutionGraphInfo> jobReachedGloballyTerminalStateFuture =
+                new CompletableFuture<>();
         private final CompletableFuture<Void> jobFinishedByOtherFuture = new CompletableFuture<>();
         private final CompletableFuture<Throwable> jobMasterFailedFuture =
                 new CompletableFuture<>();
 
         @Override
-        public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
-            jobReachedGloballyTerminalStateFuture.complete(executionGraph);
+        public void jobReachedGloballyTerminalState(ExecutionGraphInfo executionGraphInfo) {
+            jobReachedGloballyTerminalStateFuture.complete(executionGraphInfo);
         }
 
         @Override
@@ -228,12 +228,11 @@ public class JobMasterBuilder {
             jobMasterFailedFuture.complete(cause);
         }
 
-        public CompletableFuture<ArchivedExecutionGraph>
-                getJobReachedGloballyTerminalStateFuture() {
+        public CompletableFuture<ExecutionGraphInfo> getJobReachedGloballyTerminalStateFuture() {
             return jobReachedGloballyTerminalStateFuture;
         }
 
-        public CompletableFuture<ArchivedExecutionGraph> getJobFinishedByOtherFuture() {
+        public CompletableFuture<ExecutionGraphInfo> getJobFinishedByOtherFuture() {
             return jobReachedGloballyTerminalStateFuture;
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index a6b4155..0464d51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -48,6 +47,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
@@ -123,7 +123,7 @@ public class TestingJobMasterGateway implements JobMasterGateway {
 
     @Nonnull private final Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier;
 
-    @Nonnull private final Supplier<CompletableFuture<ArchivedExecutionGraph>> requestJobSupplier;
+    @Nonnull private final Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier;
 
     @Nonnull
     private final BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction;
@@ -221,7 +221,7 @@ public class TestingJobMasterGateway implements JobMasterGateway {
                             taskManagerHeartbeatConsumer,
             @Nonnull Consumer<ResourceID> resourceManagerHeartbeatConsumer,
             @Nonnull Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier,
-            @Nonnull Supplier<CompletableFuture<ArchivedExecutionGraph>> requestJobSupplier,
+            @Nonnull Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier,
             @Nonnull
                     BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction,
             @Nonnull
@@ -388,7 +388,7 @@ public class TestingJobMasterGateway implements JobMasterGateway {
     }
 
     @Override
-    public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+    public CompletableFuture<ExecutionGraphInfo> requestJob(Time timeout) {
         return requestJobSupplier.get();
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
index c2dc9ab..e957311 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -48,6 +47,7 @@ import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
@@ -112,7 +112,7 @@ public class TestingJobMasterGatewayBuilder {
     private Consumer<ResourceID> resourceManagerHeartbeatConsumer = ignored -> {};
     private Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier =
             () -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
-    private Supplier<CompletableFuture<ArchivedExecutionGraph>> requestJobSupplier =
+    private Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier =
             () -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
     private BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction =
             (targetDirectory, ignoredB) ->
@@ -264,7 +264,7 @@ public class TestingJobMasterGatewayBuilder {
     }
 
     public TestingJobMasterGatewayBuilder setRequestJobSupplier(
-            Supplier<CompletableFuture<ArchivedExecutionGraph>> requestJobSupplier) {
+            Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier) {
         this.requestJobSupplier = requestJobSupplier;
         return this;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
index 49d3772..d0767a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.minicluster;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -105,7 +105,7 @@ public class TestingMiniCluster extends MiniCluster {
                             blobServer,
                             heartbeatServices,
                             metricRegistry,
-                            new MemoryArchivedExecutionGraphStore(),
+                            new MemoryExecutionGraphInfoStore(),
                             metricQueryServiceRetriever,
                             fatalErrorHandler));
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java
index ace82f9..269041a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java
@@ -21,10 +21,9 @@ package org.apache.flink.runtime.rest.handler.legacy;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
 import org.apache.flink.util.ExecutorUtils;
@@ -55,12 +54,13 @@ import static org.junit.Assert.fail;
 /** Tests for the {@link DefaultExecutionGraphCache}. */
 public class DefaultExecutionGraphCacheTest extends TestLogger {
 
-    private static ArchivedExecutionGraph expectedExecutionGraph;
+    private static ExecutionGraphInfo expectedExecutionGraphInfo;
     private static final JobID expectedJobId = new JobID();
 
     @BeforeClass
     public static void setup() {
-        expectedExecutionGraph = new ArchivedExecutionGraphBuilder().build();
+        expectedExecutionGraphInfo =
+                new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().build());
     }
 
     /** Tests that we can cache AccessExecutionGraphs over multiple accesses. */
@@ -71,19 +71,20 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
 
         final CountingRestfulGateway restfulGateway =
                 createCountingRestfulGateway(
-                        expectedJobId, CompletableFuture.completedFuture(expectedExecutionGraph));
+                        expectedJobId,
+                        CompletableFuture.completedFuture(expectedExecutionGraphInfo));
 
         try (ExecutionGraphCache executionGraphCache =
                 new DefaultExecutionGraphCache(timeout, timeToLive)) {
-            CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture =
-                    executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+            CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture =
+                    executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
 
-            assertEquals(expectedExecutionGraph, accessExecutionGraphFuture.get());
+            assertEquals(expectedExecutionGraphInfo, executionGraphInfoFuture.get());
 
-            accessExecutionGraphFuture =
-                    executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+            executionGraphInfoFuture =
+                    executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
 
-            assertEquals(expectedExecutionGraph, accessExecutionGraphFuture.get());
+            assertEquals(expectedExecutionGraphInfo, executionGraphInfoFuture.get());
 
             assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(1));
         }
@@ -98,23 +99,23 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
         final CountingRestfulGateway restfulGateway =
                 createCountingRestfulGateway(
                         expectedJobId,
-                        CompletableFuture.completedFuture(expectedExecutionGraph),
-                        CompletableFuture.completedFuture(expectedExecutionGraph));
+                        CompletableFuture.completedFuture(expectedExecutionGraphInfo),
+                        CompletableFuture.completedFuture(expectedExecutionGraphInfo));
 
         try (ExecutionGraphCache executionGraphCache =
                 new DefaultExecutionGraphCache(timeout, timeToLive)) {
-            CompletableFuture<AccessExecutionGraph> executionGraphFuture =
-                    executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+            CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture =
+                    executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
 
-            assertEquals(expectedExecutionGraph, executionGraphFuture.get());
+            assertEquals(expectedExecutionGraphInfo, executionGraphInfoFuture.get());
 
             // sleep for the TTL
             Thread.sleep(timeToLive.toMilliseconds() * 5L);
 
-            CompletableFuture<AccessExecutionGraph> executionGraphFuture2 =
-                    executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+            CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture2 =
+                    executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
 
-            assertEquals(expectedExecutionGraph, executionGraphFuture2.get());
+            assertEquals(expectedExecutionGraphInfo, executionGraphInfoFuture2.get());
 
             assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(2));
         }
@@ -135,25 +136,26 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
                         expectedJobId,
                         FutureUtils.completedExceptionally(
                                 new FlinkJobNotFoundException(expectedJobId)),
-                        CompletableFuture.completedFuture(expectedExecutionGraph));
+                        CompletableFuture.completedFuture(expectedExecutionGraphInfo));
 
         try (ExecutionGraphCache executionGraphCache =
                 new DefaultExecutionGraphCache(timeout, timeToLive)) {
-            CompletableFuture<AccessExecutionGraph> executionGraphFuture =
-                    executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+            CompletableFuture<ExecutionGraphInfo> executionGraphFuture =
+                    executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
 
             try {
                 executionGraphFuture.get();
 
                 fail("The execution graph future should have been completed exceptionally.");
             } catch (ExecutionException ee) {
+                ee.printStackTrace();
                 assertTrue(ee.getCause() instanceof FlinkException);
             }
 
-            CompletableFuture<AccessExecutionGraph> executionGraphFuture2 =
-                    executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+            CompletableFuture<ExecutionGraphInfo> executionGraphFuture2 =
+                    executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
 
-            assertEquals(expectedExecutionGraph, executionGraphFuture2.get());
+            assertEquals(expectedExecutionGraphInfo, executionGraphFuture2.get());
         }
     }
 
@@ -166,21 +168,21 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
         final Time timeout = Time.milliseconds(100L);
         final Time timeToLive = Time.milliseconds(1L);
         final JobID expectedJobId2 = new JobID();
-        final ArchivedExecutionGraph expectedExecutionGraph2 =
-                new ArchivedExecutionGraphBuilder().build();
+        final ExecutionGraphInfo expectedExecutionGraphInfo2 =
+                new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().build());
 
         final AtomicInteger requestJobCalls = new AtomicInteger(0);
         final TestingRestfulGateway restfulGateway =
                 new TestingRestfulGateway.Builder()
-                        .setRequestJobFunction(
+                        .setRequestExecutionGraphInfoFunction(
                                 jobId -> {
                                     requestJobCalls.incrementAndGet();
                                     if (jobId.equals(expectedJobId)) {
                                         return CompletableFuture.completedFuture(
-                                                expectedExecutionGraph);
+                                                expectedExecutionGraphInfo);
                                     } else if (jobId.equals(expectedJobId2)) {
                                         return CompletableFuture.completedFuture(
-                                                expectedExecutionGraph2);
+                                                expectedExecutionGraphInfo2);
                                     } else {
                                         throw new AssertionError("Invalid job id received.");
                                     }
@@ -190,15 +192,15 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
         try (ExecutionGraphCache executionGraphCache =
                 new DefaultExecutionGraphCache(timeout, timeToLive)) {
 
-            CompletableFuture<AccessExecutionGraph> executionGraph1Future =
-                    executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+            CompletableFuture<ExecutionGraphInfo> executionGraph1Future =
+                    executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
 
-            CompletableFuture<AccessExecutionGraph> executionGraph2Future =
-                    executionGraphCache.getExecutionGraph(expectedJobId2, restfulGateway);
+            CompletableFuture<ExecutionGraphInfo> executionGraph2Future =
+                    executionGraphCache.getExecutionGraphInfo(expectedJobId2, restfulGateway);
 
-            assertEquals(expectedExecutionGraph, executionGraph1Future.get());
+            assertEquals(expectedExecutionGraphInfo, executionGraph1Future.get());
 
-            assertEquals(expectedExecutionGraph2, executionGraph2Future.get());
+            assertEquals(expectedExecutionGraphInfo2, executionGraph2Future.get());
 
             assertThat(requestJobCalls.get(), Matchers.equalTo(2));
 
@@ -218,11 +220,12 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
 
         final CountingRestfulGateway restfulGateway =
                 createCountingRestfulGateway(
-                        expectedJobId, CompletableFuture.completedFuture(expectedExecutionGraph));
+                        expectedJobId,
+                        CompletableFuture.completedFuture(expectedExecutionGraphInfo));
 
         final int numConcurrentAccesses = 10;
 
-        final ArrayList<CompletableFuture<AccessExecutionGraph>> executionGraphFutures =
+        final ArrayList<CompletableFuture<ExecutionGraphInfo>> executionGraphFutures =
                 new ArrayList<>(numConcurrentAccesses);
 
         final ExecutorService executor =
@@ -231,10 +234,10 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
         try (ExecutionGraphCache executionGraphCache =
                 new DefaultExecutionGraphCache(timeout, timeToLive)) {
             for (int i = 0; i < numConcurrentAccesses; i++) {
-                CompletableFuture<AccessExecutionGraph> executionGraphFuture =
+                CompletableFuture<ExecutionGraphInfo> executionGraphFuture =
                         CompletableFuture.supplyAsync(
                                         () ->
-                                                executionGraphCache.getExecutionGraph(
+                                                executionGraphCache.getExecutionGraphInfo(
                                                         expectedJobId, restfulGateway),
                                         executor)
                                 .thenCompose(Function.identity());
@@ -242,13 +245,13 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
                 executionGraphFutures.add(executionGraphFuture);
             }
 
-            final CompletableFuture<Collection<AccessExecutionGraph>> allExecutionGraphFutures =
+            final CompletableFuture<Collection<ExecutionGraphInfo>> allExecutionGraphFutures =
                     FutureUtils.combineAll(executionGraphFutures);
 
-            Collection<AccessExecutionGraph> allExecutionGraphs = allExecutionGraphFutures.get();
+            Collection<ExecutionGraphInfo> allExecutionGraphs = allExecutionGraphFutures.get();
 
-            for (AccessExecutionGraph executionGraph : allExecutionGraphs) {
-                assertEquals(expectedExecutionGraph, executionGraph);
+            for (ExecutionGraphInfo executionGraph : allExecutionGraphs) {
+                assertEquals(expectedExecutionGraphInfo, executionGraph);
             }
 
             assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(1));
@@ -258,8 +261,8 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
     }
 
     private CountingRestfulGateway createCountingRestfulGateway(
-            JobID jobId, CompletableFuture<ArchivedExecutionGraph>... accessExecutionGraphs) {
-        final ConcurrentLinkedQueue<CompletableFuture<ArchivedExecutionGraph>> queue =
+            JobID jobId, CompletableFuture<ExecutionGraphInfo>... accessExecutionGraphs) {
+        final ConcurrentLinkedQueue<CompletableFuture<ExecutionGraphInfo>> queue =
                 new ConcurrentLinkedQueue<>(Arrays.asList(accessExecutionGraphs));
         return new CountingRestfulGateway(jobId, ignored -> queue.poll());
     }
@@ -276,16 +279,17 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
 
         private CountingRestfulGateway(
                 JobID expectedJobId,
-                Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction) {
+                Function<JobID, CompletableFuture<ExecutionGraphInfo>> requestJobFunction) {
             this.expectedJobId = Preconditions.checkNotNull(expectedJobId);
-            this.requestJobFunction = Preconditions.checkNotNull(requestJobFunction);
+            this.requestExecutionGraphInfoFunction = Preconditions.checkNotNull(requestJobFunction);
         }
 
         @Override
-        public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
+        public CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(
+                JobID jobId, Time timeout) {
             assertThat(jobId, Matchers.equalTo(expectedJobId));
             numRequestJobCalls.incrementAndGet();
-            return super.requestJob(jobId, timeout);
+            return super.requestExecutionGraphInfo(jobId, timeout);
         }
 
         public int getNumRequestJobCalls() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java
index 0985a1c..12565f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.rest.util;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 
 import java.util.concurrent.CompletableFuture;
@@ -36,7 +36,7 @@ public enum NoOpExecutionGraphCache implements ExecutionGraphCache {
     }
 
     @Override
-    public CompletableFuture<AccessExecutionGraph> getExecutionGraph(
+    public CompletableFuture<ExecutionGraphInfo> getExecutionGraphInfo(
             JobID jobId, RestfulGateway restfulGateway) {
         return FutureUtils.completedExceptionally(
                 new UnsupportedOperationException(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 737c69b..1e33ae0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -260,7 +260,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final ArchivedExecutionVertex archivedExecutionVertex =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
         final ExecutionAttemptID attemptId =
                 archivedExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
 
@@ -294,7 +298,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final ArchivedExecutionVertex onlyExecutionVertex =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
 
@@ -325,6 +333,7 @@ public class DefaultSchedulerTest extends TestLogger {
         Throwable failureCause =
                 scheduler
                         .requestJob()
+                        .getArchivedExecutionGraph()
                         .getFailureInfo()
                         .getException()
                         .deserializeError(DefaultSchedulerTest.class.getClassLoader());
@@ -388,7 +397,11 @@ public class DefaultSchedulerTest extends TestLogger {
         actionsToTriggerTaskFailure.accept(vid11);
 
         final Iterator<ArchivedExecutionVertex> vertexIterator =
-                scheduler.requestJob().getAllExecutionVertices().iterator();
+                scheduler
+                        .requestJob()
+                        .getArchivedExecutionGraph()
+                        .getAllExecutionVertices()
+                        .iterator();
         final ArchivedExecutionVertex ev11 = vertexIterator.next();
         final ArchivedExecutionVertex ev12 = vertexIterator.next();
         final ArchivedExecutionVertex ev21 = vertexIterator.next();
@@ -424,7 +437,12 @@ public class DefaultSchedulerTest extends TestLogger {
         testExecutionSlotAllocator.completePendingRequest(sourceExecutionVertexId);
 
         final ArchivedExecutionVertex sourceExecutionVertex =
-                scheduler.requestJob().getAllExecutionVertices().iterator().next();
+                scheduler
+                        .requestJob()
+                        .getArchivedExecutionGraph()
+                        .getAllExecutionVertices()
+                        .iterator()
+                        .next();
         final ExecutionAttemptID attemptId =
                 sourceExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
         scheduler.updateTaskExecutionState(
@@ -437,7 +455,9 @@ public class DefaultSchedulerTest extends TestLogger {
         assertThat(
                 testExecutionVertexOperations.getDeployedVertices(),
                 containsInAnyOrder(sourceExecutionVertexId, sinkExecutionVertexId));
-        assertThat(scheduler.requestJob().getState(), is(equalTo(JobStatus.RUNNING)));
+        assertThat(
+                scheduler.requestJob().getArchivedExecutionGraph().getState(),
+                is(equalTo(JobStatus.RUNNING)));
     }
 
     @Test
@@ -478,7 +498,11 @@ public class DefaultSchedulerTest extends TestLogger {
         schedulingStrategy.schedule(Collections.singletonList(onlySchedulingVertex.getId()));
 
         final ArchivedExecutionVertex onlyExecutionVertex =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
         scheduler.updateTaskExecutionState(
@@ -532,7 +556,11 @@ public class DefaultSchedulerTest extends TestLogger {
         scheduler.handleGlobalFailure(new Exception("forced failure"));
 
         final ArchivedExecutionVertex onlyExecutionVertex =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
         scheduler.updateTaskExecutionState(
@@ -554,7 +582,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final Iterator<ArchivedExecutionVertex> vertexIterator =
-                scheduler.requestJob().getAllExecutionVertices().iterator();
+                scheduler
+                        .requestJob()
+                        .getArchivedExecutionGraph()
+                        .getAllExecutionVertices()
+                        .iterator();
         final ArchivedExecutionVertex v1 = vertexIterator.next();
         final ArchivedExecutionVertex v2 = vertexIterator.next();
 
@@ -588,7 +620,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final ArchivedExecutionVertex onlyExecutionVertex =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
         scheduler.updateTaskExecutionState(
@@ -616,7 +652,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final ArchivedExecutionVertex onlyExecutionVertex =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
         scheduler.updateTaskExecutionState(
@@ -652,7 +692,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final ArchivedExecutionVertex onlyExecutionVertex =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
         scheduler.updateTaskExecutionState(
@@ -743,7 +787,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final Iterator<ArchivedExecutionVertex> vertexIterator =
-                scheduler.requestJob().getAllExecutionVertices().iterator();
+                scheduler
+                        .requestJob()
+                        .getArchivedExecutionGraph()
+                        .getAllExecutionVertices()
+                        .iterator();
         final ExecutionAttemptID attemptId1 =
                 vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
         final ExecutionAttemptID attemptId2 =
@@ -781,7 +829,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final SchedulingTopology topology = scheduler.getSchedulingTopology();
 
         final Iterator<ArchivedExecutionVertex> vertexIterator =
-                scheduler.requestJob().getAllExecutionVertices().iterator();
+                scheduler
+                        .requestJob()
+                        .getArchivedExecutionGraph()
+                        .getAllExecutionVertices()
+                        .iterator();
         final ExecutionAttemptID attemptId1 =
                 vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
         final ExecutionAttemptID attemptId2 =
@@ -818,7 +870,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final ArchivedExecutionVertex onlyExecutionVertex =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
 
@@ -830,7 +886,8 @@ public class DefaultSchedulerTest extends TestLogger {
                         ExecutionState.FAILED,
                         new RuntimeException(exceptionMessage)));
 
-        final ErrorInfo failureInfo = scheduler.requestJob().getFailureInfo();
+        final ErrorInfo failureInfo =
+                scheduler.requestJob().getArchivedExecutionGraph().getFailureInfo();
         assertThat(failureInfo, is(notNullValue()));
         assertThat(failureInfo.getExceptionAsString(), containsString(exceptionMessage));
     }
@@ -850,7 +907,11 @@ public class DefaultSchedulerTest extends TestLogger {
         scheduler.startScheduling();
 
         Iterator<ArchivedExecutionVertex> vertexIterator =
-                scheduler.requestJob().getAllExecutionVertices().iterator();
+                scheduler
+                        .requestJob()
+                        .getArchivedExecutionGraph()
+                        .getAllExecutionVertices()
+                        .iterator();
         ArchivedExecutionVertex v1 = vertexIterator.next();
 
         assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(2));
@@ -863,7 +924,12 @@ public class DefaultSchedulerTest extends TestLogger {
                         ExecutionState.FAILED,
                         new RuntimeException(exceptionMessage)));
 
-        vertexIterator = scheduler.requestJob().getAllExecutionVertices().iterator();
+        vertexIterator =
+                scheduler
+                        .requestJob()
+                        .getArchivedExecutionGraph()
+                        .getAllExecutionVertices()
+                        .iterator();
         v1 = vertexIterator.next();
         ArchivedExecutionVertex v2 = vertexIterator.next();
         assertThat(v1.getExecutionState(), is(ExecutionState.FAILED));
@@ -877,7 +943,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final ExecutionAttemptID attemptId =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                Iterables.getOnlyElement(
+                                scheduler
+                                        .requestJob()
+                                        .getArchivedExecutionGraph()
+                                        .getAllExecutionVertices())
                         .getCurrentExecutionAttempt()
                         .getAttemptId();
 
@@ -917,7 +987,11 @@ public class DefaultSchedulerTest extends TestLogger {
 
         // initiate restartable failure
         final ExecutionAttemptID restartableAttemptId =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                Iterables.getOnlyElement(
+                                scheduler
+                                        .requestJob()
+                                        .getArchivedExecutionGraph()
+                                        .getAllExecutionVertices())
                         .getCurrentExecutionAttempt()
                         .getAttemptId();
         final RuntimeException restartableException = new RuntimeException("restartable exception");
@@ -930,7 +1004,11 @@ public class DefaultSchedulerTest extends TestLogger {
         testRestartBackoffTimeStrategy.setCanRestart(false);
 
         final ExecutionAttemptID failingAttemptId =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                Iterables.getOnlyElement(
+                                scheduler
+                                        .requestJob()
+                                        .getArchivedExecutionGraph()
+                                        .getAllExecutionVertices())
                         .getCurrentExecutionAttempt()
                         .getAttemptId();
         final RuntimeException failingException = new RuntimeException("failing exception");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 9ff306d..ad3e98f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -190,7 +190,12 @@ public class SchedulerTestingUtils {
     public static Collection<ExecutionAttemptID> getAllCurrentExecutionAttempts(
             DefaultScheduler scheduler) {
         return StreamSupport.stream(
-                        scheduler.requestJob().getAllExecutionVertices().spliterator(), false)
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices()
+                                .spliterator(),
+                        false)
                 .map((vertex) -> vertex.getCurrentExecutionAttempt().getAttemptId())
                 .collect(Collectors.toList());
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
index 71a990f..b64af0e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -120,7 +119,7 @@ public class TestingSchedulerNG implements SchedulerNG {
     }
 
     @Override
-    public ArchivedExecutionGraph requestJob() {
+    public ExecutionGraphInfo requestJob() {
         failOperation();
         return null;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
index efe8bfc..eccf446 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.function.TriFunction;
 
@@ -86,6 +87,8 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway
             String hostname,
             Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction,
             Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction,
+            Function<JobID, CompletableFuture<ExecutionGraphInfo>>
+                    requestExecutionGraphInfoFunction,
             Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction,
             Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction,
             Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier,
@@ -115,6 +118,7 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway
                 hostname,
                 cancelJobFunction,
                 requestJobFunction,
+                requestExecutionGraphInfoFunction,
                 requestJobResultFunction,
                 requestJobStatusFunction,
                 requestMultipleJobDetailsSupplier,
@@ -228,6 +232,7 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway
                     hostname,
                     cancelJobFunction,
                     requestJobFunction,
+                    requestExecutionGraphInfoFunction,
                     requestJobResultFunction,
                     requestJobStatusFunction,
                     requestMultipleJobDetailsSupplier,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
index bd3e06a..69c6d30 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
@@ -31,7 +31,7 @@ import java.util.function.IntSupplier;
 public class TestingExecutionGraphCache implements ExecutionGraphCache {
     private final IntSupplier sizeSupplier;
 
-    private final BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>>
+    private final BiFunction<JobID, RestfulGateway, CompletableFuture<ExecutionGraphInfo>>
             getExecutionGraphFunction;
 
     private final Runnable cleanupRunnable;
@@ -40,7 +40,7 @@ public class TestingExecutionGraphCache implements ExecutionGraphCache {
 
     private TestingExecutionGraphCache(
             IntSupplier sizeSupplier,
-            BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>>
+            BiFunction<JobID, RestfulGateway, CompletableFuture<ExecutionGraphInfo>>
                     getExecutionGraphFunction,
             Runnable cleanupRunnable,
             Runnable closeRunnable) {
@@ -56,7 +56,7 @@ public class TestingExecutionGraphCache implements ExecutionGraphCache {
     }
 
     @Override
-    public CompletableFuture<AccessExecutionGraph> getExecutionGraph(
+    public CompletableFuture<ExecutionGraphInfo> getExecutionGraphInfo(
             JobID jobId, RestfulGateway restfulGateway) {
         return getExecutionGraphFunction.apply(jobId, restfulGateway);
     }
@@ -79,7 +79,7 @@ public class TestingExecutionGraphCache implements ExecutionGraphCache {
     public static final class Builder {
 
         private IntSupplier sizeSupplier = () -> 0;
-        private BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>>
+        private BiFunction<JobID, RestfulGateway, CompletableFuture<ExecutionGraphInfo>>
                 getExecutionGraphFunction =
                         (ignoredA, ignoredB) ->
                                 FutureUtils.completedExceptionally(
@@ -95,7 +95,7 @@ public class TestingExecutionGraphCache implements ExecutionGraphCache {
         }
 
         public Builder setGetExecutionGraphFunction(
-                BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>>
+                BiFunction<JobID, RestfulGateway, CompletableFuture<ExecutionGraphInfo>>
                         getExecutionGraphFunction) {
             this.getExecutionGraphFunction = getExecutionGraphFunction;
             return this;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index 3d10329..f36af49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.function.TriFunction;
 
@@ -54,6 +55,10 @@ public class TestingRestfulGateway implements RestfulGateway {
             DEFAULT_REQUEST_JOB_FUNCTION =
                     jobId ->
                             FutureUtils.completedExceptionally(new UnsupportedOperationException());
+    static final Function<JobID, CompletableFuture<ExecutionGraphInfo>>
+            DEFAULT_REQUEST_EXECUTION_GRAPH_INFO =
+                    jobId ->
+                            FutureUtils.completedExceptionally(new UnsupportedOperationException());
     static final Function<JobID, CompletableFuture<JobStatus>> DEFAULT_REQUEST_JOB_STATUS_FUNCTION =
             jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING);
     static final Supplier<CompletableFuture<MultipleJobsDetails>>
@@ -106,6 +111,9 @@ public class TestingRestfulGateway implements RestfulGateway {
 
     protected Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction;
 
+    protected Function<JobID, CompletableFuture<ExecutionGraphInfo>>
+            requestExecutionGraphInfoFunction;
+
     protected Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction;
 
     protected Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction;
@@ -137,6 +145,7 @@ public class TestingRestfulGateway implements RestfulGateway {
                 LOCALHOST,
                 DEFAULT_CANCEL_JOB_FUNCTION,
                 DEFAULT_REQUEST_JOB_FUNCTION,
+                DEFAULT_REQUEST_EXECUTION_GRAPH_INFO,
                 DEFAULT_REQUEST_JOB_RESULT_FUNCTION,
                 DEFAULT_REQUEST_JOB_STATUS_FUNCTION,
                 DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER,
@@ -154,6 +163,8 @@ public class TestingRestfulGateway implements RestfulGateway {
             String hostname,
             Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction,
             Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction,
+            Function<JobID, CompletableFuture<ExecutionGraphInfo>>
+                    requestExecutionGraphInfoFunction,
             Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction,
             Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction,
             Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier,
@@ -175,6 +186,7 @@ public class TestingRestfulGateway implements RestfulGateway {
         this.hostname = hostname;
         this.cancelJobFunction = cancelJobFunction;
         this.requestJobFunction = requestJobFunction;
+        this.requestExecutionGraphInfoFunction = requestExecutionGraphInfoFunction;
         this.requestJobResultFunction = requestJobResultFunction;
         this.requestJobStatusFunction = requestJobStatusFunction;
         this.requestMultipleJobDetailsSupplier = requestMultipleJobDetailsSupplier;
@@ -206,6 +218,12 @@ public class TestingRestfulGateway implements RestfulGateway {
     }
 
     @Override
+    public CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(
+            JobID jobId, Time timeout) {
+        return requestExecutionGraphInfoFunction.apply(jobId);
+    }
+
+    @Override
     public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
         return requestJobResultFunction.apply(jobId);
     }
@@ -278,6 +296,8 @@ public class TestingRestfulGateway implements RestfulGateway {
         protected String hostname = LOCALHOST;
         protected Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction;
         protected Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction;
+        protected Function<JobID, CompletableFuture<ExecutionGraphInfo>>
+                requestExecutionGraphInfoFunction;
         protected Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction;
         protected Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction;
         protected Supplier<CompletableFuture<MultipleJobsDetails>>
@@ -301,6 +321,7 @@ public class TestingRestfulGateway implements RestfulGateway {
         protected AbstractBuilder() {
             cancelJobFunction = DEFAULT_CANCEL_JOB_FUNCTION;
             requestJobFunction = DEFAULT_REQUEST_JOB_FUNCTION;
+            requestExecutionGraphInfoFunction = DEFAULT_REQUEST_EXECUTION_GRAPH_INFO;
             requestJobResultFunction = DEFAULT_REQUEST_JOB_RESULT_FUNCTION;
             requestJobStatusFunction = DEFAULT_REQUEST_JOB_STATUS_FUNCTION;
             requestMultipleJobDetailsSupplier = DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER;
@@ -338,6 +359,13 @@ public class TestingRestfulGateway implements RestfulGateway {
             return self();
         }
 
+        public T setRequestExecutionGraphInfoFunction(
+                Function<JobID, CompletableFuture<ExecutionGraphInfo>>
+                        requestExecutionGraphInfoFunction) {
+            this.requestExecutionGraphInfoFunction = requestExecutionGraphInfoFunction;
+            return self();
+        }
+
         public T setRequestJobResultFunction(
                 Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction) {
             this.requestJobResultFunction = requestJobResultFunction;
@@ -429,6 +457,7 @@ public class TestingRestfulGateway implements RestfulGateway {
                     hostname,
                     cancelJobFunction,
                     requestJobFunction,
+                    requestExecutionGraphInfoFunction,
                     requestJobResultFunction,
                     requestJobStatusFunction,
                     requestMultipleJobDetailsSupplier,
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 942987e..6a8487e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -41,7 +41,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
 import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
@@ -159,7 +159,7 @@ public class ProcessFailureCancelingITCase extends TestLogger {
                             blobServerResource.getBlobServer(),
                             new HeartbeatServices(100L, 1000L),
                             NoOpMetricRegistry.INSTANCE,
-                            new MemoryArchivedExecutionGraphStore(),
+                            new MemoryExecutionGraphInfoStore(),
                             VoidMetricQueryServiceRetriever.INSTANCE,
                             fatalErrorHandler);