You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/23 16:29:18 UTC

[flink] branch master updated (19ceee0 -> b7e93fd)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 19ceee0  [FLINK-21028][task] Do not interrupt the source thread on stop with savepoint
     new deddfa2  [FLINK-11678][runtime] Removes unnecessary obsolete method overwriting
     new 278daba  [hotfix][test] Removes unused inner class
     new b7e93fd  [FLINK-21188][runtime] Introduces ExecutionGraphInfo

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../application/ApplicationClusterEntryPoint.java  |   8 +-
 .../flink/runtime/dispatcher/Dispatcher.java       |  58 ++---
 .../runtime/dispatcher/DispatcherGateway.java      |  17 --
 .../flink/runtime/dispatcher/DispatcherJob.java    |  36 +--
 .../runtime/dispatcher/DispatcherJobResult.java    |  25 ++-
 .../runtime/dispatcher/DispatcherServices.java     |  10 +-
 ...raphStore.java => ExecutionGraphInfoStore.java} |  20 +-
 ...Store.java => FileExecutionGraphInfoStore.java} |  77 +++----
 .../runtime/dispatcher/HistoryServerArchivist.java |  10 +-
 .../JsonResponseHistoryServerArchivist.java        |   8 +-
 ...ore.java => MemoryExecutionGraphInfoStore.java} |  38 ++--
 .../flink/runtime/dispatcher/MiniDispatcher.java   |   7 +-
 .../dispatcher/PartialDispatcherServices.java      |  10 +-
 ...PartialDispatcherServicesWithJobGraphStore.java |   4 +-
 .../dispatcher/VoidHistoryServerArchivist.java     |   5 +-
 .../runtime/entrypoint/ClusterEntrypoint.java      |  14 +-
 .../runtime/entrypoint/JobClusterEntrypoint.java   |   8 +-
 .../entrypoint/SessionClusterEntrypoint.java       |   8 +-
 ...tDispatcherResourceManagerComponentFactory.java |   6 +-
 .../DispatcherResourceManagerComponentFactory.java |   4 +-
 .../runtime/jobmanager/OnCompletionActions.java    |   6 +-
 .../runtime/jobmaster/JobManagerRunnerImpl.java    |   8 +-
 .../runtime/jobmaster/JobManagerRunnerResult.java  |  28 +--
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  10 +-
 .../flink/runtime/jobmaster/JobMasterGateway.java  |   8 +-
 .../flink/runtime/minicluster/MiniCluster.java     |   4 +-
 ...va => AbstractAccessExecutionGraphHandler.java} |  49 ++---
 .../handler/job/AbstractExecutionGraphHandler.java |  17 +-
 .../rest/handler/job/AbstractJobVertexHandler.java |   2 +-
 .../rest/handler/job/JobAccumulatorsHandler.java   |   3 +-
 .../runtime/rest/handler/job/JobConfigHandler.java |   2 +-
 .../rest/handler/job/JobDetailsHandler.java        |   2 +-
 .../rest/handler/job/JobExceptionsHandler.java     |   3 +-
 .../runtime/rest/handler/job/JobPlanHandler.java   |   3 +-
 .../rest/handler/job/JobVertexDetailsHandler.java  |   3 +-
 .../handler/job/JobVertexTaskManagersHandler.java  |   3 +-
 .../job/checkpoints/AbstractCheckpointHandler.java |   4 +-
 .../job/checkpoints/CheckpointConfigHandler.java   |   4 +-
 .../CheckpointingStatisticsHandler.java            |   4 +-
 .../handler/legacy/DefaultExecutionGraphCache.java |  40 ++--
 .../rest/handler/legacy/ExecutionGraphCache.java   |  21 +-
 .../runtime/scheduler/ExecutionGraphInfo.java      |  62 ++++++
 .../flink/runtime/scheduler/SchedulerBase.java     |   5 +-
 .../flink/runtime/scheduler/SchedulerNG.java       |   3 +-
 .../scheduler/adaptive/AdaptiveScheduler.java      |   6 +-
 .../flink/runtime/webmonitor/RestfulGateway.java   |  20 +-
 .../runtime/webmonitor/history/JsonArchivist.java  |  20 ++
 .../runtime/dispatcher/DispatcherJobTest.java      |  67 ++++--
 .../dispatcher/DispatcherResourceCleanupTest.java  |  36 +--
 .../flink/runtime/dispatcher/DispatcherTest.java   |  42 ++--
 ...t.java => FileExecutionGraphInfoStoreTest.java} | 244 ++++++++++++---------
 .../runtime/dispatcher/MiniDispatcherTest.java     |  34 +--
 .../runtime/dispatcher/TestingDispatcher.java      |   6 +-
 .../runner/DefaultDispatcherRunnerITCase.java      |   4 +-
 .../ZooKeeperDefaultDispatcherRunnerTest.java      |   4 +-
 .../runtime/entrypoint/ClusterEntrypointTest.java  |   4 +-
 .../jobmaster/JobManagerRunnerImplTest.java        |  19 +-
 .../jobmaster/JobManagerRunnerResultTest.java      |  18 +-
 ...asterExecutionDeploymentReconciliationTest.java |   6 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |  12 +-
 .../runtime/jobmaster/TestingJobManagerRunner.java |   7 +-
 .../runtime/jobmaster/utils/JobMasterBuilder.java  |  15 +-
 .../jobmaster/utils/TestingJobMasterGateway.java   |   8 +-
 .../utils/TestingJobMasterGatewayBuilder.java      |   6 +-
 .../runtime/minicluster/TestingMiniCluster.java    |   4 +-
 .../legacy/DefaultExecutionGraphCacheTest.java     | 148 +++++--------
 .../runtime/rest/util/NoOpExecutionGraphCache.java |   4 +-
 .../runtime/scheduler/DefaultSchedulerTest.java    | 118 ++++++++--
 .../runtime/scheduler/SchedulerTestingUtils.java   |   7 +-
 .../runtime/scheduler/TestingSchedulerNG.java      |   3 +-
 .../webmonitor/TestingDispatcherGateway.java       |   5 +
 .../webmonitor/TestingExecutionGraphCache.java     |  12 +-
 .../runtime/webmonitor/TestingRestfulGateway.java  |  29 +++
 .../recovery/ProcessFailureCancelingITCase.java    |   4 +-
 74 files changed, 915 insertions(+), 664 deletions(-)
 rename flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/{ArchivedExecutionGraphStore.java => ExecutionGraphInfoStore.java} (76%)
 rename flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/{FileArchivedExecutionGraphStore.java => FileExecutionGraphInfoStore.java} (78%)
 rename flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/{MemoryArchivedExecutionGraphStore.java => MemoryExecutionGraphInfoStore.java} (60%)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/{JobVertexAccumulatorsHandler.java => AbstractAccessExecutionGraphHandler.java} (57%)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
 rename flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/{FileArchivedExecutionGraphStoreTest.java => FileExecutionGraphInfoStoreTest.java} (50%)


[flink] 03/03: [FLINK-21188][runtime] Introduces ExecutionGraphInfo

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b7e93fd671a0dc0e7ba919a447ca9e51bb451fdd
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Jan 27 15:47:33 2021 +0100

    [FLINK-21188][runtime] Introduces ExecutionGraphInfo
    
    This closes #14804.
---
 .../application/ApplicationClusterEntryPoint.java  |   8 +-
 .../flink/runtime/dispatcher/Dispatcher.java       |  58 ++---
 .../flink/runtime/dispatcher/DispatcherJob.java    |  36 +--
 .../runtime/dispatcher/DispatcherJobResult.java    |  25 ++-
 .../runtime/dispatcher/DispatcherServices.java     |  10 +-
 ...raphStore.java => ExecutionGraphInfoStore.java} |  20 +-
 ...Store.java => FileExecutionGraphInfoStore.java} |  77 +++----
 .../runtime/dispatcher/HistoryServerArchivist.java |  10 +-
 .../JsonResponseHistoryServerArchivist.java        |   8 +-
 ...ore.java => MemoryExecutionGraphInfoStore.java} |  38 ++--
 .../flink/runtime/dispatcher/MiniDispatcher.java   |   7 +-
 .../dispatcher/PartialDispatcherServices.java      |  10 +-
 ...PartialDispatcherServicesWithJobGraphStore.java |   4 +-
 .../dispatcher/VoidHistoryServerArchivist.java     |   5 +-
 .../runtime/entrypoint/ClusterEntrypoint.java      |  14 +-
 .../runtime/entrypoint/JobClusterEntrypoint.java   |   8 +-
 .../entrypoint/SessionClusterEntrypoint.java       |   8 +-
 ...tDispatcherResourceManagerComponentFactory.java |   6 +-
 .../DispatcherResourceManagerComponentFactory.java |   4 +-
 .../runtime/jobmanager/OnCompletionActions.java    |   6 +-
 .../runtime/jobmaster/JobManagerRunnerImpl.java    |   8 +-
 .../runtime/jobmaster/JobManagerRunnerResult.java  |  28 +--
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  10 +-
 .../flink/runtime/jobmaster/JobMasterGateway.java  |   8 +-
 .../flink/runtime/minicluster/MiniCluster.java     |   4 +-
 ...va => AbstractAccessExecutionGraphHandler.java} |  63 +++---
 .../handler/job/AbstractExecutionGraphHandler.java |  17 +-
 .../rest/handler/job/AbstractJobVertexHandler.java |   2 +-
 .../rest/handler/job/JobAccumulatorsHandler.java   |   3 +-
 .../runtime/rest/handler/job/JobConfigHandler.java |   2 +-
 .../rest/handler/job/JobDetailsHandler.java        |   2 +-
 .../rest/handler/job/JobExceptionsHandler.java     |   3 +-
 .../runtime/rest/handler/job/JobPlanHandler.java   |   3 +-
 .../rest/handler/job/JobVertexDetailsHandler.java  |   3 +-
 .../handler/job/JobVertexTaskManagersHandler.java  |   3 +-
 .../job/checkpoints/AbstractCheckpointHandler.java |   4 +-
 .../job/checkpoints/CheckpointConfigHandler.java   |   4 +-
 .../CheckpointingStatisticsHandler.java            |   4 +-
 .../handler/legacy/DefaultExecutionGraphCache.java |  40 ++--
 .../rest/handler/legacy/ExecutionGraphCache.java   |  21 +-
 .../runtime/scheduler/ExecutionGraphInfo.java      |  62 ++++++
 .../flink/runtime/scheduler/SchedulerBase.java     |   5 +-
 .../flink/runtime/scheduler/SchedulerNG.java       |   3 +-
 .../scheduler/adaptive/AdaptiveScheduler.java      |   6 +-
 .../flink/runtime/webmonitor/RestfulGateway.java   |  20 +-
 .../runtime/webmonitor/history/JsonArchivist.java  |  20 ++
 .../runtime/dispatcher/DispatcherJobTest.java      |  67 ++++--
 .../dispatcher/DispatcherResourceCleanupTest.java  |  36 +--
 .../flink/runtime/dispatcher/DispatcherTest.java   |  42 ++--
 ...t.java => FileExecutionGraphInfoStoreTest.java} | 244 ++++++++++++---------
 .../runtime/dispatcher/MiniDispatcherTest.java     |  34 +--
 .../runtime/dispatcher/TestingDispatcher.java      |   6 +-
 .../runner/DefaultDispatcherRunnerITCase.java      |   4 +-
 .../ZooKeeperDefaultDispatcherRunnerTest.java      |   4 +-
 .../runtime/entrypoint/ClusterEntrypointTest.java  |   4 +-
 .../jobmaster/JobManagerRunnerImplTest.java        |  19 +-
 .../jobmaster/JobManagerRunnerResultTest.java      |  18 +-
 ...asterExecutionDeploymentReconciliationTest.java |   6 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |  12 +-
 .../runtime/jobmaster/TestingJobManagerRunner.java |   7 +-
 .../runtime/jobmaster/utils/JobMasterBuilder.java  |  15 +-
 .../jobmaster/utils/TestingJobMasterGateway.java   |   8 +-
 .../utils/TestingJobMasterGatewayBuilder.java      |   6 +-
 .../runtime/minicluster/TestingMiniCluster.java    |   4 +-
 .../legacy/DefaultExecutionGraphCacheTest.java     | 104 ++++-----
 .../runtime/rest/util/NoOpExecutionGraphCache.java |   4 +-
 .../runtime/scheduler/DefaultSchedulerTest.java    | 118 ++++++++--
 .../runtime/scheduler/SchedulerTestingUtils.java   |   7 +-
 .../runtime/scheduler/TestingSchedulerNG.java      |   3 +-
 .../webmonitor/TestingDispatcherGateway.java       |   5 +
 .../webmonitor/TestingExecutionGraphCache.java     |  12 +-
 .../runtime/webmonitor/TestingRestfulGateway.java  |  29 +++
 .../recovery/ProcessFailureCancelingITCase.java    |   4 +-
 73 files changed, 921 insertions(+), 611 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java
index 30e872d..e0b2128 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java
@@ -27,8 +27,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
 import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
 import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
@@ -78,9 +78,9 @@ public class ApplicationClusterEntryPoint extends ClusterEntrypoint {
     }
 
     @Override
-    protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
+    protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
             final Configuration configuration, final ScheduledExecutor scheduledExecutor) {
-        return new MemoryArchivedExecutionGraphStore();
+        return new MemoryExecutionGraphInfoStore();
     }
 
     protected static void configureExecution(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 1a4befe..63ea5b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.PermanentlyFencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -121,7 +122,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
     private final DispatcherBootstrapFactory dispatcherBootstrapFactory;
 
-    private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
+    private final ExecutionGraphInfoStore executionGraphInfoStore;
 
     private final JobManagerRunnerFactory jobManagerRunnerFactory;
 
@@ -177,7 +178,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
         this.historyServerArchivist = dispatcherServices.getHistoryServerArchivist();
 
-        this.archivedExecutionGraphStore = dispatcherServices.getArchivedExecutionGraphStore();
+        this.executionGraphInfoStore = dispatcherServices.getArchivedExecutionGraphStore();
 
         this.jobManagerRunnerFactory = dispatcherServices.getJobManagerRunnerFactory();
 
@@ -441,7 +442,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
                 && executionType == ExecutionType.RECOVERY) {
             return dispatcherJobFailed(jobId, dispatcherJobResult.getInitializationFailure());
         } else {
-            return jobReachedGloballyTerminalState(dispatcherJobResult.getArchivedExecutionGraph());
+            return jobReachedGloballyTerminalState(dispatcherJobResult.getExecutionGraphInfo());
         }
     }
 
@@ -557,8 +558,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
         CompletableFuture<Collection<JobStatus>> allJobsFuture =
                 allOptionalJobsFuture.thenApply(this::flattenOptionalCollection);
 
-        final JobsOverview completedJobsOverview =
-                archivedExecutionGraphStore.getStoredJobsOverview();
+        final JobsOverview completedJobsOverview = executionGraphInfoStore.getStoredJobsOverview();
 
         return allJobsFuture.thenCombine(
                 taskManagerOverviewFuture,
@@ -581,7 +581,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
                 optionalCombinedJobDetails.thenApply(this::flattenOptionalCollection);
 
         final Collection<JobDetails> completedJobDetails =
-                archivedExecutionGraphStore.getAvailableJobDetails();
+                executionGraphInfoStore.getAvailableJobDetails();
 
         return combinedJobDetails.thenApply(
                 (Collection<JobDetails> runningJobDetails) -> {
@@ -603,7 +603,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
                         () -> {
                             // is it a completed job?
                             final JobDetails jobDetails =
-                                    archivedExecutionGraphStore.getAvailableJobDetails(jobId);
+                                    executionGraphInfoStore.getAvailableJobDetails(jobId);
                             if (jobDetails == null) {
                                 return FutureUtils.completedExceptionally(
                                         new FlinkJobNotFoundException(jobId));
@@ -614,17 +614,18 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     }
 
     @Override
-    public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
-        Function<Throwable, ArchivedExecutionGraph> checkExecutionGraphStoreOnException =
+    public CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(
+            JobID jobId, Time timeout) {
+        Function<Throwable, ExecutionGraphInfo> checkExecutionGraphStoreOnException =
                 throwable -> {
                     // check whether it is a completed job
-                    final ArchivedExecutionGraph archivedExecutionGraph =
-                            archivedExecutionGraphStore.get(jobId);
-                    if (archivedExecutionGraph == null) {
+                    final ExecutionGraphInfo executionGraphInfo =
+                            executionGraphInfoStore.get(jobId);
+                    if (executionGraphInfo == null) {
                         throw new CompletionException(
                                 ExceptionUtils.stripCompletionException(throwable));
                     } else {
-                        return archivedExecutionGraph;
+                        return executionGraphInfo;
                     }
                 };
         Optional<DispatcherJob> maybeJob = getDispatcherJob(jobId);
@@ -638,21 +639,22 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
         DispatcherJob job = runningJobs.get(jobId);
 
         if (job == null) {
-            final ArchivedExecutionGraph archivedExecutionGraph =
-                    archivedExecutionGraphStore.get(jobId);
+            final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId);
 
-            if (archivedExecutionGraph == null) {
+            if (executionGraphInfo == null) {
                 return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
             } else {
                 return CompletableFuture.completedFuture(
-                        JobResult.createFrom(archivedExecutionGraph));
+                        JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph()));
             }
         } else {
             return job.getResultFuture()
                     .thenApply(
                             dispatcherJobResult ->
                                     JobResult.createFrom(
-                                            dispatcherJobResult.getArchivedExecutionGraph()));
+                                            dispatcherJobResult
+                                                    .getExecutionGraphInfo()
+                                                    .getArchivedExecutionGraph()));
         }
     }
 
@@ -827,7 +829,9 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     }
 
     protected CleanupJobState jobReachedGloballyTerminalState(
-            ArchivedExecutionGraph archivedExecutionGraph) {
+            ExecutionGraphInfo executionGraphInfo) {
+        ArchivedExecutionGraph archivedExecutionGraph =
+                executionGraphInfo.getArchivedExecutionGraph();
         Preconditions.checkArgument(
                 archivedExecutionGraph.getState().isGloballyTerminalState(),
                 "Job %s is in state %s which is not globally terminal.",
@@ -839,32 +843,32 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
                 archivedExecutionGraph.getJobID(),
                 archivedExecutionGraph.getState());
 
-        archiveExecutionGraph(archivedExecutionGraph);
+        archiveExecutionGraph(executionGraphInfo);
 
         return CleanupJobState.GLOBAL;
     }
 
-    private void archiveExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) {
+    private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
         try {
-            archivedExecutionGraphStore.put(archivedExecutionGraph);
+            executionGraphInfoStore.put(executionGraphInfo);
         } catch (IOException e) {
             log.info(
                     "Could not store completed job {}({}).",
-                    archivedExecutionGraph.getJobName(),
-                    archivedExecutionGraph.getJobID(),
+                    executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+                    executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                     e);
         }
 
         final CompletableFuture<Acknowledge> executionGraphFuture =
-                historyServerArchivist.archiveExecutionGraph(archivedExecutionGraph);
+                historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
 
         executionGraphFuture.whenComplete(
                 (Acknowledge ignored, Throwable throwable) -> {
                     if (throwable != null) {
                         log.info(
                                 "Could not archive completed job {}({}) to the history server.",
-                                archivedExecutionGraph.getJobName(),
-                                archivedExecutionGraph.getJobID(),
+                                executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+                                executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                                 throwable);
                     }
                 });
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
index 9e253f2..7243303 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
@@ -137,8 +138,7 @@ public final class DispatcherJob implements AutoCloseableAsync {
     private void handleJobManagerRunnerResult(JobManagerRunnerResult jobManagerRunnerResult) {
         if (jobManagerRunnerResult.isSuccess()) {
             jobResultFuture.complete(
-                    DispatcherJobResult.forSuccess(
-                            jobManagerRunnerResult.getArchivedExecutionGraph()));
+                    DispatcherJobResult.forSuccess(jobManagerRunnerResult.getExecutionGraphInfo()));
         } else if (jobManagerRunnerResult.isJobNotFinished()) {
             jobResultFuture.completeExceptionally(new JobNotFinishedException(jobId));
         } else if (jobManagerRunnerResult.isInitializationFailure()) {
@@ -156,7 +156,7 @@ public final class DispatcherJob implements AutoCloseableAsync {
                         initializationTimestamp);
         jobResultFuture.complete(
                 DispatcherJobResult.forInitializationFailure(
-                        archivedExecutionGraph, initializationFailure));
+                        new ExecutionGraphInfo(archivedExecutionGraph), initializationFailure));
     }
 
     public CompletableFuture<DispatcherJobResult> getResultFuture() {
@@ -166,9 +166,10 @@ public final class DispatcherJob implements AutoCloseableAsync {
     public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
         return requestJob(timeout)
                 .thenApply(
-                        executionGraph -> {
+                        executionGraphInfo -> {
                             synchronized (lock) {
-                                return JobDetails.createDetailsForJob(executionGraph);
+                                return JobDetails.createDetailsForJob(
+                                        executionGraphInfo.getArchivedExecutionGraph());
                             }
                         });
     }
@@ -205,16 +206,18 @@ public final class DispatcherJob implements AutoCloseableAsync {
     }
 
     public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
-        return requestJob(timeout).thenApply(ArchivedExecutionGraph::getState);
+        return requestJob(timeout)
+                .thenApply(
+                        executionGraphInfo ->
+                                executionGraphInfo.getArchivedExecutionGraph().getState());
     }
 
-    /** Returns a future completing to the ArchivedExecutionGraph of the job. */
-    public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+    /** Returns a future completing to the ExecutionGraphInfo of the job. */
+    public CompletableFuture<ExecutionGraphInfo> requestJob(Time timeout) {
         synchronized (lock) {
             if (isInitialized()) {
                 if (jobResultFuture.isDone()) { // job is not running anymore
-                    return jobResultFuture.thenApply(
-                            DispatcherJobResult::getArchivedExecutionGraph);
+                    return jobResultFuture.thenApply(DispatcherJobResult::getExecutionGraphInfo);
                 }
                 // job is still running
                 return getJobMasterGateway()
@@ -224,12 +227,13 @@ public final class DispatcherJob implements AutoCloseableAsync {
                         this.jobStatus == DispatcherJobStatus.INITIALIZING
                                 || jobStatus == DispatcherJobStatus.CANCELLING);
                 return CompletableFuture.completedFuture(
-                        ArchivedExecutionGraph.createFromInitializingJob(
-                                jobId,
-                                jobName,
-                                jobStatus.asJobStatus(),
-                                null,
-                                initializationTimestamp));
+                        new ExecutionGraphInfo(
+                                ArchivedExecutionGraph.createFromInitializingJob(
+                                        jobId,
+                                        jobName,
+                                        jobStatus.asJobStatus(),
+                                        null,
+                                        initializationTimestamp)));
             }
         }
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
index ad4f5ed..a5d93da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
@@ -19,25 +19,26 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
 /**
- * Container for returning the {@link ArchivedExecutionGraph} and a flag whether the initialization
- * has failed. For initialization failures, the throwable is also attached, to avoid deserializing
- * it from the ArchivedExecutionGraph.
+ * Container for returning the {@link ExecutionGraphInfo} and a flag whether the initialization has
+ * failed. For initialization failures, the throwable is also attached, to avoid deserializing it
+ * from the {@link ArchivedExecutionGraph}.
  */
 final class DispatcherJobResult {
 
-    private final ArchivedExecutionGraph archivedExecutionGraph;
+    private final ExecutionGraphInfo executionGraphInfo;
 
     // if the throwable field is set, the job failed during initialization.
     @Nullable private final Throwable initializationFailure;
 
     private DispatcherJobResult(
-            ArchivedExecutionGraph archivedExecutionGraph, @Nullable Throwable throwable) {
-        this.archivedExecutionGraph = archivedExecutionGraph;
+            ExecutionGraphInfo executionGraphInfo, @Nullable Throwable throwable) {
+        this.executionGraphInfo = executionGraphInfo;
         this.initializationFailure = throwable;
     }
 
@@ -45,8 +46,8 @@ final class DispatcherJobResult {
         return initializationFailure != null;
     }
 
-    public ArchivedExecutionGraph getArchivedExecutionGraph() {
-        return archivedExecutionGraph;
+    public ExecutionGraphInfo getExecutionGraphInfo() {
+        return executionGraphInfo;
     }
 
     /** @throws IllegalStateException if this DispatcherJobResult is a successful initialization. */
@@ -58,11 +59,11 @@ final class DispatcherJobResult {
     }
 
     public static DispatcherJobResult forInitializationFailure(
-            ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) {
-        return new DispatcherJobResult(archivedExecutionGraph, throwable);
+            ExecutionGraphInfo executionGraphInfo, Throwable throwable) {
+        return new DispatcherJobResult(executionGraphInfo, throwable);
     }
 
-    public static DispatcherJobResult forSuccess(ArchivedExecutionGraph archivedExecutionGraph) {
-        return new DispatcherJobResult(archivedExecutionGraph, null);
+    public static DispatcherJobResult forSuccess(ExecutionGraphInfo executionGraphInfo) {
+        return new DispatcherJobResult(executionGraphInfo, null);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
index 79a837a..d4ecaf2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
@@ -48,7 +48,7 @@ public class DispatcherServices {
 
     @Nonnull private final JobManagerMetricGroup jobManagerMetricGroup;
 
-    @Nonnull private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
+    @Nonnull private final ExecutionGraphInfoStore executionGraphInfoStore;
 
     @Nonnull private final FatalErrorHandler fatalErrorHandler;
 
@@ -68,7 +68,7 @@ public class DispatcherServices {
             @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
             @Nonnull BlobServer blobServer,
             @Nonnull HeartbeatServices heartbeatServices,
-            @Nonnull ArchivedExecutionGraphStore archivedExecutionGraphStore,
+            @Nonnull ExecutionGraphInfoStore executionGraphInfoStore,
             @Nonnull FatalErrorHandler fatalErrorHandler,
             @Nonnull HistoryServerArchivist historyServerArchivist,
             @Nullable String metricQueryServiceAddress,
@@ -81,7 +81,7 @@ public class DispatcherServices {
         this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
         this.blobServer = blobServer;
         this.heartbeatServices = heartbeatServices;
-        this.archivedExecutionGraphStore = archivedExecutionGraphStore;
+        this.executionGraphInfoStore = executionGraphInfoStore;
         this.fatalErrorHandler = fatalErrorHandler;
         this.historyServerArchivist = historyServerArchivist;
         this.metricQueryServiceAddress = metricQueryServiceAddress;
@@ -122,8 +122,8 @@ public class DispatcherServices {
     }
 
     @Nonnull
-    public ArchivedExecutionGraphStore getArchivedExecutionGraphStore() {
-        return archivedExecutionGraphStore;
+    public ExecutionGraphInfoStore getArchivedExecutionGraphStore() {
+        return executionGraphInfoStore;
     }
 
     @Nonnull
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStore.java
similarity index 76%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStore.java
index d38d804..1d63567 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStore.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 
 import javax.annotation.Nullable;
 
@@ -29,32 +29,32 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 
-/** Interface for a {@link ArchivedExecutionGraph} store. */
-public interface ArchivedExecutionGraphStore extends Closeable {
+/** Interface for a {@link ExecutionGraphInfo} store. */
+public interface ExecutionGraphInfoStore extends Closeable {
 
     /**
-     * Returns the current number of stored {@link ArchivedExecutionGraph}.
+     * Returns the current number of stored {@link ExecutionGraphInfo} instances.
      *
-     * @return Current number of stored {@link ArchivedExecutionGraph}
+     * @return Current number of stored {@link ExecutionGraphInfo} instances
      */
     int size();
 
     /**
-     * Get the {@link ArchivedExecutionGraph} for the given job id. Null if it isn't stored.
+     * Get the {@link ExecutionGraphInfo} for the given job id. Null if it isn't stored.
      *
      * @param jobId identifying the serializable execution graph to retrieve
      * @return The stored serializable execution graph or null
      */
     @Nullable
-    ArchivedExecutionGraph get(JobID jobId);
+    ExecutionGraphInfo get(JobID jobId);
 
     /**
-     * Store the given {@link ArchivedExecutionGraph} in the store.
+     * Store the given {@link ExecutionGraphInfo} in the store.
      *
-     * @param archivedExecutionGraph to store
+     * @param executionGraphInfo to store
      * @throws IOException if the serializable execution graph could not be stored in the store
      */
-    void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException;
+    void put(ExecutionGraphInfo executionGraphInfo) throws IOException;
 
     /**
      * Return the {@link JobsOverview} for all stored/past jobs.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStore.java
similarity index 78%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStore.java
index f08c996..b08d426 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStore.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
@@ -55,20 +56,19 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Store for {@link ArchivedExecutionGraph}. The store writes the archived execution graph to disk
- * and keeps the most recently used execution graphs in a memory cache for faster serving. Moreover,
- * the stored execution graphs are periodically cleaned up.
+ * Store for {@link ExecutionGraphInfo} instances. The store writes the archived execution graph
+ * information to disk and keeps the most recently used execution graphs in a memory cache for
+ * faster serving. Moreover, the stored execution graphs are periodically cleaned up.
  */
-public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore {
+public class FileExecutionGraphInfoStore implements ExecutionGraphInfoStore {
 
-    private static final Logger LOG =
-            LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);
+    private static final Logger LOG = LoggerFactory.getLogger(FileExecutionGraphInfoStore.class);
 
     private final File storageDir;
 
     private final Cache<JobID, JobDetails> jobDetailsCache;
 
-    private final LoadingCache<JobID, ArchivedExecutionGraph> archivedExecutionGraphCache;
+    private final LoadingCache<JobID, ExecutionGraphInfo> executionGraphInfoCache;
 
     private final ScheduledFuture<?> cleanupFuture;
 
@@ -80,7 +80,7 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
 
     private int numCanceledJobs;
 
-    public FileArchivedExecutionGraphStore(
+    public FileExecutionGraphInfoStore(
             File rootDir,
             Time expirationTime,
             int maximumCapacity,
@@ -93,7 +93,7 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
 
         LOG.info(
                 "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.",
-                FileArchivedExecutionGraphStore.class.getSimpleName(),
+                FileExecutionGraphInfoStore.class.getSimpleName(),
                 storageDirectory,
                 expirationTime.toMilliseconds(),
                 maximumCacheSizeBytes);
@@ -113,15 +113,14 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
                         .ticker(ticker)
                         .build();
 
-        this.archivedExecutionGraphCache =
+        this.executionGraphInfoCache =
                 CacheBuilder.newBuilder()
                         .maximumWeight(maximumCacheSizeBytes)
                         .weigher(this::calculateSize)
                         .build(
-                                new CacheLoader<JobID, ArchivedExecutionGraph>() {
+                                new CacheLoader<JobID, ExecutionGraphInfo>() {
                                     @Override
-                                    public ArchivedExecutionGraph load(JobID jobId)
-                                            throws Exception {
+                                    public ExecutionGraphInfo load(JobID jobId) throws Exception {
                                         return loadExecutionGraph(jobId);
                                     }
                                 });
@@ -147,19 +146,23 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
 
     @Override
     @Nullable
-    public ArchivedExecutionGraph get(JobID jobId) {
+    public ExecutionGraphInfo get(JobID jobId) {
         try {
-            return archivedExecutionGraphCache.get(jobId);
+            return executionGraphInfoCache.get(jobId);
         } catch (ExecutionException e) {
-            LOG.debug("Could not load archived execution graph for job id {}.", jobId, e);
+            LOG.debug(
+                    "Could not load archived execution graph information for job id {}.", jobId, e);
             return null;
         }
     }
 
     @Override
-    public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
+    public void put(ExecutionGraphInfo executionGraphInfo) throws IOException {
+        final JobID jobId = executionGraphInfo.getJobId();
+
+        final ArchivedExecutionGraph archivedExecutionGraph =
+                executionGraphInfo.getArchivedExecutionGraph();
         final JobStatus jobStatus = archivedExecutionGraph.getState();
-        final JobID jobId = archivedExecutionGraph.getJobID();
         final String jobName = archivedExecutionGraph.getJobName();
 
         Preconditions.checkArgument(
@@ -195,12 +198,12 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
         }
 
         // write the ArchivedExecutionGraph to disk
-        storeArchivedExecutionGraph(archivedExecutionGraph);
+        storeExecutionGraphInfo(executionGraphInfo);
 
         final JobDetails detailsForJob = JobDetails.createDetailsForJob(archivedExecutionGraph);
 
         jobDetailsCache.put(jobId, detailsForJob);
-        archivedExecutionGraphCache.put(jobId, archivedExecutionGraph);
+        executionGraphInfoCache.put(jobId, executionGraphInfo);
     }
 
     @Override
@@ -236,27 +239,28 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
     // Internal methods
     // --------------------------------------------------------------
 
-    private int calculateSize(JobID jobId, ArchivedExecutionGraph serializableExecutionGraph) {
-        final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
+    private int calculateSize(JobID jobId, ExecutionGraphInfo serializableExecutionGraphInfo) {
+        final File executionGraphInfoFile = getExecutionGraphFile(jobId);
 
-        if (archivedExecutionGraphFile.exists()) {
-            return Math.toIntExact(archivedExecutionGraphFile.length());
+        if (executionGraphInfoFile.exists()) {
+            return Math.toIntExact(executionGraphInfoFile.length());
         } else {
             LOG.debug(
-                    "Could not find archived execution graph file for {}. Estimating the size instead.",
+                    "Could not find execution graph information file for {}. Estimating the size instead.",
                     jobId);
+            final ArchivedExecutionGraph serializableExecutionGraph =
+                    serializableExecutionGraphInfo.getArchivedExecutionGraph();
             return serializableExecutionGraph.getAllVertices().size() * 1000
                     + serializableExecutionGraph.getAccumulatorsSerialized().size() * 1000;
         }
     }
 
-    private ArchivedExecutionGraph loadExecutionGraph(JobID jobId)
+    private ExecutionGraphInfo loadExecutionGraph(JobID jobId)
             throws IOException, ClassNotFoundException {
-        final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
+        final File executionGraphInfoFile = getExecutionGraphFile(jobId);
 
-        if (archivedExecutionGraphFile.exists()) {
-            try (FileInputStream fileInputStream =
-                    new FileInputStream(archivedExecutionGraphFile)) {
+        if (executionGraphInfoFile.exists()) {
+            try (FileInputStream fileInputStream = new FileInputStream(executionGraphInfoFile)) {
                 return InstantiationUtil.deserializeObject(
                         fileInputStream, getClass().getClassLoader());
             }
@@ -268,13 +272,12 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
         }
     }
 
-    private void storeArchivedExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph)
-            throws IOException {
+    private void storeExecutionGraphInfo(ExecutionGraphInfo executionGraphInfo) throws IOException {
         final File archivedExecutionGraphFile =
-                getExecutionGraphFile(archivedExecutionGraph.getJobID());
+                getExecutionGraphFile(executionGraphInfo.getJobId());
 
         try (FileOutputStream fileOutputStream = new FileOutputStream(archivedExecutionGraphFile)) {
-            InstantiationUtil.serializeObject(fileOutputStream, archivedExecutionGraph);
+            InstantiationUtil.serializeObject(fileOutputStream, executionGraphInfo);
         }
     }
 
@@ -293,7 +296,7 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
             LOG.debug("Could not delete file {}.", archivedExecutionGraphFile, e);
         }
 
-        archivedExecutionGraphCache.invalidate(jobId);
+        executionGraphInfoCache.invalidate(jobId);
         jobDetailsCache.invalidate(jobId);
     }
 
@@ -323,7 +326,7 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
     }
 
     @VisibleForTesting
-    LoadingCache<JobID, ArchivedExecutionGraph> getArchivedExecutionGraphCache() {
-        return archivedExecutionGraphCache;
+    LoadingCache<JobID, ExecutionGraphInfo> getExecutionGraphInfoCache() {
+        return executionGraphInfoCache;
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
index 2a571d2..f40af97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
@@ -21,23 +21,23 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
-/** Writer for an {@link AccessExecutionGraph}. */
+/** Writer for an {@link ExecutionGraphInfo}. */
 public interface HistoryServerArchivist {
 
     /**
-     * Archives the given {@link AccessExecutionGraph} on the history server.
+     * Archives the given {@link ExecutionGraphInfo} on the history server.
      *
-     * @param executionGraph to store on the history server
+     * @param executionGraphInfo to store on the history server
      * @return Future which is completed once the archiving has been completed.
      */
-    CompletableFuture<Acknowledge> archiveExecutionGraph(AccessExecutionGraph executionGraph);
+    CompletableFuture<Acknowledge> archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo);
 
     static HistoryServerArchivist createHistoryServerArchivist(
             Configuration configuration, JsonArchivist jsonArchivist, Executor ioExecutor) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
index 991ff67..e68f3b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingRunnable;
@@ -50,14 +51,15 @@ class JsonResponseHistoryServerArchivist implements HistoryServerArchivist {
 
     @Override
     public CompletableFuture<Acknowledge> archiveExecutionGraph(
-            AccessExecutionGraph executionGraph) {
+            ExecutionGraphInfo executionGraphInfo) {
         return CompletableFuture.runAsync(
                         ThrowingRunnable.unchecked(
                                 () ->
                                         FsJobArchivist.archiveJob(
                                                 archivePath,
-                                                executionGraph.getJobID(),
-                                                jsonArchivist.archiveJsonWithPath(executionGraph))),
+                                                executionGraphInfo.getJobId(),
+                                                jsonArchivist.archiveJsonWithPath(
+                                                        executionGraphInfo))),
                         ioExecutor)
                 .thenApply(ignored -> Acknowledge.get());
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
similarity index 60%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
index 53df4a7..35b57da3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 
 import javax.annotation.Nullable;
 
@@ -33,34 +34,35 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
- * {@link ArchivedExecutionGraphStore} implementation which stores the {@link
- * ArchivedExecutionGraph} in memory.
+ * {@link ExecutionGraphInfoStore} implementation which stores the {@link ExecutionGraphInfo}
+ * instances in memory.
  */
-public class MemoryArchivedExecutionGraphStore implements ArchivedExecutionGraphStore {
+public class MemoryExecutionGraphInfoStore implements ExecutionGraphInfoStore {
 
-    private final Map<JobID, ArchivedExecutionGraph> serializableExecutionGraphs = new HashMap<>(4);
+    private final Map<JobID, ExecutionGraphInfo> serializableExecutionGraphInfos = new HashMap<>(4);
 
     @Override
     public int size() {
-        return serializableExecutionGraphs.size();
+        return serializableExecutionGraphInfos.size();
     }
 
     @Nullable
     @Override
-    public ArchivedExecutionGraph get(JobID jobId) {
-        return serializableExecutionGraphs.get(jobId);
+    public ExecutionGraphInfo get(JobID jobId) {
+        return serializableExecutionGraphInfos.get(jobId);
     }
 
     @Override
-    public void put(ArchivedExecutionGraph serializableExecutionGraph) throws IOException {
-        serializableExecutionGraphs.put(
-                serializableExecutionGraph.getJobID(), serializableExecutionGraph);
+    public void put(ExecutionGraphInfo serializableExecutionGraphInfo) throws IOException {
+        serializableExecutionGraphInfos.put(
+                serializableExecutionGraphInfo.getJobId(), serializableExecutionGraphInfo);
     }
 
     @Override
     public JobsOverview getStoredJobsOverview() {
         Collection<JobStatus> allJobStatus =
-                serializableExecutionGraphs.values().stream()
+                serializableExecutionGraphInfos.values().stream()
+                        .map(ExecutionGraphInfo::getArchivedExecutionGraph)
                         .map(ArchivedExecutionGraph::getState)
                         .collect(Collectors.toList());
 
@@ -69,7 +71,8 @@ public class MemoryArchivedExecutionGraphStore implements ArchivedExecutionGraph
 
     @Override
     public Collection<JobDetails> getAvailableJobDetails() {
-        return serializableExecutionGraphs.values().stream()
+        return serializableExecutionGraphInfos.values().stream()
+                .map(ExecutionGraphInfo::getArchivedExecutionGraph)
                 .map(JobDetails::createDetailsForJob)
                 .collect(Collectors.toList());
     }
@@ -77,11 +80,12 @@ public class MemoryArchivedExecutionGraphStore implements ArchivedExecutionGraph
     @Nullable
     @Override
     public JobDetails getAvailableJobDetails(JobID jobId) {
-        final ArchivedExecutionGraph archivedExecutionGraph =
-                serializableExecutionGraphs.get(jobId);
+        final ExecutionGraphInfo archivedExecutionGraphInfo =
+                serializableExecutionGraphInfos.get(jobId);
 
-        if (archivedExecutionGraph != null) {
-            return JobDetails.createDetailsForJob(archivedExecutionGraph);
+        if (archivedExecutionGraphInfo != null) {
+            return JobDetails.createDetailsForJob(
+                    archivedExecutionGraphInfo.getArchivedExecutionGraph());
         } else {
             return null;
         }
@@ -89,6 +93,6 @@ public class MemoryArchivedExecutionGraphStore implements ArchivedExecutionGraph
 
     @Override
     public void close() throws IOException {
-        serializableExecutionGraphs.clear();
+        serializableExecutionGraphInfos.clear();
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 553cc76..4b9b4b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.FlinkException;
 
 import java.util.Collections;
@@ -119,9 +120,11 @@ public class MiniDispatcher extends Dispatcher {
 
     @Override
     protected CleanupJobState jobReachedGloballyTerminalState(
-            ArchivedExecutionGraph archivedExecutionGraph) {
+            ExecutionGraphInfo executionGraphInfo) {
+        final ArchivedExecutionGraph archivedExecutionGraph =
+                executionGraphInfo.getArchivedExecutionGraph();
         final CleanupJobState cleanupHAState =
-                super.jobReachedGloballyTerminalState(archivedExecutionGraph);
+                super.jobReachedGloballyTerminalState(executionGraphInfo);
 
         if (jobCancelled || executionMode == ClusterEntrypoint.ExecutionMode.DETACHED) {
             // shut down if job is cancelled or we don't have to wait for the execution result
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
index 81bd0d1..6fc8d69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
@@ -49,7 +49,7 @@ public class PartialDispatcherServices {
 
     @Nonnull private final JobManagerMetricGroupFactory jobManagerMetricGroupFactory;
 
-    @Nonnull private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
+    @Nonnull private final ExecutionGraphInfoStore executionGraphInfoStore;
 
     @Nonnull private final FatalErrorHandler fatalErrorHandler;
 
@@ -66,7 +66,7 @@ public class PartialDispatcherServices {
             @Nonnull BlobServer blobServer,
             @Nonnull HeartbeatServices heartbeatServices,
             @Nonnull JobManagerMetricGroupFactory jobManagerMetricGroupFactory,
-            @Nonnull ArchivedExecutionGraphStore archivedExecutionGraphStore,
+            @Nonnull ExecutionGraphInfoStore executionGraphInfoStore,
             @Nonnull FatalErrorHandler fatalErrorHandler,
             @Nonnull HistoryServerArchivist historyServerArchivist,
             @Nullable String metricQueryServiceAddress,
@@ -77,7 +77,7 @@ public class PartialDispatcherServices {
         this.blobServer = blobServer;
         this.heartbeatServices = heartbeatServices;
         this.jobManagerMetricGroupFactory = jobManagerMetricGroupFactory;
-        this.archivedExecutionGraphStore = archivedExecutionGraphStore;
+        this.executionGraphInfoStore = executionGraphInfoStore;
         this.fatalErrorHandler = fatalErrorHandler;
         this.historyServerArchivist = historyServerArchivist;
         this.metricQueryServiceAddress = metricQueryServiceAddress;
@@ -115,8 +115,8 @@ public class PartialDispatcherServices {
     }
 
     @Nonnull
-    public ArchivedExecutionGraphStore getArchivedExecutionGraphStore() {
-        return archivedExecutionGraphStore;
+    public ExecutionGraphInfoStore getArchivedExecutionGraphStore() {
+        return executionGraphInfoStore;
     }
 
     @Nonnull
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobGraphStore.java
index ac69737..f4a4716 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobGraphStore.java
@@ -44,7 +44,7 @@ public class PartialDispatcherServicesWithJobGraphStore extends PartialDispatche
             @Nonnull BlobServer blobServer,
             @Nonnull HeartbeatServices heartbeatServices,
             @Nonnull JobManagerMetricGroupFactory jobManagerMetricGroupFactory,
-            @Nonnull ArchivedExecutionGraphStore archivedExecutionGraphStore,
+            @Nonnull ExecutionGraphInfoStore executionGraphInfoStore,
             @Nonnull FatalErrorHandler fatalErrorHandler,
             @Nonnull HistoryServerArchivist historyServerArchivist,
             @Nullable String metricQueryServiceAddress,
@@ -57,7 +57,7 @@ public class PartialDispatcherServicesWithJobGraphStore extends PartialDispatche
                 blobServer,
                 heartbeatServices,
                 jobManagerMetricGroupFactory,
-                archivedExecutionGraphStore,
+                executionGraphInfoStore,
                 fatalErrorHandler,
                 historyServerArchivist,
                 metricQueryServiceAddress,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
index 0ce3ba6..28b4217 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.dispatcher;
 
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -28,8 +28,7 @@ public enum VoidHistoryServerArchivist implements HistoryServerArchivist {
     INSTANCE;
 
     @Override
-    public CompletableFuture<Acknowledge> archiveExecutionGraph(
-            AccessExecutionGraph executionGraph) {
+    public CompletableFuture<Acknowledge> archiveExecutionGraph(ExecutionGraphInfo executionGraph) {
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 8caa6a7..04111ac 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -39,7 +39,7 @@ import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
@@ -145,7 +145,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
     @GuardedBy("lock")
     private ExecutorService ioExecutor;
 
-    private ArchivedExecutionGraphStore archivedExecutionGraphStore;
+    private ExecutionGraphInfoStore executionGraphInfoStore;
 
     private final Thread shutDownHook;
 
@@ -254,7 +254,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
                             blobServer,
                             heartbeatServices,
                             metricRegistry,
-                            archivedExecutionGraphStore,
+                            executionGraphInfoStore,
                             new RpcMetricQueryServiceRetriever(
                                     metricRegistry.getMetricQueryServiceRpcService()),
                             this);
@@ -322,7 +322,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
                             ConfigurationUtils.getSystemResourceMetricsProbingInterval(
                                     configuration));
 
-            archivedExecutionGraphStore =
+            executionGraphInfoStore =
                     createSerializableExecutionGraphStore(
                             configuration, commonRpcService.getScheduledExecutor());
         }
@@ -399,9 +399,9 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
                 }
             }
 
-            if (archivedExecutionGraphStore != null) {
+            if (executionGraphInfoStore != null) {
                 try {
-                    archivedExecutionGraphStore.close();
+                    executionGraphInfoStore.close();
                 } catch (Throwable t) {
                     exception = ExceptionUtils.firstOrSuppressed(t, exception);
                 }
@@ -538,7 +538,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
             createDispatcherResourceManagerComponentFactory(Configuration configuration)
                     throws IOException;
 
-    protected abstract ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
+    protected abstract ExecutionGraphInfoStore createSerializableExecutionGraphStore(
             Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException;
 
     protected static EntrypointClusterConfiguration parseArguments(String[] args)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 2b19f6e..889ea7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
 
 /** Base class for per-job cluster entry points. */
 public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
@@ -31,8 +31,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
     }
 
     @Override
-    protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
+    protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
             Configuration configuration, ScheduledExecutor scheduledExecutor) {
-        return new MemoryArchivedExecutionGraphStore();
+        return new MemoryExecutionGraphInfoStore();
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index fb46589..b455f02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -23,8 +23,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.FileExecutionGraphInfoStore;
 
 import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
 
@@ -39,7 +39,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
     }
 
     @Override
-    protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
+    protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
             Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException {
         final File tmpDir = new File(ConfigurationUtils.parseTempDirectories(configuration)[0]);
 
@@ -50,7 +50,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
         final long maximumCacheSizeBytes =
                 configuration.getLong(JobManagerOptions.JOB_STORE_CACHE_SIZE);
 
-        return new FileArchivedExecutionGraphStore(
+        return new FileExecutionGraphInfoStore(
                 tmpDir,
                 expirationTime,
                 maximumCapacity,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
index e9b7847..10963ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
@@ -25,9 +25,9 @@ import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ExponentialBackoffRetryStrategy;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
 import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
 import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
@@ -106,7 +106,7 @@ public class DefaultDispatcherResourceManagerComponentFactory
             BlobServer blobServer,
             HeartbeatServices heartbeatServices,
             MetricRegistry metricRegistry,
-            ArchivedExecutionGraphStore archivedExecutionGraphStore,
+            ExecutionGraphInfoStore executionGraphInfoStore,
             MetricQueryServiceRetriever metricQueryServiceRetriever,
             FatalErrorHandler fatalErrorHandler)
             throws Exception {
@@ -201,7 +201,7 @@ public class DefaultDispatcherResourceManagerComponentFactory
                             () ->
                                     MetricUtils.instantiateJobManagerMetricGroup(
                                             metricRegistry, hostname),
-                            archivedExecutionGraphStore,
+                            executionGraphInfoStore,
                             fatalErrorHandler,
                             historyServerArchivist,
                             metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
index 7182f3f..8cddd19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.entrypoint.component;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -41,7 +41,7 @@ public interface DispatcherResourceManagerComponentFactory {
             BlobServer blobServer,
             HeartbeatServices heartbeatServices,
             MetricRegistry metricRegistry,
-            ArchivedExecutionGraphStore archivedExecutionGraphStore,
+            ExecutionGraphInfoStore executionGraphInfoStore,
             MetricQueryServiceRetriever metricQueryServiceRetriever,
             FatalErrorHandler fatalErrorHandler)
             throws Exception;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
index 01fb137..f6d820f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 
 /** Interface for completion actions once a Flink job has reached a terminal state. */
 public interface OnCompletionActions {
@@ -27,9 +27,9 @@ public interface OnCompletionActions {
     /**
      * Job reached a globally terminal state.
      *
-     * @param executionGraph serializable execution graph
+     * @param executionGraphInfo contains information about the terminated job
      */
-    void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph);
+    void jobReachedGloballyTerminalState(ExecutionGraphInfo executionGraphInfo);
 
     /** Job was finished by another JobMaster. */
     void jobFinishedByOther();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
index 629326c..042945d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.client.JobInitializationException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -31,6 +30,7 @@ import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.function.ThrowingRunnable;
@@ -241,10 +241,10 @@ public class JobManagerRunnerImpl
 
     /** Job completion notification triggered by JobManager. */
     @Override
-    public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
+    public void jobReachedGloballyTerminalState(ExecutionGraphInfo executionGraphInfo) {
         unregisterJobFromHighAvailability();
-        // complete the result future with the terminal execution graph
-        resultFuture.complete(JobManagerRunnerResult.forSuccess(executionGraph));
+        // complete the result future with the information of the terminated job
+        resultFuture.complete(JobManagerRunnerResult.forSuccess(executionGraphInfo));
     }
 
     /** Job completion notification triggered by self. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResult.java
index ffda268..53542ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResult.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
@@ -28,37 +28,37 @@ import java.util.Objects;
 /** The result of the {@link JobManagerRunner}. */
 public final class JobManagerRunnerResult {
 
-    @Nullable private final ArchivedExecutionGraph archivedExecutionGraph;
+    @Nullable private final ExecutionGraphInfo executionGraphInfo;
 
     @Nullable private final Throwable failure;
 
     private JobManagerRunnerResult(
-            @Nullable ArchivedExecutionGraph archivedExecutionGraph, @Nullable Throwable failure) {
-        this.archivedExecutionGraph = archivedExecutionGraph;
+            @Nullable ExecutionGraphInfo executionGraphInfo, @Nullable Throwable failure) {
+        this.executionGraphInfo = executionGraphInfo;
         this.failure = failure;
     }
 
     public boolean isSuccess() {
-        return archivedExecutionGraph != null && failure == null;
+        return executionGraphInfo != null && failure == null;
     }
 
     public boolean isJobNotFinished() {
-        return archivedExecutionGraph == null && failure == null;
+        return executionGraphInfo == null && failure == null;
     }
 
     public boolean isInitializationFailure() {
-        return archivedExecutionGraph == null && failure != null;
+        return executionGraphInfo == null && failure != null;
     }
 
     /**
      * This method returns the payload of the successful JobManagerRunnerResult.
      *
-     * @return the successful completed {@link ArchivedExecutionGraph}
+     * @return the {@link ExecutionGraphInfo} of a successfully finished job
      * @throws IllegalStateException if the result is not a success
      */
-    public ArchivedExecutionGraph getArchivedExecutionGraph() {
+    public ExecutionGraphInfo getExecutionGraphInfo() {
         Preconditions.checkState(isSuccess());
-        return archivedExecutionGraph;
+        return executionGraphInfo;
     }
 
     /**
@@ -81,21 +81,21 @@ public final class JobManagerRunnerResult {
             return false;
         }
         JobManagerRunnerResult that = (JobManagerRunnerResult) o;
-        return Objects.equals(archivedExecutionGraph, that.archivedExecutionGraph)
+        return Objects.equals(executionGraphInfo, that.executionGraphInfo)
                 && Objects.equals(failure, that.failure);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(archivedExecutionGraph, failure);
+        return Objects.hash(executionGraphInfo, failure);
     }
 
     public static JobManagerRunnerResult forJobNotFinished() {
         return new JobManagerRunnerResult(null, null);
     }
 
-    public static JobManagerRunnerResult forSuccess(ArchivedExecutionGraph archivedExecutionGraph) {
-        return new JobManagerRunnerResult(archivedExecutionGraph, null);
+    public static JobManagerRunnerResult forSuccess(ExecutionGraphInfo executionGraphInfo) {
+        return new JobManagerRunnerResult(executionGraphInfo, null);
     }
 
     public static JobManagerRunnerResult forInitializationFailure(Throwable failure) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c353258..483b06a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.heartbeat.HeartbeatListener;
@@ -74,6 +73,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.PermanentlyFencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.slots.ResourceRequirement;
@@ -766,7 +766,7 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId>
     }
 
     @Override
-    public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+    public CompletableFuture<ExecutionGraphInfo> requestJob(Time timeout) {
         return CompletableFuture.completedFuture(schedulerNG.requestJob());
     }
 
@@ -975,11 +975,9 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId>
                                                     : partitionTracker
                                                             ::stopTrackingAndReleasePartitionsFor));
 
-            final ArchivedExecutionGraph archivedExecutionGraph = schedulerNG.requestJob();
+            final ExecutionGraphInfo executionGraphInfo = schedulerNG.requestJob();
             scheduledExecutorService.execute(
-                    () ->
-                            jobCompletionActions.jobReachedGloballyTerminalState(
-                                    archivedExecutionGraph));
+                    () -> jobCompletionActions.jobReachedGloballyTerminalState(executionGraphInfo));
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 05a67e6..f742eed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -42,6 +41,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
@@ -209,12 +209,12 @@ public interface JobMasterGateway
     CompletableFuture<JobStatus> requestJobStatus(@RpcTimeout Time timeout);
 
     /**
-     * Requests the {@link ArchivedExecutionGraph} of the executed job.
+     * Requests the {@link ExecutionGraphInfo} of the executed job.
      *
      * @param timeout for the rpc call
-     * @return Future which is completed with the {@link ArchivedExecutionGraph} of the executed job
+     * @return Future which is completed with the {@link ExecutionGraphInfo} of the executed job
      */
-    CompletableFuture<ArchivedExecutionGraph> requestJob(@RpcTimeout Time timeout);
+    CompletableFuture<ExecutionGraphInfo> requestJob(@RpcTimeout Time timeout);
 
     /**
      * Triggers taking a savepoint of the executed job.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 41aad53..e1753bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.concurrent.ExponentialBackoffRetryStrategy;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
@@ -468,7 +468,7 @@ public class MiniCluster implements AutoCloseableAsync {
                         blobServer,
                         heartbeatServices,
                         metricRegistry,
-                        new MemoryArchivedExecutionGraphStore(),
+                        new MemoryExecutionGraphInfoStore(),
                         metricQueryServiceRetriever,
                         fatalErrorHandler));
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractAccessExecutionGraphHandler.java
similarity index 53%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractAccessExecutionGraphHandler.java
index f423e2e..f8d2aab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractAccessExecutionGraphHandler.java
@@ -1,4 +1,3 @@
-package org.apache.flink.runtime.rest.handler.job;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -17,63 +16,59 @@ package org.apache.flink.runtime.rest.handler.job;
  * limitations under the License.
  */
 
+package org.apache.flink.runtime.rest.handler.job;
+
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
-import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Executor;
 
-/** Handler serving the job execution plan. */
-public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, JobMessageParameters>
-        implements JsonArchivist {
+/**
+ * {@code AbstractAccessExecutionGraphHandler} handles requests that require accessing the job's
+ * {@link AccessExecutionGraph}.
+ *
+ * @param <R> the response type
+ * @param <M> the message parameter type
+ */
+public abstract class AbstractAccessExecutionGraphHandler<
+                R extends ResponseBody, M extends JobMessageParameters>
+        extends AbstractExecutionGraphHandler<R, M> {
 
-    public JobPlanHandler(
+    protected AbstractAccessExecutionGraphHandler(
             GatewayRetriever<? extends RestfulGateway> leaderRetriever,
             Time timeout,
-            Map<String, String> headers,
-            MessageHeaders<EmptyRequestBody, JobPlanInfo, JobMessageParameters> messageHeaders,
+            Map<String, String> responseHeaders,
+            MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
             ExecutionGraphCache executionGraphCache,
             Executor executor) {
-
-        super(leaderRetriever, timeout, headers, messageHeaders, executionGraphCache, executor);
+        super(
+                leaderRetriever,
+                timeout,
+                responseHeaders,
+                messageHeaders,
+                executionGraphCache,
+                executor);
     }
 
     @Override
-    protected JobPlanInfo handleRequest(
-            HandlerRequest<EmptyRequestBody, JobMessageParameters> request,
-            AccessExecutionGraph executionGraph)
+    protected R handleRequest(
+            HandlerRequest<EmptyRequestBody, M> request, ExecutionGraphInfo executionGraphInfo)
             throws RestHandlerException {
-        return createJobPlanInfo(executionGraph);
+        return handleRequest(request, executionGraphInfo.getArchivedExecutionGraph());
     }
 
-    @Override
-    public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
-            throws IOException {
-        ResponseBody json = createJobPlanInfo(graph);
-        String path =
-                getMessageHeaders()
-                        .getTargetRestEndpointURL()
-                        .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
-        return Collections.singleton(new ArchivedJson(path, json));
-    }
-
-    private static JobPlanInfo createJobPlanInfo(AccessExecutionGraph executionGraph) {
-        return new JobPlanInfo(executionGraph.getJsonPlan());
-    }
+    protected abstract R handleRequest(
+            HandlerRequest<EmptyRequestBody, M> request, AccessExecutionGraph executionGraph)
+            throws RestHandlerException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
index 8c940ec..e8910a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -32,6 +31,7 @@ import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
@@ -45,9 +45,10 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
- * Base class for all {@link AccessExecutionGraph} based REST handlers.
+ * Base class for all {@link ExecutionGraphInfo} based REST handlers.
  *
  * @param <R> response type
+ * @param <M> job message parameter type
  */
 public abstract class AbstractExecutionGraphHandler<
                 R extends ResponseBody, M extends JobMessageParameters>
@@ -76,8 +77,8 @@ public abstract class AbstractExecutionGraphHandler<
             throws RestHandlerException {
         JobID jobId = request.getPathParameter(JobIDPathParameter.class);
 
-        CompletableFuture<AccessExecutionGraph> executionGraphFuture =
-                executionGraphCache.getExecutionGraph(jobId, gateway);
+        CompletableFuture<ExecutionGraphInfo> executionGraphFuture =
+                executionGraphCache.getExecutionGraphInfo(jobId, gateway);
 
         return executionGraphFuture
                 .thenApplyAsync(
@@ -104,15 +105,15 @@ public abstract class AbstractExecutionGraphHandler<
     }
 
     /**
-     * Called for each request after the corresponding {@link AccessExecutionGraph} has been
-     * retrieved from the {@link ExecutionGraphCache}.
+     * Called for each request after the corresponding {@link ExecutionGraphInfo} has been retrieved
+     * from the {@link ExecutionGraphCache}.
      *
      * @param request for further information
-     * @param executionGraph for which the handler was called
+     * @param executionGraphInfo for which the handler was called
      * @return Response
      * @throws RestHandlerException if the handler could not process the request
      */
     protected abstract R handleRequest(
-            HandlerRequest<EmptyRequestBody, M> request, AccessExecutionGraph executionGraph)
+            HandlerRequest<EmptyRequestBody, M> request, ExecutionGraphInfo executionGraphInfo)
             throws RestHandlerException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java
index ee09d02..1e001ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java
@@ -49,7 +49,7 @@ import java.util.concurrent.Executor;
  */
 public abstract class AbstractJobVertexHandler<
                 R extends ResponseBody, M extends JobVertexMessageParameters>
-        extends AbstractExecutionGraphHandler<R, M> {
+        extends AbstractAccessExecutionGraphHandler<R, M> {
 
     /**
      * Instantiates a new Abstract job vertex handler.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
index 9c41953..70dd6ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
@@ -48,7 +48,8 @@ import java.util.concurrent.Executor;
 
 /** Request handler that returns the aggregated accumulators of a job. */
 public class JobAccumulatorsHandler
-        extends AbstractExecutionGraphHandler<JobAccumulatorsInfo, JobAccumulatorsMessageParameters>
+        extends AbstractAccessExecutionGraphHandler<
+                JobAccumulatorsInfo, JobAccumulatorsMessageParameters>
         implements JsonArchivist {
 
     public JobAccumulatorsHandler(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
index cbdc8b6..53ee2b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
@@ -42,7 +42,7 @@ import java.util.concurrent.Executor;
 
 /** Handler serving the job configuration. */
 public class JobConfigHandler
-        extends AbstractExecutionGraphHandler<JobConfigInfo, JobMessageParameters>
+        extends AbstractAccessExecutionGraphHandler<JobConfigInfo, JobMessageParameters>
         implements JsonArchivist {
 
     public JobConfigHandler(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 9261674..84c6d55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -56,7 +56,7 @@ import java.util.concurrent.Executor;
 
 /** Handler returning the details for the specified job. */
 public class JobDetailsHandler
-        extends AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters>
+        extends AbstractAccessExecutionGraphHandler<JobDetailsInfo, JobMessageParameters>
         implements JsonArchivist {
 
     private final MetricFetcher metricFetcher;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 819f727..1fa496e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -49,7 +49,8 @@ import java.util.concurrent.Executor;
 
 /** Handler serving the job exceptions. */
 public class JobExceptionsHandler
-        extends AbstractExecutionGraphHandler<JobExceptionsInfo, JobExceptionsMessageParameters>
+        extends AbstractAccessExecutionGraphHandler<
+                JobExceptionsInfo, JobExceptionsMessageParameters>
         implements JsonArchivist {
 
     static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
index f423e2e..fcf48ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
@@ -40,7 +40,8 @@ import java.util.Map;
 import java.util.concurrent.Executor;
 
 /** Handler serving the job execution plan. */
-public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, JobMessageParameters>
+public class JobPlanHandler
+        extends AbstractAccessExecutionGraphHandler<JobPlanInfo, JobMessageParameters>
         implements JsonArchivist {
 
     public JobPlanHandler(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
index 1ab070f..12e8542 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
@@ -53,7 +53,8 @@ import java.util.concurrent.Executor;
 
 /** Request handler for the job vertex details. */
 public class JobVertexDetailsHandler
-        extends AbstractExecutionGraphHandler<JobVertexDetailsInfo, JobVertexMessageParameters>
+        extends AbstractAccessExecutionGraphHandler<
+                JobVertexDetailsInfo, JobVertexMessageParameters>
         implements JsonArchivist {
     private final MetricFetcher metricFetcher;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index 0408fa1..1e38e29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -62,7 +62,8 @@ import java.util.concurrent.Executor;
  * and metrics of all its subtasks aggregated by TaskManager.
  */
 public class JobVertexTaskManagersHandler
-        extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters>
+        extends AbstractAccessExecutionGraphHandler<
+                JobVertexTaskManagersInfo, JobVertexMessageParameters>
         implements JsonArchivist {
     private MetricFetcher metricFetcher;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
index 9f2763c..1e2817e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -47,7 +47,7 @@ import java.util.concurrent.Executor;
  */
 public abstract class AbstractCheckpointHandler<
                 R extends ResponseBody, M extends CheckpointMessageParameters>
-        extends AbstractExecutionGraphHandler<R, M> {
+        extends AbstractAccessExecutionGraphHandler<R, M> {
 
     private final CheckpointStatsCache checkpointStatsCache;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
index 597bcb5..7684271 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
@@ -49,7 +49,7 @@ import java.util.concurrent.Executor;
 
 /** Handler which serves the checkpoint configuration. */
 public class CheckpointConfigHandler
-        extends AbstractExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters>
+        extends AbstractAccessExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters>
         implements JsonArchivist {
 
     public CheckpointConfigHandler(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
index 320f7e0..db6f79c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
@@ -56,7 +56,7 @@ import java.util.concurrent.Executor;
 
 /** Handler which serves the checkpoint statistics. */
 public class CheckpointingStatisticsHandler
-        extends AbstractExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters>
+        extends AbstractAccessExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters>
         implements JsonArchivist {
 
     public CheckpointingStatisticsHandler(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java
index 3ed7e4d..de40e14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java
@@ -20,8 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.util.Preconditions;
 
@@ -63,12 +62,12 @@ public class DefaultExecutionGraphCache implements ExecutionGraphCache {
     }
 
     @Override
-    public CompletableFuture<AccessExecutionGraph> getExecutionGraph(
+    public CompletableFuture<ExecutionGraphInfo> getExecutionGraphInfo(
             JobID jobId, RestfulGateway restfulGateway) {
         return getExecutionGraphInternal(jobId, restfulGateway).thenApply(Function.identity());
     }
 
-    private CompletableFuture<ArchivedExecutionGraph> getExecutionGraphInternal(
+    private CompletableFuture<ExecutionGraphInfo> getExecutionGraphInternal(
             JobID jobId, RestfulGateway restfulGateway) {
         Preconditions.checkState(running, "ExecutionGraphCache is no longer running");
 
@@ -78,10 +77,10 @@ public class DefaultExecutionGraphCache implements ExecutionGraphCache {
             final long currentTime = System.currentTimeMillis();
 
             if (oldEntry != null && currentTime < oldEntry.getTTL()) {
-                final CompletableFuture<ArchivedExecutionGraph> executionGraphFuture =
-                        oldEntry.getExecutionGraphFuture();
-                if (!executionGraphFuture.isCompletedExceptionally()) {
-                    return executionGraphFuture;
+                final CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture =
+                        oldEntry.getExecutionGraphInfoFuture();
+                if (!executionGraphInfoFuture.isCompletedExceptionally()) {
+                    return executionGraphInfoFuture;
                 }
                 // otherwise it must be completed exceptionally
             }
@@ -96,22 +95,23 @@ public class DefaultExecutionGraphCache implements ExecutionGraphCache {
             } else {
                 successfulUpdate = cachedExecutionGraphs.replace(jobId, oldEntry, newEntry);
                 // cancel potentially outstanding futures
-                oldEntry.getExecutionGraphFuture().cancel(false);
+                oldEntry.getExecutionGraphInfoFuture().cancel(false);
             }
 
             if (successfulUpdate) {
-                final CompletableFuture<ArchivedExecutionGraph> executionGraphFuture =
-                        restfulGateway.requestJob(jobId, timeout);
+                final CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture =
+                        restfulGateway.requestExecutionGraphInfo(jobId, timeout);
 
-                executionGraphFuture.whenComplete(
-                        (ArchivedExecutionGraph executionGraph, Throwable throwable) -> {
+                executionGraphInfoFuture.whenComplete(
+                        (ExecutionGraphInfo executionGraph, Throwable throwable) -> {
                             if (throwable != null) {
-                                newEntry.getExecutionGraphFuture().completeExceptionally(throwable);
+                                newEntry.getExecutionGraphInfoFuture()
+                                        .completeExceptionally(throwable);
 
                                 // remove exceptionally completed entry because it doesn't help
                                 cachedExecutionGraphs.remove(jobId, newEntry);
                             } else {
-                                newEntry.getExecutionGraphFuture().complete(executionGraph);
+                                newEntry.getExecutionGraphInfoFuture().complete(executionGraph);
                             }
                         });
 
@@ -120,7 +120,7 @@ public class DefaultExecutionGraphCache implements ExecutionGraphCache {
                     cachedExecutionGraphs.remove(jobId, newEntry);
                 }
 
-                return newEntry.getExecutionGraphFuture();
+                return newEntry.getExecutionGraphInfoFuture();
             }
         }
     }
@@ -139,19 +139,19 @@ public class DefaultExecutionGraphCache implements ExecutionGraphCache {
     private static final class ExecutionGraphEntry {
         private final long ttl;
 
-        private final CompletableFuture<ArchivedExecutionGraph> executionGraphFuture;
+        private final CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture;
 
         ExecutionGraphEntry(long ttl) {
             this.ttl = ttl;
-            this.executionGraphFuture = new CompletableFuture<>();
+            this.executionGraphInfoFuture = new CompletableFuture<>();
         }
 
         public long getTTL() {
             return ttl;
         }
 
-        public CompletableFuture<ArchivedExecutionGraph> getExecutionGraphFuture() {
-            return executionGraphFuture;
+        public CompletableFuture<ExecutionGraphInfo> getExecutionGraphInfoFuture() {
+            return executionGraphInfoFuture;
         }
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
index 8309f27..b970399 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
@@ -19,17 +19,16 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 
 import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
 
 /**
- * Cache for {@link ArchivedExecutionGraph} which are obtained from the Flink cluster. Every cache
- * entry has an associated time to live after which a new request will trigger the reloading of the
- * {@link ArchivedExecutionGraph} from the cluster.
+ * Cache for {@link ExecutionGraphInfo} which are obtained from the Flink cluster. Every cache entry
+ * has an associated time to live after which a new request will trigger the reloading of the {@link
+ * ExecutionGraphInfo} from the cluster.
  */
 public interface ExecutionGraphCache extends Closeable {
 
@@ -37,15 +36,15 @@ public interface ExecutionGraphCache extends Closeable {
     int size();
 
     /**
-     * Gets the {@link AccessExecutionGraph} for the given {@link JobID} and caches it. The {@link
-     * AccessExecutionGraph} will be requested again after the refresh interval has passed or if the
+     * Gets the {@link ExecutionGraphInfo} for the given {@link JobID} and caches it. The {@link
+     * ExecutionGraphInfo} will be requested again after the refresh interval has passed or if the
      * graph could not be retrieved from the given gateway.
      *
-     * @param jobId identifying the {@link ArchivedExecutionGraph} to get
-     * @param restfulGateway to request the {@link ArchivedExecutionGraph} from
-     * @return Future containing the requested {@link ArchivedExecutionGraph}
+     * @param jobId identifying the {@link ExecutionGraphInfo} to get
+     * @param restfulGateway to request the {@link ExecutionGraphInfo} from
+     * @return Future containing the requested {@link ExecutionGraphInfo}
      */
-    CompletableFuture<AccessExecutionGraph> getExecutionGraph(
+    CompletableFuture<ExecutionGraphInfo> getExecutionGraphInfo(
             JobID jobId, RestfulGateway restfulGateway);
 
     /** Perform the cleanup of out dated cache entries. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
new file mode 100644
index 0000000..4b7b746
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@code ExecutionGraphInfo} serves as a composite class that provides different {@link
+ * ExecutionGraph}-related information: the {@link ArchivedExecutionGraph} and exception history.
+ */
+public class ExecutionGraphInfo implements Serializable {
+
+    private static final long serialVersionUID = -6134203195124124202L;
+
+    private final ArchivedExecutionGraph executionGraph;
+    private final List<ErrorInfo> exceptionHistory;
+
+    public ExecutionGraphInfo(ArchivedExecutionGraph executionGraph) {
+        this(executionGraph, Collections.emptyList());
+    }
+
+    public ExecutionGraphInfo(
+            ArchivedExecutionGraph executionGraph, List<ErrorInfo> exceptionHistory) {
+        this.executionGraph = executionGraph;
+        this.exceptionHistory = exceptionHistory;
+    }
+
+    public JobID getJobId() {
+        return executionGraph.getJobID();
+    }
+
+    public ArchivedExecutionGraph getArchivedExecutionGraph() {
+        return executionGraph;
+    }
+
+    public List<ErrorInfo> getExceptionHistory() {
+        return exceptionHistory;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 64070ed..9b1e83d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -763,9 +763,10 @@ public abstract class SchedulerBase implements SchedulerNG {
     }
 
     @Override
-    public ArchivedExecutionGraph requestJob() {
+    public ExecutionGraphInfo requestJob() {
         mainThreadExecutor.assertRunningInMainThread();
-        return ArchivedExecutionGraph.createFrom(executionGraph);
+        return new ExecutionGraphInfo(
+                ArchivedExecutionGraph.createFrom(executionGraph), getExceptionHistory());
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 5db4c5d..c88c8c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -92,7 +91,7 @@ public interface SchedulerNG {
 
     void notifyPartitionDataAvailable(ResultPartitionID partitionID);
 
-    ArchivedExecutionGraph requestJob();
+    ExecutionGraphInfo requestJob();
 
     JobStatus requestJobStatus();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index fdd1a5b..7a332cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -86,6 +86,7 @@ import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.scheduler.SchedulerUtils;
@@ -368,8 +369,9 @@ public class AdaptiveScheduler
     }
 
     @Override
-    public ArchivedExecutionGraph requestJob() {
-        return state.getJob();
+    public ExecutionGraphInfo requestJob() {
+        // no exception history support is added for now (see FLINK-21439)
+        return new ExecutionGraphInfo(state.getJob());
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 6a1d317..62b5f76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.SerializedValue;
 
 import java.util.Collection;
@@ -67,7 +68,24 @@ public interface RestfulGateway extends RpcGateway {
      * @return Future containing the {@link ArchivedExecutionGraph} for the given jobId, otherwise
      *     {@link FlinkJobNotFoundException}
      */
-    CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout);
+    default CompletableFuture<ArchivedExecutionGraph> requestJob(
+            JobID jobId, @RpcTimeout Time timeout) {
+        return requestExecutionGraphInfo(jobId, timeout)
+                .thenApply(ExecutionGraphInfo::getArchivedExecutionGraph);
+    }
+
+    /**
+     * Requests the {@link ExecutionGraphInfo} of a job, which bundles the {@link
+     * ArchivedExecutionGraph} with additional information such as the exception history. If there
+     * is no such graph, then the future is completed with a {@link FlinkJobNotFoundException}.
+     *
+     * @param jobId identifying the job whose {@link ExecutionGraphInfo} is requested
+     * @param timeout for the asynchronous operation
+     * @return Future containing the {@link ExecutionGraphInfo} for the given jobId, otherwise a
+     *     future completed with a {@link FlinkJobNotFoundException}
+     */
+    CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(
+            JobID jobId, @RpcTimeout Time timeout);
 
     /**
      * Requests the {@link JobResult} of a job specified by the given jobId.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
index a2edd76..e1da5d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.history;
 
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -43,4 +44,23 @@ public interface JsonArchivist {
      * @throws IOException thrown if the JSON generation fails
      */
     Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException;
+
+    /**
+     * Returns a {@link Collection} of {@link ArchivedJson}s containing JSON responses and their
+     * respective REST URL for a given job. The default implementation delegates to {@link
+     * #archiveJsonWithPath(AccessExecutionGraph)} using the wrapped archived execution graph.
+     *
+     * <p>The collection should contain one entry for every response that could be generated for the
+     * given job, for example one entry for each task. The REST URLs should be unique and must not
+     * contain placeholders.
+     *
+     * @param executionGraphInfo {@link AccessExecutionGraph}-related information for which the
+     *     responses should be generated
+     * @return one {@link ArchivedJson} per response that can be generated for the given job
+     * @throws IOException thrown if the JSON generation fails
+     */
+    default Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo)
+            throws IOException {
+        return archiveJsonWithPath(executionGraphInfo.getArchivedExecutionGraph());
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
index 9738809..66b8e0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.TestLogger;
 
@@ -90,7 +91,9 @@ public class DispatcherJobTest extends TestLogger {
         // assert result future done
         DispatcherJobResult result = dispatcherJob.getResultFuture().get();
 
-        assertThat(result.getArchivedExecutionGraph().getState(), is(JobStatus.FINISHED));
+        assertThat(
+                result.getExecutionGraphInfo().getArchivedExecutionGraph().getState(),
+                is(JobStatus.FINISHED));
     }
 
     @Test
@@ -116,7 +119,12 @@ public class DispatcherJobTest extends TestLogger {
         assertThat(dispatcherJob.isInitialized(), is(true));
         // assert that the result future completes
         assertThat(
-                dispatcherJob.getResultFuture().get().getArchivedExecutionGraph().getState(),
+                dispatcherJob
+                        .getResultFuture()
+                        .get()
+                        .getExecutionGraphInfo()
+                        .getArchivedExecutionGraph()
+                        .getState(),
                 is(JobStatus.CANCELED));
     }
 
@@ -134,7 +142,12 @@ public class DispatcherJobTest extends TestLogger {
         cancelFuture.get();
         assertJobStatus(dispatcherJob, JobStatus.CANCELED);
         assertThat(
-                dispatcherJob.getResultFuture().get().getArchivedExecutionGraph().getState(),
+                dispatcherJob
+                        .getResultFuture()
+                        .get()
+                        .getExecutionGraphInfo()
+                        .getArchivedExecutionGraph()
+                        .getState(),
                 is(JobStatus.CANCELED));
     }
 
@@ -171,7 +184,11 @@ public class DispatcherJobTest extends TestLogger {
         assertJobStatus(dispatcherJob, JobStatus.FAILED);
 
         ArchivedExecutionGraph aeg =
-                dispatcherJob.getResultFuture().get().getArchivedExecutionGraph();
+                dispatcherJob
+                        .getResultFuture()
+                        .get()
+                        .getExecutionGraphInfo()
+                        .getArchivedExecutionGraph();
         assertThat(
                 aeg.getFailureInfo()
                         .getException()
@@ -188,7 +205,9 @@ public class DispatcherJobTest extends TestLogger {
 
         DispatcherJobResult result = dispatcherJob.getResultFuture().get();
         assertThat(result.isInitializationFailure(), is(true));
-        assertThat(result.getArchivedExecutionGraph().getState(), is(JobStatus.FAILED));
+        assertThat(
+                result.getExecutionGraphInfo().getArchivedExecutionGraph().getState(),
+                is(JobStatus.FAILED));
         assertThat(
                 result.getInitializationFailure().getMessage(),
                 containsString("Artificial failure"));
@@ -298,13 +317,14 @@ public class DispatcherJobTest extends TestLogger {
                             .setRequestJobSupplier(
                                     () ->
                                             CompletableFuture.completedFuture(
-                                                    ArchivedExecutionGraph
-                                                            .createFromInitializingJob(
-                                                                    getJobID(),
-                                                                    "test",
-                                                                    internalJobStatus,
-                                                                    null,
-                                                                    1337)))
+                                                    new ExecutionGraphInfo(
+                                                            ArchivedExecutionGraph
+                                                                    .createFromInitializingJob(
+                                                                            getJobID(),
+                                                                            "test",
+                                                                            internalJobStatus,
+                                                                            null,
+                                                                            1337))))
                             .setRequestJobDetailsSupplier(
                                     () -> {
                                         JobDetails jobDetails =
@@ -359,8 +379,9 @@ public class DispatcherJobTest extends TestLogger {
             internalJobStatus = JobStatus.FINISHED;
             resultFuture.complete(
                     JobManagerRunnerResult.forSuccess(
-                            ArchivedExecutionGraph.createFromInitializingJob(
-                                    getJobID(), "test", JobStatus.FINISHED, null, 1337)));
+                            new ExecutionGraphInfo(
+                                    ArchivedExecutionGraph.createFromInitializingJob(
+                                            getJobID(), "test", JobStatus.FINISHED, null, 1337))));
         }
 
         public void finishCancellation() {
@@ -370,12 +391,14 @@ public class DispatcherJobTest extends TestLogger {
                         runner.getResultFuture()
                                 .complete(
                                         JobManagerRunnerResult.forSuccess(
-                                                ArchivedExecutionGraph.createFromInitializingJob(
-                                                        getJobID(),
-                                                        "test",
-                                                        JobStatus.CANCELED,
-                                                        null,
-                                                        1337)));
+                                                new ExecutionGraphInfo(
+                                                        ArchivedExecutionGraph
+                                                                .createFromInitializingJob(
+                                                                        getJobID(),
+                                                                        "test",
+                                                                        JobStatus.CANCELED,
+                                                                        null,
+                                                                        1337))));
                         cancellationFuture.complete(Acknowledge.get());
                     });
         }
@@ -384,7 +407,9 @@ public class DispatcherJobTest extends TestLogger {
     private void assertJobStatus(DispatcherJob dispatcherJob, JobStatus expectedStatus)
             throws Exception {
         assertThat(dispatcherJob.requestJobDetails(TIMEOUT).get().getStatus(), is(expectedStatus));
-        assertThat(dispatcherJob.requestJob(TIMEOUT).get().getState(), is(expectedStatus));
+        assertThat(
+                dispatcherJob.requestJob(TIMEOUT).get().getArchivedExecutionGraph().getState(),
+                is(expectedStatus));
         assertThat(dispatcherJob.requestJobStatus(TIMEOUT).get(), is(expectedStatus));
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index fb02a7b..caa1574 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraph
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.TestingJobGraphStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
@@ -196,8 +197,8 @@ public class DispatcherResourceCleanupTest extends TestLogger {
     private void startDispatcher(JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
         TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
         final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
-        final MemoryArchivedExecutionGraphStore archivedExecutionGraphStore =
-                new MemoryArchivedExecutionGraphStore();
+        final MemoryExecutionGraphInfoStore archivedExecutionGraphStore =
+                new MemoryExecutionGraphInfoStore();
         dispatcher =
                 new TestingDispatcher(
                         rpcService,
@@ -349,10 +350,11 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
         testingJobManagerRunner.getCloseAsyncCalledLatch().await();
         testingJobManagerRunner.completeResultFuture(
-                new ArchivedExecutionGraphBuilder()
-                        .setJobID(jobId)
-                        .setState(JobStatus.FINISHED)
-                        .build());
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder()
+                                .setJobID(jobId)
+                                .setState(JobStatus.FINISHED)
+                                .build()));
 
         testingJobManagerRunner.completeTerminationFuture();
 
@@ -378,10 +380,11 @@ public class DispatcherResourceCleanupTest extends TestLogger {
         final TestingJobManagerRunner testingJobManagerRunner =
                 jobManagerRunnerFactory.takeCreatedJobManagerRunner();
         testingJobManagerRunner.completeResultFuture(
-                new ArchivedExecutionGraphBuilder()
-                        .setState(JobStatus.FINISHED)
-                        .setJobID(jobId)
-                        .build());
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder()
+                                .setState(JobStatus.FINISHED)
+                                .setJobID(jobId)
+                                .build()));
 
         // wait for the clearing
         clearedJobLatch.await();
@@ -459,10 +462,11 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
     private void finishJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
         takeCreatedJobManagerRunner.completeResultFuture(
-                new ArchivedExecutionGraphBuilder()
-                        .setJobID(jobId)
-                        .setState(JobStatus.FINISHED)
-                        .build());
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder()
+                                .setJobID(jobId)
+                                .setState(JobStatus.FINISHED)
+                                .build()));
     }
 
     private void assertThatHABlobsHaveNotBeenRemoved() {
@@ -578,7 +582,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
         final TestingJobManagerRunner testingJobManagerRunner =
                 jobManagerRunnerFactory.takeCreatedJobManagerRunner();
-        testingJobManagerRunner.completeResultFuture(executionGraph);
+        testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph));
 
         assertThat(cleanupJobFuture.get(), equalTo(jobId));
         assertThat(deleteAllHABlobsFuture.isDone(), is(false));
@@ -597,7 +601,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
         final TestingJobManagerRunner testingJobManagerRunner =
                 jobManagerRunnerFactory.takeCreatedJobManagerRunner();
-        testingJobManagerRunner.completeResultFuture(executionGraph);
+        testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph));
 
         assertThat(cleanupJobFuture.get(), equalTo(jobId));
         assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 11e799e..b4c2cd8 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
@@ -264,8 +265,8 @@ public class DispatcherTest extends TestLogger {
             TestingResourceManagerGateway resourceManagerGateway =
                     new TestingResourceManagerGateway();
 
-            final MemoryArchivedExecutionGraphStore archivedExecutionGraphStore =
-                    new MemoryArchivedExecutionGraphStore();
+            final MemoryExecutionGraphInfoStore executionGraphInfoStore =
+                    new MemoryExecutionGraphInfoStore();
 
             return new TestingDispatcher(
                     rpcService,
@@ -278,7 +279,7 @@ public class DispatcherTest extends TestLogger {
                             () -> CompletableFuture.completedFuture(resourceManagerGateway),
                             blobServer,
                             heartbeatServices,
-                            archivedExecutionGraphStore,
+                            executionGraphInfoStore,
                             testingFatalErrorHandlerResource.getFatalErrorHandler(),
                             VoidHistoryServerArchivist.INSTANCE,
                             null,
@@ -530,21 +531,23 @@ public class DispatcherTest extends TestLogger {
         final JobID failedJobId = new JobID();
 
         final JobStatus expectedState = JobStatus.FAILED;
-        final ArchivedExecutionGraph failedExecutionGraph =
-                new ArchivedExecutionGraphBuilder()
-                        .setJobID(failedJobId)
-                        .setState(expectedState)
-                        .setFailureCause(new ErrorInfo(new RuntimeException("expected"), 1L))
-                        .build();
+        final ExecutionGraphInfo failedExecutionGraphInfo =
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder()
+                                .setJobID(failedJobId)
+                                .setState(expectedState)
+                                .setFailureCause(
+                                        new ErrorInfo(new RuntimeException("expected"), 1L))
+                                .build());
 
-        dispatcher.completeJobExecution(failedExecutionGraph);
+        dispatcher.completeJobExecution(failedExecutionGraphInfo);
 
         assertThat(
                 dispatcherGateway.requestJobStatus(failedJobId, TIMEOUT).get(),
                 equalTo(expectedState));
         assertThat(
-                dispatcherGateway.requestJob(failedJobId, TIMEOUT).get(),
-                equalTo(failedExecutionGraph));
+                dispatcherGateway.requestExecutionGraphInfo(failedJobId, TIMEOUT).get(),
+                equalTo(failedExecutionGraphInfo));
     }
 
     @Test
@@ -970,13 +973,14 @@ public class DispatcherTest extends TestLogger {
                             .setRequestJobSupplier(
                                     () ->
                                             CompletableFuture.completedFuture(
-                                                    ArchivedExecutionGraph
-                                                            .createFromInitializingJob(
-                                                                    jobGraph.getJobID(),
-                                                                    jobGraph.getName(),
-                                                                    JobStatus.RUNNING,
-                                                                    null,
-                                                                    1337)))
+                                                    new ExecutionGraphInfo(
+                                                            ArchivedExecutionGraph
+                                                                    .createFromInitializingJob(
+                                                                            jobGraph.getJobID(),
+                                                                            jobGraph.getName(),
+                                                                            JobStatus.RUNNING,
+                                                                            null,
+                                                                            1337))))
                             .build();
             testingRunner.completeJobMasterGatewayFuture(testingJobMasterGateway);
             return testingRunner;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStoreTest.java
similarity index 50%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStoreTest.java
index 42e0325..cfd76a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStoreTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.ManualTicker;
 import org.apache.flink.util.Preconditions;
@@ -57,8 +58,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
-/** Tests for the {@link FileArchivedExecutionGraphStore}. */
-public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+/** Tests for the {@link FileExecutionGraphInfoStore}. */
+public class FileExecutionGraphInfoStoreTest extends TestLogger {
 
     private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS =
             Arrays.stream(JobStatus.values())
@@ -68,31 +69,32 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
     @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
     /**
-     * Tests that we can put {@link ArchivedExecutionGraph} into the {@link
-     * FileArchivedExecutionGraphStore} and that the graph is persisted.
+     * Tests that we can put {@link ExecutionGraphInfo} into the {@link FileExecutionGraphInfoStore}
+     * and that the graph is persisted.
      */
     @Test
     public void testPut() throws IOException {
-        final ArchivedExecutionGraph dummyExecutionGraph =
-                new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
+        final ExecutionGraphInfo dummyExecutionGraphInfo =
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build());
         final File rootDir = temporaryFolder.newFolder();
 
-        try (final FileArchivedExecutionGraphStore executionGraphStore =
-                createDefaultExecutionGraphStore(rootDir)) {
+        try (final FileExecutionGraphInfoStore executionGraphStore =
+                createDefaultExecutionGraphInfoStore(rootDir)) {
 
             final File storageDirectory = executionGraphStore.getStorageDir();
 
             // check that the storage directory is empty
             assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
 
-            executionGraphStore.put(dummyExecutionGraph);
+            executionGraphStore.put(dummyExecutionGraphInfo);
 
             // check that we have persisted the given execution graph
             assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));
 
             assertThat(
-                    executionGraphStore.get(dummyExecutionGraph.getJobID()),
-                    new PartialArchivedExecutionGraphMatcher(dummyExecutionGraph));
+                    executionGraphStore.get(dummyExecutionGraphInfo.getJobId()),
+                    new PartialExecutionGraphInfoMatcher(dummyExecutionGraphInfo));
         }
     }
 
@@ -101,8 +103,8 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
     public void testUnknownGet() throws IOException {
         final File rootDir = temporaryFolder.newFolder();
 
-        try (final FileArchivedExecutionGraphStore executionGraphStore =
-                createDefaultExecutionGraphStore(rootDir)) {
+        try (final FileExecutionGraphInfoStore executionGraphStore =
+                createDefaultExecutionGraphInfoStore(rootDir)) {
             assertThat(executionGraphStore.get(new JobID()), Matchers.nullValue());
         }
     }
@@ -111,11 +113,12 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
     @Test
     public void testStoredJobsOverview() throws IOException {
         final int numberExecutionGraphs = 10;
-        final Collection<ArchivedExecutionGraph> executionGraphs =
-                generateTerminalExecutionGraphs(numberExecutionGraphs);
+        final Collection<ExecutionGraphInfo> executionGraphInfos =
+                generateTerminalExecutionGraphInfos(numberExecutionGraphs);
 
         final List<JobStatus> jobStatuses =
-                executionGraphs.stream()
+                executionGraphInfos.stream()
+                        .map(ExecutionGraphInfo::getArchivedExecutionGraph)
                         .map(ArchivedExecutionGraph::getState)
                         .collect(Collectors.toList());
 
@@ -123,14 +126,14 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
 
         final File rootDir = temporaryFolder.newFolder();
 
-        try (final FileArchivedExecutionGraphStore executionGraphStore =
-                createDefaultExecutionGraphStore(rootDir)) {
-            for (ArchivedExecutionGraph executionGraph : executionGraphs) {
-                executionGraphStore.put(executionGraph);
+        try (final FileExecutionGraphInfoStore executionGraphInfoStore =
+                createDefaultExecutionGraphInfoStore(rootDir)) {
+            for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) {
+                executionGraphInfoStore.put(executionGraphInfo);
             }
 
             assertThat(
-                    executionGraphStore.getStoredJobsOverview(),
+                    executionGraphInfoStore.getStoredJobsOverview(),
                     Matchers.equalTo(expectedJobsOverview));
         }
     }
@@ -139,21 +142,21 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
     @Test
     public void testAvailableJobDetails() throws IOException {
         final int numberExecutionGraphs = 10;
-        final Collection<ArchivedExecutionGraph> executionGraphs =
-                generateTerminalExecutionGraphs(numberExecutionGraphs);
+        final Collection<ExecutionGraphInfo> executionGraphInfos =
+                generateTerminalExecutionGraphInfos(numberExecutionGraphs);
 
-        final Collection<JobDetails> jobDetails = generateJobDetails(executionGraphs);
+        final Collection<JobDetails> jobDetails = generateJobDetails(executionGraphInfos);
 
         final File rootDir = temporaryFolder.newFolder();
 
-        try (final FileArchivedExecutionGraphStore executionGraphStore =
-                createDefaultExecutionGraphStore(rootDir)) {
-            for (ArchivedExecutionGraph executionGraph : executionGraphs) {
-                executionGraphStore.put(executionGraph);
+        try (final FileExecutionGraphInfoStore executionGraphInfoStore =
+                createDefaultExecutionGraphInfoStore(rootDir)) {
+            for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) {
+                executionGraphInfoStore.put(executionGraphInfo);
             }
 
             assertThat(
-                    executionGraphStore.getAvailableJobDetails(),
+                    executionGraphInfoStore.getAvailableJobDetails(),
                     Matchers.containsInAnyOrder(jobDetails.toArray()));
         }
     }
@@ -170,8 +173,8 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
 
         final ManualTicker manualTicker = new ManualTicker();
 
-        try (final FileArchivedExecutionGraphStore executionGraphStore =
-                new FileArchivedExecutionGraphStore(
+        try (final FileExecutionGraphInfoStore executionGraphInfoStore =
+                new FileExecutionGraphInfoStore(
                         rootDir,
                         expirationTime,
                         Integer.MAX_VALUE,
@@ -179,24 +182,29 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
                         scheduledExecutor,
                         manualTicker)) {
 
-            final ArchivedExecutionGraph executionGraph =
-                    new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
+            final ExecutionGraphInfo executionGraphInfo =
+                    new ExecutionGraphInfo(
+                            new ArchivedExecutionGraphBuilder()
+                                    .setState(JobStatus.FINISHED)
+                                    .build());
 
-            executionGraphStore.put(executionGraph);
+            executionGraphInfoStore.put(executionGraphInfo);
 
             // there should one execution graph
-            assertThat(executionGraphStore.size(), Matchers.equalTo(1));
+            assertThat(executionGraphInfoStore.size(), Matchers.equalTo(1));
 
             manualTicker.advanceTime(expirationTime.toMilliseconds(), TimeUnit.MILLISECONDS);
 
             // this should trigger the cleanup after expiration
             scheduledExecutor.triggerScheduledTasks();
 
-            assertThat(executionGraphStore.size(), Matchers.equalTo(0));
+            assertThat(executionGraphInfoStore.size(), Matchers.equalTo(0));
 
-            assertThat(executionGraphStore.get(executionGraph.getJobID()), Matchers.nullValue());
+            assertThat(
+                    executionGraphInfoStore.get(executionGraphInfo.getJobId()),
+                    Matchers.nullValue());
 
-            final File storageDirectory = executionGraphStore.getStorageDir();
+            final File storageDirectory = executionGraphInfoStore.getStorageDir();
 
             // check that the persisted file has been deleted
             assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
@@ -210,17 +218,20 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
 
         assertThat(rootDir.listFiles().length, Matchers.equalTo(0));
 
-        try (final FileArchivedExecutionGraphStore executionGraphStore =
-                createDefaultExecutionGraphStore(rootDir)) {
+        try (final FileExecutionGraphInfoStore executionGraphInfoStore =
+                createDefaultExecutionGraphInfoStore(rootDir)) {
 
             assertThat(rootDir.listFiles().length, Matchers.equalTo(1));
 
-            final File storageDirectory = executionGraphStore.getStorageDir();
+            final File storageDirectory = executionGraphInfoStore.getStorageDir();
 
             assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
 
-            executionGraphStore.put(
-                    new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build());
+            executionGraphInfoStore.put(
+                    new ExecutionGraphInfo(
+                            new ArchivedExecutionGraphBuilder()
+                                    .setState(JobStatus.FINISHED)
+                                    .build()));
 
             assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));
         }
@@ -228,13 +239,13 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
         assertThat(rootDir.listFiles().length, Matchers.equalTo(0));
     }
 
-    /** Tests that evicted {@link ArchivedExecutionGraph} are loaded from disk again. */
+    /** Tests that evicted {@link ExecutionGraphInfo} instances are loaded from disk again. */
     @Test
     public void testCacheLoading() throws IOException {
         final File rootDir = temporaryFolder.newFolder();
 
-        try (final FileArchivedExecutionGraphStore executionGraphStore =
-                new FileArchivedExecutionGraphStore(
+        try (final FileExecutionGraphInfoStore executionGraphInfoStore =
+                new FileExecutionGraphInfoStore(
                         rootDir,
                         Time.hours(1L),
                         Integer.MAX_VALUE,
@@ -242,43 +253,47 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
                         TestingUtils.defaultScheduledExecutor(),
                         Ticker.systemTicker())) {
 
-            final LoadingCache<JobID, ArchivedExecutionGraph> executionGraphCache =
-                    executionGraphStore.getArchivedExecutionGraphCache();
+            final LoadingCache<JobID, ExecutionGraphInfo> executionGraphInfoCache =
+                    executionGraphInfoStore.getExecutionGraphInfoCache();
 
-            Collection<ArchivedExecutionGraph> executionGraphs = new ArrayList<>(64);
+            Collection<ExecutionGraphInfo> executionGraphInfos = new ArrayList<>(64);
 
             boolean continueInserting = true;
 
             // insert execution graphs until the first one got evicted
             while (continueInserting) {
                 // has roughly a size of 1.4 KB
-                final ArchivedExecutionGraph executionGraph =
-                        new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
+                final ExecutionGraphInfo executionGraphInfo =
+                        new ExecutionGraphInfo(
+                                new ArchivedExecutionGraphBuilder()
+                                        .setState(JobStatus.FINISHED)
+                                        .build());
 
-                executionGraphStore.put(executionGraph);
+                executionGraphInfoStore.put(executionGraphInfo);
 
-                executionGraphs.add(executionGraph);
+                executionGraphInfos.add(executionGraphInfo);
 
-                continueInserting = executionGraphCache.size() == executionGraphs.size();
+                continueInserting = executionGraphInfoCache.size() == executionGraphInfos.size();
             }
 
-            final File storageDirectory = executionGraphStore.getStorageDir();
+            final File storageDirectory = executionGraphInfoStore.getStorageDir();
 
             assertThat(
-                    storageDirectory.listFiles().length, Matchers.equalTo(executionGraphs.size()));
+                    storageDirectory.listFiles().length,
+                    Matchers.equalTo(executionGraphInfos.size()));
 
-            for (ArchivedExecutionGraph executionGraph : executionGraphs) {
+            for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) {
                 assertThat(
-                        executionGraphStore.get(executionGraph.getJobID()),
-                        matchesPartiallyWith(executionGraph));
+                        executionGraphInfoStore.get(executionGraphInfo.getJobId()),
+                        matchesPartiallyWith(executionGraphInfo));
             }
         }
     }
 
     /**
-     * Tests that the size of {@link FileArchivedExecutionGraphStore} is no more than the configured
-     * max capacity and the old execution graphs will be purged if the total added number exceeds
-     * the max capacity.
+     * Tests that the size of {@link FileExecutionGraphInfoStore} is no more than the configured max
+     * capacity and the old execution graphs will be purged if the total added number exceeds the
+     * max capacity.
      */
     @Test
     public void testMaximumCapacity() throws IOException {
@@ -287,15 +302,15 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
         final int maxCapacity = 10;
         final int numberExecutionGraphs = 10;
 
-        final Collection<ArchivedExecutionGraph> oldExecutionGraphs =
-                generateTerminalExecutionGraphs(numberExecutionGraphs);
-        final Collection<ArchivedExecutionGraph> newExecutionGraphs =
-                generateTerminalExecutionGraphs(numberExecutionGraphs);
+        final Collection<ExecutionGraphInfo> oldExecutionGraphInfos =
+                generateTerminalExecutionGraphInfos(numberExecutionGraphs);
+        final Collection<ExecutionGraphInfo> newExecutionGraphInfos =
+                generateTerminalExecutionGraphInfos(numberExecutionGraphs);
 
-        final Collection<JobDetails> jobDetails = generateJobDetails(newExecutionGraphs);
+        final Collection<JobDetails> jobDetails = generateJobDetails(newExecutionGraphInfos);
 
-        try (final FileArchivedExecutionGraphStore executionGraphStore =
-                new FileArchivedExecutionGraphStore(
+        try (final FileExecutionGraphInfoStore executionGraphInfoStore =
+                new FileExecutionGraphInfoStore(
                         rootDir,
                         Time.hours(1L),
                         maxCapacity,
@@ -303,42 +318,44 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
                         TestingUtils.defaultScheduledExecutor(),
                         Ticker.systemTicker())) {
 
-            for (ArchivedExecutionGraph executionGraph : oldExecutionGraphs) {
-                executionGraphStore.put(executionGraph);
+            for (ExecutionGraphInfo executionGraphInfo : oldExecutionGraphInfos) {
+                executionGraphInfoStore.put(executionGraphInfo);
                 // no more than the configured maximum capacity
-                assertTrue(executionGraphStore.size() <= maxCapacity);
+                assertTrue(executionGraphInfoStore.size() <= maxCapacity);
             }
 
-            for (ArchivedExecutionGraph executionGraph : newExecutionGraphs) {
-                executionGraphStore.put(executionGraph);
+            for (ExecutionGraphInfo executionGraphInfo : newExecutionGraphInfos) {
+                executionGraphInfoStore.put(executionGraphInfo);
                 // equals to the configured maximum capacity
-                assertEquals(maxCapacity, executionGraphStore.size());
+                assertEquals(maxCapacity, executionGraphInfoStore.size());
             }
 
             // the older execution graphs are purged
             assertThat(
-                    executionGraphStore.getAvailableJobDetails(),
+                    executionGraphInfoStore.getAvailableJobDetails(),
                     Matchers.containsInAnyOrder(jobDetails.toArray()));
         }
     }
 
-    private Collection<ArchivedExecutionGraph> generateTerminalExecutionGraphs(int number) {
-        final Collection<ArchivedExecutionGraph> executionGraphs = new ArrayList<>(number);
+    private Collection<ExecutionGraphInfo> generateTerminalExecutionGraphInfos(int number) {
+        final Collection<ExecutionGraphInfo> executionGraphInfos = new ArrayList<>(number);
 
         for (int i = 0; i < number; i++) {
             final JobStatus state =
                     GLOBALLY_TERMINAL_JOB_STATUS.get(
                             ThreadLocalRandom.current()
                                     .nextInt(GLOBALLY_TERMINAL_JOB_STATUS.size()));
-            executionGraphs.add(new ArchivedExecutionGraphBuilder().setState(state).build());
+            executionGraphInfos.add(
+                    new ExecutionGraphInfo(
+                            new ArchivedExecutionGraphBuilder().setState(state).build()));
         }
 
-        return executionGraphs;
+        return executionGraphInfos;
     }
 
-    private FileArchivedExecutionGraphStore createDefaultExecutionGraphStore(File storageDirectory)
+    private FileExecutionGraphInfoStore createDefaultExecutionGraphInfoStore(File storageDirectory)
             throws IOException {
-        return new FileArchivedExecutionGraphStore(
+        return new FileExecutionGraphInfoStore(
                 storageDirectory,
                 Time.hours(1L),
                 Integer.MAX_VALUE,
@@ -347,56 +364,65 @@ public class FileArchivedExecutionGraphStoreTest extends TestLogger {
                 Ticker.systemTicker());
     }
 
-    private static final class PartialArchivedExecutionGraphMatcher
-            extends BaseMatcher<ArchivedExecutionGraph> {
+    private static final class PartialExecutionGraphInfoMatcher
+            extends BaseMatcher<ExecutionGraphInfo> {
 
-        private final ArchivedExecutionGraph archivedExecutionGraph;
+        private final ExecutionGraphInfo expectedExecutionGraphInfo;
 
-        private PartialArchivedExecutionGraphMatcher(
-                ArchivedExecutionGraph expectedArchivedExecutionGraph) {
-            this.archivedExecutionGraph =
-                    Preconditions.checkNotNull(expectedArchivedExecutionGraph);
+        private PartialExecutionGraphInfoMatcher(ExecutionGraphInfo expectedExecutionGraphInfo) {
+            this.expectedExecutionGraphInfo =
+                    Preconditions.checkNotNull(expectedExecutionGraphInfo);
         }
 
         @Override
         public boolean matches(Object o) {
-            if (archivedExecutionGraph == o) {
+            if (expectedExecutionGraphInfo == o) {
                 return true;
             }
-            if (o == null || archivedExecutionGraph.getClass() != o.getClass()) {
+            if (o == null || expectedExecutionGraphInfo.getClass() != o.getClass()) {
                 return false;
             }
-            ArchivedExecutionGraph that = (ArchivedExecutionGraph) o;
-            return archivedExecutionGraph.isStoppable() == that.isStoppable()
-                    && Objects.equals(archivedExecutionGraph.getJobID(), that.getJobID())
-                    && Objects.equals(archivedExecutionGraph.getJobName(), that.getJobName())
-                    && archivedExecutionGraph.getState() == that.getState()
-                    && Objects.equals(archivedExecutionGraph.getJsonPlan(), that.getJsonPlan())
+            ExecutionGraphInfo that = (ExecutionGraphInfo) o;
+
+            ArchivedExecutionGraph thisExecutionGraph =
+                    expectedExecutionGraphInfo.getArchivedExecutionGraph();
+            ArchivedExecutionGraph thatExecutionGraph = that.getArchivedExecutionGraph();
+            return thisExecutionGraph.isStoppable() == thatExecutionGraph.isStoppable()
+                    && Objects.equals(thisExecutionGraph.getJobID(), thatExecutionGraph.getJobID())
+                    && Objects.equals(
+                            thisExecutionGraph.getJobName(), thatExecutionGraph.getJobName())
+                    && thisExecutionGraph.getState() == thatExecutionGraph.getState()
+                    && Objects.equals(
+                            thisExecutionGraph.getJsonPlan(), thatExecutionGraph.getJsonPlan())
+                    && Objects.equals(
+                            thisExecutionGraph.getAccumulatorsSerialized(),
+                            thatExecutionGraph.getAccumulatorsSerialized())
                     && Objects.equals(
-                            archivedExecutionGraph.getAccumulatorsSerialized(),
-                            that.getAccumulatorsSerialized())
+                            thisExecutionGraph.getCheckpointCoordinatorConfiguration(),
+                            thatExecutionGraph.getCheckpointCoordinatorConfiguration())
+                    && thisExecutionGraph.getAllVertices().size()
+                            == thatExecutionGraph.getAllVertices().size()
                     && Objects.equals(
-                            archivedExecutionGraph.getCheckpointCoordinatorConfiguration(),
-                            that.getCheckpointCoordinatorConfiguration())
-                    && archivedExecutionGraph.getAllVertices().size()
-                            == that.getAllVertices().size();
+                            expectedExecutionGraphInfo.getExceptionHistory(),
+                            that.getExceptionHistory());
         }
 
         @Override
         public void describeTo(Description description) {
             description.appendText(
-                    "Matches against " + ArchivedExecutionGraph.class.getSimpleName() + '.');
+                    "Matches against " + ExecutionGraphInfo.class.getSimpleName() + '.');
         }
     }
 
-    private static Matcher<ArchivedExecutionGraph> matchesPartiallyWith(
-            ArchivedExecutionGraph executionGraph) {
-        return new PartialArchivedExecutionGraphMatcher(executionGraph);
+    private static Matcher<ExecutionGraphInfo> matchesPartiallyWith(
+            ExecutionGraphInfo executionGraphInfo) {
+        return new PartialExecutionGraphInfoMatcher(executionGraphInfo);
     }
 
     private static Collection<JobDetails> generateJobDetails(
-            Collection<ArchivedExecutionGraph> executionGraphs) {
-        return executionGraphs.stream()
+            Collection<ExecutionGraphInfo> executionGraphInfos) {
+        return executionGraphInfos.stream()
+                .map(ExecutionGraphInfo::getArchivedExecutionGraph)
                 .map(JobDetails::createDetailsForJob)
                 .collect(Collectors.toList());
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index e496863..8250d8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
@@ -38,6 +37,7 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
 import org.apache.flink.util.TestLogger;
 
@@ -74,7 +74,7 @@ public class MiniDispatcherTest extends TestLogger {
 
     private static JobGraph jobGraph;
 
-    private static ArchivedExecutionGraph archivedExecutionGraph;
+    private static ExecutionGraphInfo executionGraphInfo;
 
     private static TestingRpcService rpcService;
 
@@ -87,8 +87,8 @@ public class MiniDispatcherTest extends TestLogger {
 
     private final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
 
-    private final ArchivedExecutionGraphStore archivedExecutionGraphStore =
-            new MemoryArchivedExecutionGraphStore();
+    private final ExecutionGraphInfoStore executionGraphInfoStore =
+            new MemoryExecutionGraphInfoStore();
 
     private TestingHighAvailabilityServices highAvailabilityServices;
 
@@ -98,11 +98,12 @@ public class MiniDispatcherTest extends TestLogger {
     public static void setupClass() throws IOException {
         jobGraph = new JobGraph();
 
-        archivedExecutionGraph =
-                new ArchivedExecutionGraphBuilder()
-                        .setJobID(jobGraph.getJobID())
-                        .setState(JobStatus.FINISHED)
-                        .build();
+        executionGraphInfo =
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder()
+                                .setJobID(jobGraph.getJobID())
+                                .setState(JobStatus.FINISHED)
+                                .build());
 
         rpcService = new TestingRpcService();
         configuration = new Configuration();
@@ -166,7 +167,7 @@ public class MiniDispatcherTest extends TestLogger {
             final TestingJobManagerRunner testingJobManagerRunner =
                     testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
 
-            testingJobManagerRunner.completeResultFuture(archivedExecutionGraph);
+            testingJobManagerRunner.completeResultFuture(executionGraphInfo);
 
             // wait until we terminate
             miniDispatcher.getShutDownFuture().get();
@@ -192,7 +193,7 @@ public class MiniDispatcherTest extends TestLogger {
             final TestingJobManagerRunner testingJobManagerRunner =
                     testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
 
-            testingJobManagerRunner.completeResultFuture(archivedExecutionGraph);
+            testingJobManagerRunner.completeResultFuture(executionGraphInfo);
 
             assertFalse(miniDispatcher.getTerminationFuture().isDone());
 
@@ -228,10 +229,11 @@ public class MiniDispatcherTest extends TestLogger {
 
             dispatcherGateway.cancelJob(jobGraph.getJobID(), Time.seconds(10L));
             testingJobManagerRunner.completeResultFuture(
-                    new ArchivedExecutionGraphBuilder()
-                            .setJobID(jobGraph.getJobID())
-                            .setState(JobStatus.CANCELED)
-                            .build());
+                    new ExecutionGraphInfo(
+                            new ArchivedExecutionGraphBuilder()
+                                    .setJobID(jobGraph.getJobID())
+                                    .setState(JobStatus.CANCELED)
+                                    .build()));
 
             ApplicationStatus applicationStatus = miniDispatcher.getShutDownFuture().get();
             assertThat(applicationStatus, is(ApplicationStatus.CANCELED));
@@ -256,7 +258,7 @@ public class MiniDispatcherTest extends TestLogger {
                         () -> CompletableFuture.completedFuture(resourceManagerGateway),
                         blobServer,
                         heartbeatServices,
-                        archivedExecutionGraphStore,
+                        executionGraphInfoStore,
                         testingFatalErrorHandlerResource.getFatalErrorHandler(),
                         VoidHistoryServerArchivist.INSTANCE,
                         null,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index 0e53e0f..a774709 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 
 import javax.annotation.Nonnull;
 
@@ -64,8 +64,8 @@ class TestingDispatcher extends Dispatcher {
         startFuture.complete(null);
     }
 
-    void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) {
-        runAsync(() -> jobReachedGloballyTerminalState(archivedExecutionGraph));
+    void completeJobExecution(ExecutionGraphInfo executionGraphInfo) {
+        runAsync(() -> jobReachedGloballyTerminalState(executionGraphInfo));
     }
 
     CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
index cbd2280..bad26fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.dispatcher.DispatcherServices;
 import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
 import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
 import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobGraphStore;
 import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
@@ -116,7 +116,7 @@ public class DefaultDispatcherRunnerITCase extends TestLogger {
                         blobServerResource.getBlobServer(),
                         new TestingHeartbeatServices(),
                         UnregisteredMetricGroups::createUnregisteredJobManagerMetricGroup,
-                        new MemoryArchivedExecutionGraphStore(),
+                        new MemoryExecutionGraphInfoStore(),
                         fatalErrorHandler,
                         VoidHistoryServerArchivist.INSTANCE,
                         null,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
index d38c365..e84e746 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
 import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
 import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
 import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
@@ -161,7 +161,7 @@ public class ZooKeeperDefaultDispatcherRunnerTest extends TestLogger {
                             blobServer,
                             new TestingHeartbeatServices(),
                             UnregisteredMetricGroups::createUnregisteredJobManagerMetricGroup,
-                            new MemoryArchivedExecutionGraphStore(),
+                            new MemoryExecutionGraphInfoStore(),
                             fatalErrorHandler,
                             VoidHistoryServerArchivist.INSTANCE,
                             null,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
index 9328271..12b5b97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
 
 import org.junit.Test;
@@ -57,7 +57,7 @@ public class ClusterEntrypointTest {
         }
 
         @Override
-        protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
+        protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
                 Configuration configuration, ScheduledExecutor scheduledExecutor)
                 throws IOException {
             throw new UnsupportedOperationException("Not needed for this test");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java
index 275d799..0b2c9cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -33,6 +32,7 @@ import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFacto
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -67,7 +67,7 @@ public class JobManagerRunnerImplTest extends TestLogger {
 
     private static JobGraph jobGraph;
 
-    private static ArchivedExecutionGraph archivedExecutionGraph;
+    private static ExecutionGraphInfo executionGraphInfo;
 
     private static JobMasterServiceFactory defaultJobMasterServiceFactory;
 
@@ -85,11 +85,12 @@ public class JobManagerRunnerImplTest extends TestLogger {
         jobVertex.setInvokableClass(NoOpInvokable.class);
         jobGraph = new JobGraph(jobVertex);
 
-        archivedExecutionGraph =
-                new ArchivedExecutionGraphBuilder()
-                        .setJobID(jobGraph.getJobID())
-                        .setState(JobStatus.FINISHED)
-                        .build();
+        executionGraphInfo =
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder()
+                                .setJobID(jobGraph.getJobID())
+                                .setState(JobStatus.FINISHED)
+                                .build());
     }
 
     @Before
@@ -120,12 +121,12 @@ public class JobManagerRunnerImplTest extends TestLogger {
 
             assertThat(resultFuture.isDone(), is(false));
 
-            jobManagerRunner.jobReachedGloballyTerminalState(archivedExecutionGraph);
+            jobManagerRunner.jobReachedGloballyTerminalState(executionGraphInfo);
 
             final JobManagerRunnerResult jobManagerRunnerResult = resultFuture.get();
             assertThat(
                     jobManagerRunnerResult,
-                    is(JobManagerRunnerResult.forSuccess(archivedExecutionGraph)));
+                    is(JobManagerRunnerResult.forSuccess(executionGraphInfo)));
         } finally {
             jobManagerRunner.close();
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResultTest.java
index ef170cb..d90d753 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResultTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
@@ -33,14 +33,14 @@ import static org.junit.Assert.assertTrue;
 /** Tests for the {@link JobManagerRunnerResult}. */
 public class JobManagerRunnerResultTest extends TestLogger {
 
-    private final ArchivedExecutionGraph archivedExecutionGraph =
-            new ArchivedExecutionGraphBuilder().build();
+    private final ExecutionGraphInfo executionGraphInfo =
+            new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().build());
     private final FlinkException testException = new FlinkException("test exception");
 
     @Test
     public void testSuccessfulJobManagerResult() {
         final JobManagerRunnerResult jobManagerRunnerResult =
-                JobManagerRunnerResult.forSuccess(archivedExecutionGraph);
+                JobManagerRunnerResult.forSuccess(executionGraphInfo);
 
         assertTrue(jobManagerRunnerResult.isSuccess());
         assertFalse(jobManagerRunnerResult.isJobNotFinished());
@@ -70,9 +70,9 @@ public class JobManagerRunnerResultTest extends TestLogger {
     @Test
     public void testGetArchivedExecutionGraphFromSuccessfulJobManagerResult() {
         final JobManagerRunnerResult jobManagerRunnerResult =
-                JobManagerRunnerResult.forSuccess(archivedExecutionGraph);
+                JobManagerRunnerResult.forSuccess(executionGraphInfo);
 
-        assertThat(jobManagerRunnerResult.getArchivedExecutionGraph(), is(archivedExecutionGraph));
+        assertThat(jobManagerRunnerResult.getExecutionGraphInfo(), is(executionGraphInfo));
     }
 
     @Test(expected = IllegalStateException.class)
@@ -80,7 +80,7 @@ public class JobManagerRunnerResultTest extends TestLogger {
         final JobManagerRunnerResult jobManagerRunnerResult =
                 JobManagerRunnerResult.forJobNotFinished();
 
-        jobManagerRunnerResult.getArchivedExecutionGraph();
+        jobManagerRunnerResult.getExecutionGraphInfo();
     }
 
     @Test(expected = IllegalStateException.class)
@@ -88,7 +88,7 @@ public class JobManagerRunnerResultTest extends TestLogger {
         final JobManagerRunnerResult jobManagerRunnerResult =
                 JobManagerRunnerResult.forInitializationFailure(testException);
 
-        jobManagerRunnerResult.getArchivedExecutionGraph();
+        jobManagerRunnerResult.getExecutionGraphInfo();
     }
 
     @Test
@@ -110,7 +110,7 @@ public class JobManagerRunnerResultTest extends TestLogger {
     @Test(expected = IllegalStateException.class)
     public void testGetInitializationFailureFromSuccessfulJobManagerResult() {
         final JobManagerRunnerResult jobManagerRunnerResult =
-                JobManagerRunnerResult.forSuccess(archivedExecutionGraph);
+                JobManagerRunnerResult.forSuccess(executionGraphInfo);
 
         jobManagerRunnerResult.getInitializationFailure();
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java
index 6346172..8b57cfd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java
@@ -133,7 +133,11 @@ public class JobMasterExecutionDeploymentReconciliationTest extends TestLogger {
         assertThat(deploymentTrackerWrapper.getStopFuture().get(), is(deployedExecution));
 
         assertThat(
-                onCompletionActions.getJobReachedGloballyTerminalStateFuture().get().getState(),
+                onCompletionActions
+                        .getJobReachedGloballyTerminalStateFuture()
+                        .get()
+                        .getArchivedExecutionGraph()
+                        .getState(),
                 is(JobStatus.FAILED));
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index bf78cea..ca04421 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -105,6 +105,7 @@ import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
 import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.scheduler.TestingSchedulerNG;
 import org.apache.flink.runtime.scheduler.TestingSchedulerNGFactory;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
@@ -1199,7 +1200,7 @@ public class JobMasterTest extends TestLogger {
     private static Collection<AccessExecution> getExecutions(
             final JobMasterGateway jobMasterGateway) {
         final ArchivedExecutionGraph archivedExecutionGraph =
-                requestExecutionGraph(jobMasterGateway);
+                requestExecutionGraph(jobMasterGateway).getArchivedExecutionGraph();
 
         return archivedExecutionGraph.getAllVertices().values().stream()
                 .flatMap(vertex -> Arrays.stream(vertex.getTaskVertices()))
@@ -1210,7 +1211,7 @@ public class JobMasterTest extends TestLogger {
     private static List<AccessExecution> getExecutions(
             final JobMasterGateway jobMasterGateway, final JobVertexID jobVertexId) {
         final ArchivedExecutionGraph archivedExecutionGraph =
-                requestExecutionGraph(jobMasterGateway);
+                requestExecutionGraph(jobMasterGateway).getArchivedExecutionGraph();
 
         return Optional.ofNullable(archivedExecutionGraph.getAllVertices().get(jobVertexId))
                 .map(
@@ -1221,7 +1222,7 @@ public class JobMasterTest extends TestLogger {
                 .collect(Collectors.toList());
     }
 
-    private static ArchivedExecutionGraph requestExecutionGraph(
+    private static ExecutionGraphInfo requestExecutionGraph(
             final JobMasterGateway jobMasterGateway) {
         try {
             return jobMasterGateway.requestJob(testingTimeout).get();
@@ -1821,7 +1822,10 @@ public class JobMasterTest extends TestLogger {
             jobReachedRunningState.accept(taskManagerUnresolvedLocation, jobMasterGateway);
 
             final ArchivedExecutionGraph archivedExecutionGraph =
-                    onCompletionActions.getJobReachedGloballyTerminalStateFuture().get();
+                    onCompletionActions
+                            .getJobReachedGloballyTerminalStateFuture()
+                            .get()
+                            .getArchivedExecutionGraph();
 
             assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED));
         } finally {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
index 4412d72..1d80e17 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.CompletableFuture;
@@ -88,8 +88,8 @@ public class TestingJobManagerRunner implements JobManagerRunner {
         return closeAsyncCalledLatch;
     }
 
-    public void completeResultFuture(ArchivedExecutionGraph archivedExecutionGraph) {
-        resultFuture.complete(JobManagerRunnerResult.forSuccess(archivedExecutionGraph));
+    public void completeResultFuture(ExecutionGraphInfo executionGraphInfo) {
+        resultFuture.complete(JobManagerRunnerResult.forSuccess(executionGraphInfo));
     }
 
     public void completeResultFutureExceptionally(Exception e) {
@@ -108,6 +108,7 @@ public class TestingJobManagerRunner implements JobManagerRunner {
         this.jobMasterGatewayFuture.complete(testingJobMasterGateway);
     }
 
+    /** {@code Builder} for instantiating {@link TestingJobManagerRunner} instances. */
     public static class Builder {
         private JobID jobId = null;
         private boolean blockingTermination = false;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
index 77de770..d8f2678 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster.utils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -43,6 +42,7 @@ import org.apache.flink.runtime.jobmaster.factories.UnregisteredJobManagerJobMet
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
@@ -207,15 +207,15 @@ public class JobMasterBuilder {
      */
     public static final class TestingOnCompletionActions implements OnCompletionActions {
 
-        private final CompletableFuture<ArchivedExecutionGraph>
-                jobReachedGloballyTerminalStateFuture = new CompletableFuture<>();
+        private final CompletableFuture<ExecutionGraphInfo> jobReachedGloballyTerminalStateFuture =
+                new CompletableFuture<>();
         private final CompletableFuture<Void> jobFinishedByOtherFuture = new CompletableFuture<>();
         private final CompletableFuture<Throwable> jobMasterFailedFuture =
                 new CompletableFuture<>();
 
         @Override
-        public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
-            jobReachedGloballyTerminalStateFuture.complete(executionGraph);
+        public void jobReachedGloballyTerminalState(ExecutionGraphInfo executionGraphInfo) {
+            jobReachedGloballyTerminalStateFuture.complete(executionGraphInfo);
         }
 
         @Override
@@ -228,12 +228,11 @@ public class JobMasterBuilder {
             jobMasterFailedFuture.complete(cause);
         }
 
-        public CompletableFuture<ArchivedExecutionGraph>
-                getJobReachedGloballyTerminalStateFuture() {
+        public CompletableFuture<ExecutionGraphInfo> getJobReachedGloballyTerminalStateFuture() {
             return jobReachedGloballyTerminalStateFuture;
         }
 
-        public CompletableFuture<ArchivedExecutionGraph> getJobFinishedByOtherFuture() {
+        public CompletableFuture<ExecutionGraphInfo> getJobFinishedByOtherFuture() {
             return jobReachedGloballyTerminalStateFuture;
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index a6b4155..0464d51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -48,6 +47,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
@@ -123,7 +123,7 @@ public class TestingJobMasterGateway implements JobMasterGateway {
 
     @Nonnull private final Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier;
 
-    @Nonnull private final Supplier<CompletableFuture<ArchivedExecutionGraph>> requestJobSupplier;
+    @Nonnull private final Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier;
 
     @Nonnull
     private final BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction;
@@ -221,7 +221,7 @@ public class TestingJobMasterGateway implements JobMasterGateway {
                             taskManagerHeartbeatConsumer,
             @Nonnull Consumer<ResourceID> resourceManagerHeartbeatConsumer,
             @Nonnull Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier,
-            @Nonnull Supplier<CompletableFuture<ArchivedExecutionGraph>> requestJobSupplier,
+            @Nonnull Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier,
             @Nonnull
                     BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction,
             @Nonnull
@@ -388,7 +388,7 @@ public class TestingJobMasterGateway implements JobMasterGateway {
     }
 
     @Override
-    public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+    public CompletableFuture<ExecutionGraphInfo> requestJob(Time timeout) {
         return requestJobSupplier.get();
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
index c2dc9ab..e957311 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -48,6 +47,7 @@ import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
@@ -112,7 +112,7 @@ public class TestingJobMasterGatewayBuilder {
     private Consumer<ResourceID> resourceManagerHeartbeatConsumer = ignored -> {};
     private Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier =
             () -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
-    private Supplier<CompletableFuture<ArchivedExecutionGraph>> requestJobSupplier =
+    private Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier =
             () -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
     private BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction =
             (targetDirectory, ignoredB) ->
@@ -264,7 +264,7 @@ public class TestingJobMasterGatewayBuilder {
     }
 
     public TestingJobMasterGatewayBuilder setRequestJobSupplier(
-            Supplier<CompletableFuture<ArchivedExecutionGraph>> requestJobSupplier) {
+            Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier) {
         this.requestJobSupplier = requestJobSupplier;
         return this;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
index 49d3772..d0767a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.minicluster;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -105,7 +105,7 @@ public class TestingMiniCluster extends MiniCluster {
                             blobServer,
                             heartbeatServices,
                             metricRegistry,
-                            new MemoryArchivedExecutionGraphStore(),
+                            new MemoryExecutionGraphInfoStore(),
                             metricQueryServiceRetriever,
                             fatalErrorHandler));
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java
index ace82f9..269041a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java
@@ -21,10 +21,9 @@ package org.apache.flink.runtime.rest.handler.legacy;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
 import org.apache.flink.util.ExecutorUtils;
@@ -55,12 +54,13 @@ import static org.junit.Assert.fail;
 /** Tests for the {@link DefaultExecutionGraphCache}. */
 public class DefaultExecutionGraphCacheTest extends TestLogger {
 
-    private static ArchivedExecutionGraph expectedExecutionGraph;
+    private static ExecutionGraphInfo expectedExecutionGraphInfo;
     private static final JobID expectedJobId = new JobID();
 
     @BeforeClass
     public static void setup() {
-        expectedExecutionGraph = new ArchivedExecutionGraphBuilder().build();
+        expectedExecutionGraphInfo =
+                new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().build());
     }
 
     /** Tests that we can cache AccessExecutionGraphs over multiple accesses. */
@@ -71,19 +71,20 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
 
         final CountingRestfulGateway restfulGateway =
                 createCountingRestfulGateway(
-                        expectedJobId, CompletableFuture.completedFuture(expectedExecutionGraph));
+                        expectedJobId,
+                        CompletableFuture.completedFuture(expectedExecutionGraphInfo));
 
         try (ExecutionGraphCache executionGraphCache =
                 new DefaultExecutionGraphCache(timeout, timeToLive)) {
-            CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture =
-                    executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+            CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture =
+                    executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
 
-            assertEquals(expectedExecutionGraph, accessExecutionGraphFuture.get());
+            assertEquals(expectedExecutionGraphInfo, executionGraphInfoFuture.get());
 
-            accessExecutionGraphFuture =
-                    executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+            executionGraphInfoFuture =
+                    executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
 
-            assertEquals(expectedExecutionGraph, accessExecutionGraphFuture.get());
+            assertEquals(expectedExecutionGraphInfo, executionGraphInfoFuture.get());
 
             assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(1));
         }
@@ -98,23 +99,23 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
         final CountingRestfulGateway restfulGateway =
                 createCountingRestfulGateway(
                         expectedJobId,
-                        CompletableFuture.completedFuture(expectedExecutionGraph),
-                        CompletableFuture.completedFuture(expectedExecutionGraph));
+                        CompletableFuture.completedFuture(expectedExecutionGraphInfo),
+                        CompletableFuture.completedFuture(expectedExecutionGraphInfo));
 
         try (ExecutionGraphCache executionGraphCache =
                 new DefaultExecutionGraphCache(timeout, timeToLive)) {
-            CompletableFuture<AccessExecutionGraph> executionGraphFuture =
-                    executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+            CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture =
+                    executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
 
-            assertEquals(expectedExecutionGraph, executionGraphFuture.get());
+            assertEquals(expectedExecutionGraphInfo, executionGraphInfoFuture.get());
 
             // sleep for the TTL
             Thread.sleep(timeToLive.toMilliseconds() * 5L);
 
-            CompletableFuture<AccessExecutionGraph> executionGraphFuture2 =
-                    executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+            CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture2 =
+                    executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
 
-            assertEquals(expectedExecutionGraph, executionGraphFuture2.get());
+            assertEquals(expectedExecutionGraphInfo, executionGraphInfoFuture2.get());
 
             assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(2));
         }
@@ -135,25 +136,26 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
                         expectedJobId,
                         FutureUtils.completedExceptionally(
                                 new FlinkJobNotFoundException(expectedJobId)),
-                        CompletableFuture.completedFuture(expectedExecutionGraph));
+                        CompletableFuture.completedFuture(expectedExecutionGraphInfo));
 
         try (ExecutionGraphCache executionGraphCache =
                 new DefaultExecutionGraphCache(timeout, timeToLive)) {
-            CompletableFuture<AccessExecutionGraph> executionGraphFuture =
-                    executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+            CompletableFuture<ExecutionGraphInfo> executionGraphFuture =
+                    executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
 
             try {
                 executionGraphFuture.get();
 
                 fail("The execution graph future should have been completed exceptionally.");
             } catch (ExecutionException ee) {
+                ee.printStackTrace();
                 assertTrue(ee.getCause() instanceof FlinkException);
             }
 
-            CompletableFuture<AccessExecutionGraph> executionGraphFuture2 =
-                    executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+            CompletableFuture<ExecutionGraphInfo> executionGraphFuture2 =
+                    executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
 
-            assertEquals(expectedExecutionGraph, executionGraphFuture2.get());
+            assertEquals(expectedExecutionGraphInfo, executionGraphFuture2.get());
         }
     }
 
@@ -166,21 +168,21 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
         final Time timeout = Time.milliseconds(100L);
         final Time timeToLive = Time.milliseconds(1L);
         final JobID expectedJobId2 = new JobID();
-        final ArchivedExecutionGraph expectedExecutionGraph2 =
-                new ArchivedExecutionGraphBuilder().build();
+        final ExecutionGraphInfo expectedExecutionGraphInfo2 =
+                new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().build());
 
         final AtomicInteger requestJobCalls = new AtomicInteger(0);
         final TestingRestfulGateway restfulGateway =
                 new TestingRestfulGateway.Builder()
-                        .setRequestJobFunction(
+                        .setRequestExecutionGraphInfoFunction(
                                 jobId -> {
                                     requestJobCalls.incrementAndGet();
                                     if (jobId.equals(expectedJobId)) {
                                         return CompletableFuture.completedFuture(
-                                                expectedExecutionGraph);
+                                                expectedExecutionGraphInfo);
                                     } else if (jobId.equals(expectedJobId2)) {
                                         return CompletableFuture.completedFuture(
-                                                expectedExecutionGraph2);
+                                                expectedExecutionGraphInfo2);
                                     } else {
                                         throw new AssertionError("Invalid job id received.");
                                     }
@@ -190,15 +192,15 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
         try (ExecutionGraphCache executionGraphCache =
                 new DefaultExecutionGraphCache(timeout, timeToLive)) {
 
-            CompletableFuture<AccessExecutionGraph> executionGraph1Future =
-                    executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
+            CompletableFuture<ExecutionGraphInfo> executionGraph1Future =
+                    executionGraphCache.getExecutionGraphInfo(expectedJobId, restfulGateway);
 
-            CompletableFuture<AccessExecutionGraph> executionGraph2Future =
-                    executionGraphCache.getExecutionGraph(expectedJobId2, restfulGateway);
+            CompletableFuture<ExecutionGraphInfo> executionGraph2Future =
+                    executionGraphCache.getExecutionGraphInfo(expectedJobId2, restfulGateway);
 
-            assertEquals(expectedExecutionGraph, executionGraph1Future.get());
+            assertEquals(expectedExecutionGraphInfo, executionGraph1Future.get());
 
-            assertEquals(expectedExecutionGraph2, executionGraph2Future.get());
+            assertEquals(expectedExecutionGraphInfo2, executionGraph2Future.get());
 
             assertThat(requestJobCalls.get(), Matchers.equalTo(2));
 
@@ -218,11 +220,12 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
 
         final CountingRestfulGateway restfulGateway =
                 createCountingRestfulGateway(
-                        expectedJobId, CompletableFuture.completedFuture(expectedExecutionGraph));
+                        expectedJobId,
+                        CompletableFuture.completedFuture(expectedExecutionGraphInfo));
 
         final int numConcurrentAccesses = 10;
 
-        final ArrayList<CompletableFuture<AccessExecutionGraph>> executionGraphFutures =
+        final ArrayList<CompletableFuture<ExecutionGraphInfo>> executionGraphFutures =
                 new ArrayList<>(numConcurrentAccesses);
 
         final ExecutorService executor =
@@ -231,10 +234,10 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
         try (ExecutionGraphCache executionGraphCache =
                 new DefaultExecutionGraphCache(timeout, timeToLive)) {
             for (int i = 0; i < numConcurrentAccesses; i++) {
-                CompletableFuture<AccessExecutionGraph> executionGraphFuture =
+                CompletableFuture<ExecutionGraphInfo> executionGraphFuture =
                         CompletableFuture.supplyAsync(
                                         () ->
-                                                executionGraphCache.getExecutionGraph(
+                                                executionGraphCache.getExecutionGraphInfo(
                                                         expectedJobId, restfulGateway),
                                         executor)
                                 .thenCompose(Function.identity());
@@ -242,13 +245,13 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
                 executionGraphFutures.add(executionGraphFuture);
             }
 
-            final CompletableFuture<Collection<AccessExecutionGraph>> allExecutionGraphFutures =
+            final CompletableFuture<Collection<ExecutionGraphInfo>> allExecutionGraphFutures =
                     FutureUtils.combineAll(executionGraphFutures);
 
-            Collection<AccessExecutionGraph> allExecutionGraphs = allExecutionGraphFutures.get();
+            Collection<ExecutionGraphInfo> allExecutionGraphs = allExecutionGraphFutures.get();
 
-            for (AccessExecutionGraph executionGraph : allExecutionGraphs) {
-                assertEquals(expectedExecutionGraph, executionGraph);
+            for (ExecutionGraphInfo executionGraph : allExecutionGraphs) {
+                assertEquals(expectedExecutionGraphInfo, executionGraph);
             }
 
             assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(1));
@@ -258,8 +261,8 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
     }
 
     private CountingRestfulGateway createCountingRestfulGateway(
-            JobID jobId, CompletableFuture<ArchivedExecutionGraph>... accessExecutionGraphs) {
-        final ConcurrentLinkedQueue<CompletableFuture<ArchivedExecutionGraph>> queue =
+            JobID jobId, CompletableFuture<ExecutionGraphInfo>... accessExecutionGraphs) {
+        final ConcurrentLinkedQueue<CompletableFuture<ExecutionGraphInfo>> queue =
                 new ConcurrentLinkedQueue<>(Arrays.asList(accessExecutionGraphs));
         return new CountingRestfulGateway(jobId, ignored -> queue.poll());
     }
@@ -276,16 +279,17 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
 
         private CountingRestfulGateway(
                 JobID expectedJobId,
-                Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction) {
+                Function<JobID, CompletableFuture<ExecutionGraphInfo>> requestJobFunction) {
             this.expectedJobId = Preconditions.checkNotNull(expectedJobId);
-            this.requestJobFunction = Preconditions.checkNotNull(requestJobFunction);
+            this.requestExecutionGraphInfoFunction = Preconditions.checkNotNull(requestJobFunction);
         }
 
         @Override
-        public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
+        public CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(
+                JobID jobId, Time timeout) {
             assertThat(jobId, Matchers.equalTo(expectedJobId));
             numRequestJobCalls.incrementAndGet();
-            return super.requestJob(jobId, timeout);
+            return super.requestExecutionGraphInfo(jobId, timeout);
         }
 
         public int getNumRequestJobCalls() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java
index 0985a1c..12565f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.rest.util;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 
 import java.util.concurrent.CompletableFuture;
@@ -36,7 +36,7 @@ public enum NoOpExecutionGraphCache implements ExecutionGraphCache {
     }
 
     @Override
-    public CompletableFuture<AccessExecutionGraph> getExecutionGraph(
+    public CompletableFuture<ExecutionGraphInfo> getExecutionGraphInfo(
             JobID jobId, RestfulGateway restfulGateway) {
         return FutureUtils.completedExceptionally(
                 new UnsupportedOperationException(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 737c69b..1e33ae0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -260,7 +260,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final ArchivedExecutionVertex archivedExecutionVertex =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
         final ExecutionAttemptID attemptId =
                 archivedExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
 
@@ -294,7 +298,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final ArchivedExecutionVertex onlyExecutionVertex =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
 
@@ -325,6 +333,7 @@ public class DefaultSchedulerTest extends TestLogger {
         Throwable failureCause =
                 scheduler
                         .requestJob()
+                        .getArchivedExecutionGraph()
                         .getFailureInfo()
                         .getException()
                         .deserializeError(DefaultSchedulerTest.class.getClassLoader());
@@ -388,7 +397,11 @@ public class DefaultSchedulerTest extends TestLogger {
         actionsToTriggerTaskFailure.accept(vid11);
 
         final Iterator<ArchivedExecutionVertex> vertexIterator =
-                scheduler.requestJob().getAllExecutionVertices().iterator();
+                scheduler
+                        .requestJob()
+                        .getArchivedExecutionGraph()
+                        .getAllExecutionVertices()
+                        .iterator();
         final ArchivedExecutionVertex ev11 = vertexIterator.next();
         final ArchivedExecutionVertex ev12 = vertexIterator.next();
         final ArchivedExecutionVertex ev21 = vertexIterator.next();
@@ -424,7 +437,12 @@ public class DefaultSchedulerTest extends TestLogger {
         testExecutionSlotAllocator.completePendingRequest(sourceExecutionVertexId);
 
         final ArchivedExecutionVertex sourceExecutionVertex =
-                scheduler.requestJob().getAllExecutionVertices().iterator().next();
+                scheduler
+                        .requestJob()
+                        .getArchivedExecutionGraph()
+                        .getAllExecutionVertices()
+                        .iterator()
+                        .next();
         final ExecutionAttemptID attemptId =
                 sourceExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
         scheduler.updateTaskExecutionState(
@@ -437,7 +455,9 @@ public class DefaultSchedulerTest extends TestLogger {
         assertThat(
                 testExecutionVertexOperations.getDeployedVertices(),
                 containsInAnyOrder(sourceExecutionVertexId, sinkExecutionVertexId));
-        assertThat(scheduler.requestJob().getState(), is(equalTo(JobStatus.RUNNING)));
+        assertThat(
+                scheduler.requestJob().getArchivedExecutionGraph().getState(),
+                is(equalTo(JobStatus.RUNNING)));
     }
 
     @Test
@@ -478,7 +498,11 @@ public class DefaultSchedulerTest extends TestLogger {
         schedulingStrategy.schedule(Collections.singletonList(onlySchedulingVertex.getId()));
 
         final ArchivedExecutionVertex onlyExecutionVertex =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
         scheduler.updateTaskExecutionState(
@@ -532,7 +556,11 @@ public class DefaultSchedulerTest extends TestLogger {
         scheduler.handleGlobalFailure(new Exception("forced failure"));
 
         final ArchivedExecutionVertex onlyExecutionVertex =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
         scheduler.updateTaskExecutionState(
@@ -554,7 +582,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final Iterator<ArchivedExecutionVertex> vertexIterator =
-                scheduler.requestJob().getAllExecutionVertices().iterator();
+                scheduler
+                        .requestJob()
+                        .getArchivedExecutionGraph()
+                        .getAllExecutionVertices()
+                        .iterator();
         final ArchivedExecutionVertex v1 = vertexIterator.next();
         final ArchivedExecutionVertex v2 = vertexIterator.next();
 
@@ -588,7 +620,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final ArchivedExecutionVertex onlyExecutionVertex =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
         scheduler.updateTaskExecutionState(
@@ -616,7 +652,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final ArchivedExecutionVertex onlyExecutionVertex =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
         scheduler.updateTaskExecutionState(
@@ -652,7 +692,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final ArchivedExecutionVertex onlyExecutionVertex =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
         scheduler.updateTaskExecutionState(
@@ -743,7 +787,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final Iterator<ArchivedExecutionVertex> vertexIterator =
-                scheduler.requestJob().getAllExecutionVertices().iterator();
+                scheduler
+                        .requestJob()
+                        .getArchivedExecutionGraph()
+                        .getAllExecutionVertices()
+                        .iterator();
         final ExecutionAttemptID attemptId1 =
                 vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
         final ExecutionAttemptID attemptId2 =
@@ -781,7 +829,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final SchedulingTopology topology = scheduler.getSchedulingTopology();
 
         final Iterator<ArchivedExecutionVertex> vertexIterator =
-                scheduler.requestJob().getAllExecutionVertices().iterator();
+                scheduler
+                        .requestJob()
+                        .getArchivedExecutionGraph()
+                        .getAllExecutionVertices()
+                        .iterator();
         final ExecutionAttemptID attemptId1 =
                 vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
         final ExecutionAttemptID attemptId2 =
@@ -818,7 +870,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final ArchivedExecutionVertex onlyExecutionVertex =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
 
@@ -830,7 +886,8 @@ public class DefaultSchedulerTest extends TestLogger {
                         ExecutionState.FAILED,
                         new RuntimeException(exceptionMessage)));
 
-        final ErrorInfo failureInfo = scheduler.requestJob().getFailureInfo();
+        final ErrorInfo failureInfo =
+                scheduler.requestJob().getArchivedExecutionGraph().getFailureInfo();
         assertThat(failureInfo, is(notNullValue()));
         assertThat(failureInfo.getExceptionAsString(), containsString(exceptionMessage));
     }
@@ -850,7 +907,11 @@ public class DefaultSchedulerTest extends TestLogger {
         scheduler.startScheduling();
 
         Iterator<ArchivedExecutionVertex> vertexIterator =
-                scheduler.requestJob().getAllExecutionVertices().iterator();
+                scheduler
+                        .requestJob()
+                        .getArchivedExecutionGraph()
+                        .getAllExecutionVertices()
+                        .iterator();
         ArchivedExecutionVertex v1 = vertexIterator.next();
 
         assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(2));
@@ -863,7 +924,12 @@ public class DefaultSchedulerTest extends TestLogger {
                         ExecutionState.FAILED,
                         new RuntimeException(exceptionMessage)));
 
-        vertexIterator = scheduler.requestJob().getAllExecutionVertices().iterator();
+        vertexIterator =
+                scheduler
+                        .requestJob()
+                        .getArchivedExecutionGraph()
+                        .getAllExecutionVertices()
+                        .iterator();
         v1 = vertexIterator.next();
         ArchivedExecutionVertex v2 = vertexIterator.next();
         assertThat(v1.getExecutionState(), is(ExecutionState.FAILED));
@@ -877,7 +943,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final ExecutionAttemptID attemptId =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                Iterables.getOnlyElement(
+                                scheduler
+                                        .requestJob()
+                                        .getArchivedExecutionGraph()
+                                        .getAllExecutionVertices())
                         .getCurrentExecutionAttempt()
                         .getAttemptId();
 
@@ -917,7 +987,11 @@ public class DefaultSchedulerTest extends TestLogger {
 
         // initiate restartable failure
         final ExecutionAttemptID restartableAttemptId =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                Iterables.getOnlyElement(
+                                scheduler
+                                        .requestJob()
+                                        .getArchivedExecutionGraph()
+                                        .getAllExecutionVertices())
                         .getCurrentExecutionAttempt()
                         .getAttemptId();
         final RuntimeException restartableException = new RuntimeException("restartable exception");
@@ -930,7 +1004,11 @@ public class DefaultSchedulerTest extends TestLogger {
         testRestartBackoffTimeStrategy.setCanRestart(false);
 
         final ExecutionAttemptID failingAttemptId =
-                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                Iterables.getOnlyElement(
+                                scheduler
+                                        .requestJob()
+                                        .getArchivedExecutionGraph()
+                                        .getAllExecutionVertices())
                         .getCurrentExecutionAttempt()
                         .getAttemptId();
         final RuntimeException failingException = new RuntimeException("failing exception");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 9ff306d..ad3e98f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -190,7 +190,12 @@ public class SchedulerTestingUtils {
     public static Collection<ExecutionAttemptID> getAllCurrentExecutionAttempts(
             DefaultScheduler scheduler) {
         return StreamSupport.stream(
-                        scheduler.requestJob().getAllExecutionVertices().spliterator(), false)
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices()
+                                .spliterator(),
+                        false)
                 .map((vertex) -> vertex.getCurrentExecutionAttempt().getAttemptId())
                 .collect(Collectors.toList());
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
index 71a990f..b64af0e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -120,7 +119,7 @@ public class TestingSchedulerNG implements SchedulerNG {
     }
 
     @Override
-    public ArchivedExecutionGraph requestJob() {
+    public ExecutionGraphInfo requestJob() {
         failOperation();
         return null;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
index efe8bfc..eccf446 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.function.TriFunction;
 
@@ -86,6 +87,8 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway
             String hostname,
             Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction,
             Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction,
+            Function<JobID, CompletableFuture<ExecutionGraphInfo>>
+                    requestExecutionGraphInfoFunction,
             Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction,
             Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction,
             Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier,
@@ -115,6 +118,7 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway
                 hostname,
                 cancelJobFunction,
                 requestJobFunction,
+                requestExecutionGraphInfoFunction,
                 requestJobResultFunction,
                 requestJobStatusFunction,
                 requestMultipleJobDetailsSupplier,
@@ -228,6 +232,7 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway
                     hostname,
                     cancelJobFunction,
                     requestJobFunction,
+                    requestExecutionGraphInfoFunction,
                     requestJobResultFunction,
                     requestJobStatusFunction,
                     requestMultipleJobDetailsSupplier,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
index bd3e06a..69c6d30 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
@@ -31,7 +31,7 @@ import java.util.function.IntSupplier;
 public class TestingExecutionGraphCache implements ExecutionGraphCache {
     private final IntSupplier sizeSupplier;
 
-    private final BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>>
+    private final BiFunction<JobID, RestfulGateway, CompletableFuture<ExecutionGraphInfo>>
             getExecutionGraphFunction;
 
     private final Runnable cleanupRunnable;
@@ -40,7 +40,7 @@ public class TestingExecutionGraphCache implements ExecutionGraphCache {
 
     private TestingExecutionGraphCache(
             IntSupplier sizeSupplier,
-            BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>>
+            BiFunction<JobID, RestfulGateway, CompletableFuture<ExecutionGraphInfo>>
                     getExecutionGraphFunction,
             Runnable cleanupRunnable,
             Runnable closeRunnable) {
@@ -56,7 +56,7 @@ public class TestingExecutionGraphCache implements ExecutionGraphCache {
     }
 
     @Override
-    public CompletableFuture<AccessExecutionGraph> getExecutionGraph(
+    public CompletableFuture<ExecutionGraphInfo> getExecutionGraphInfo(
             JobID jobId, RestfulGateway restfulGateway) {
         return getExecutionGraphFunction.apply(jobId, restfulGateway);
     }
@@ -79,7 +79,7 @@ public class TestingExecutionGraphCache implements ExecutionGraphCache {
     public static final class Builder {
 
         private IntSupplier sizeSupplier = () -> 0;
-        private BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>>
+        private BiFunction<JobID, RestfulGateway, CompletableFuture<ExecutionGraphInfo>>
                 getExecutionGraphFunction =
                         (ignoredA, ignoredB) ->
                                 FutureUtils.completedExceptionally(
@@ -95,7 +95,7 @@ public class TestingExecutionGraphCache implements ExecutionGraphCache {
         }
 
         public Builder setGetExecutionGraphFunction(
-                BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>>
+                BiFunction<JobID, RestfulGateway, CompletableFuture<ExecutionGraphInfo>>
                         getExecutionGraphFunction) {
             this.getExecutionGraphFunction = getExecutionGraphFunction;
             return this;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index 3d10329..f36af49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.function.TriFunction;
 
@@ -54,6 +55,10 @@ public class TestingRestfulGateway implements RestfulGateway {
             DEFAULT_REQUEST_JOB_FUNCTION =
                     jobId ->
                             FutureUtils.completedExceptionally(new UnsupportedOperationException());
+    static final Function<JobID, CompletableFuture<ExecutionGraphInfo>>
+            DEFAULT_REQUEST_EXECUTION_GRAPH_INFO =
+                    jobId ->
+                            FutureUtils.completedExceptionally(new UnsupportedOperationException());
     static final Function<JobID, CompletableFuture<JobStatus>> DEFAULT_REQUEST_JOB_STATUS_FUNCTION =
             jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING);
     static final Supplier<CompletableFuture<MultipleJobsDetails>>
@@ -106,6 +111,9 @@ public class TestingRestfulGateway implements RestfulGateway {
 
     protected Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction;
 
+    protected Function<JobID, CompletableFuture<ExecutionGraphInfo>>
+            requestExecutionGraphInfoFunction;
+
     protected Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction;
 
     protected Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction;
@@ -137,6 +145,7 @@ public class TestingRestfulGateway implements RestfulGateway {
                 LOCALHOST,
                 DEFAULT_CANCEL_JOB_FUNCTION,
                 DEFAULT_REQUEST_JOB_FUNCTION,
+                DEFAULT_REQUEST_EXECUTION_GRAPH_INFO,
                 DEFAULT_REQUEST_JOB_RESULT_FUNCTION,
                 DEFAULT_REQUEST_JOB_STATUS_FUNCTION,
                 DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER,
@@ -154,6 +163,8 @@ public class TestingRestfulGateway implements RestfulGateway {
             String hostname,
             Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction,
             Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction,
+            Function<JobID, CompletableFuture<ExecutionGraphInfo>>
+                    requestExecutionGraphInfoFunction,
             Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction,
             Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction,
             Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier,
@@ -175,6 +186,7 @@ public class TestingRestfulGateway implements RestfulGateway {
         this.hostname = hostname;
         this.cancelJobFunction = cancelJobFunction;
         this.requestJobFunction = requestJobFunction;
+        this.requestExecutionGraphInfoFunction = requestExecutionGraphInfoFunction;
         this.requestJobResultFunction = requestJobResultFunction;
         this.requestJobStatusFunction = requestJobStatusFunction;
         this.requestMultipleJobDetailsSupplier = requestMultipleJobDetailsSupplier;
@@ -206,6 +218,12 @@ public class TestingRestfulGateway implements RestfulGateway {
     }
 
     @Override
+    public CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(
+            JobID jobId, Time timeout) {
+        return requestExecutionGraphInfoFunction.apply(jobId);
+    }
+
+    @Override
     public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
         return requestJobResultFunction.apply(jobId);
     }
@@ -278,6 +296,8 @@ public class TestingRestfulGateway implements RestfulGateway {
         protected String hostname = LOCALHOST;
         protected Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction;
         protected Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction;
+        protected Function<JobID, CompletableFuture<ExecutionGraphInfo>>
+                requestExecutionGraphInfoFunction;
         protected Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction;
         protected Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction;
         protected Supplier<CompletableFuture<MultipleJobsDetails>>
@@ -301,6 +321,7 @@ public class TestingRestfulGateway implements RestfulGateway {
         protected AbstractBuilder() {
             cancelJobFunction = DEFAULT_CANCEL_JOB_FUNCTION;
             requestJobFunction = DEFAULT_REQUEST_JOB_FUNCTION;
+            requestExecutionGraphInfoFunction = DEFAULT_REQUEST_EXECUTION_GRAPH_INFO;
             requestJobResultFunction = DEFAULT_REQUEST_JOB_RESULT_FUNCTION;
             requestJobStatusFunction = DEFAULT_REQUEST_JOB_STATUS_FUNCTION;
             requestMultipleJobDetailsSupplier = DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER;
@@ -338,6 +359,13 @@ public class TestingRestfulGateway implements RestfulGateway {
             return self();
         }
 
+        public T setRequestExecutionGraphInfoFunction(
+                Function<JobID, CompletableFuture<ExecutionGraphInfo>>
+                        requestExecutionGraphInfoFunction) {
+            this.requestExecutionGraphInfoFunction = requestExecutionGraphInfoFunction;
+            return self();
+        }
+
         public T setRequestJobResultFunction(
                 Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction) {
             this.requestJobResultFunction = requestJobResultFunction;
@@ -429,6 +457,7 @@ public class TestingRestfulGateway implements RestfulGateway {
                     hostname,
                     cancelJobFunction,
                     requestJobFunction,
+                    requestExecutionGraphInfoFunction,
                     requestJobResultFunction,
                     requestJobStatusFunction,
                     requestMultipleJobDetailsSupplier,
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 942987e..6a8487e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -41,7 +41,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
-import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
 import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
@@ -159,7 +159,7 @@ public class ProcessFailureCancelingITCase extends TestLogger {
                             blobServerResource.getBlobServer(),
                             new HeartbeatServices(100L, 1000L),
                             NoOpMetricRegistry.INSTANCE,
-                            new MemoryArchivedExecutionGraphStore(),
+                            new MemoryExecutionGraphInfoStore(),
                             VoidMetricQueryServiceRetriever.INSTANCE,
                             fatalErrorHandler);
 


[flink] 02/03: [hotfix][test] Removes unused inner class

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 278dababa04883801babcada76d715baaf3b29cf
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Fri Feb 19 08:56:17 2021 +0100

    [hotfix][test] Removes unused inner class
---
 .../legacy/DefaultExecutionGraphCacheTest.java     | 44 ----------------------
 1 file changed, 44 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java
index 65593bb..ace82f9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java
@@ -18,16 +18,11 @@
 
 package org.apache.flink.runtime.rest.handler.legacy;
 
-import org.apache.flink.api.common.ArchivedExecutionConfig;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -44,7 +39,6 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
@@ -298,42 +292,4 @@ public class DefaultExecutionGraphCacheTest extends TestLogger {
             return numRequestJobCalls.get();
         }
     }
-
-    private static final class SuspendableAccessExecutionGraph extends ArchivedExecutionGraph {
-
-        private static final long serialVersionUID = -6796543726305778101L;
-
-        private JobStatus jobStatus;
-
-        public SuspendableAccessExecutionGraph(JobID jobId) {
-            super(
-                    jobId,
-                    "DefaultExecutionGraphCacheTest",
-                    Collections.emptyMap(),
-                    Collections.emptyList(),
-                    new long[0],
-                    JobStatus.RUNNING,
-                    new ErrorInfo(new FlinkException("Test"), 42L),
-                    "",
-                    new StringifiedAccumulatorResult[0],
-                    Collections.emptyMap(),
-                    new ArchivedExecutionConfig(new ExecutionConfig()),
-                    false,
-                    null,
-                    null,
-                    "stateBackendName",
-                    "checkpointStorageName");
-
-            jobStatus = super.getState();
-        }
-
-        @Override
-        public JobStatus getState() {
-            return jobStatus;
-        }
-
-        public void setJobStatus(JobStatus jobStatus) {
-            this.jobStatus = jobStatus;
-        }
-    }
 }


[flink] 01/03: [FLINK-11678][runtime] Removes unnecessary obsolete method overwriting

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit deddfa256af2f1a879eb8bea19a9f8645b96a4e3
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Thu Feb 4 10:31:05 2021 +0100

    [FLINK-11678][runtime] Removes unnecessary obsolete method overwriting
    
    RestfulGateway already declares requestJob with the exact same method signature,
    so the override in DispatcherGateway became obsolete with the changes made in ce49a904.
---
 .../flink/runtime/dispatcher/DispatcherGateway.java     | 17 -----------------
 1 file changed, 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index eb7d9e3..31a42fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -21,10 +21,8 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -60,21 +58,6 @@ public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, Restf
      */
     CompletableFuture<Integer> getBlobServerPort(@RpcTimeout Time timeout);
 
-    /**
-     * Requests the {@link ArchivedExecutionGraph} for the given jobId. If there is no such graph,
-     * then the future is completed with a {@link FlinkJobNotFoundException}.
-     *
-     * <p>Note: We enforce that the returned future contains a {@link ArchivedExecutionGraph} unlike
-     * the super interface.
-     *
-     * @param jobId identifying the job whose AccessExecutionGraph is requested
-     * @param timeout for the asynchronous operation
-     * @return Future containing the AccessExecutionGraph for the given jobId, otherwise {@link
-     *     FlinkJobNotFoundException}
-     */
-    @Override
-    CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout);
-
     default CompletableFuture<Acknowledge> shutDownCluster(ApplicationStatus applicationStatus) {
         return shutDownCluster();
     }