You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/15 07:52:14 UTC

[07/12] flink git commit: [FLINK-9194][history] Add archiving routine to Dispatcher

[FLINK-9194][history] Add archiving routine to Dispatcher


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd374b83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd374b83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd374b83

Branch: refs/heads/master
Commit: fd374b832f830adfa59b7f834b11c38080486f1c
Parents: 6b6603f
Author: zentol <ch...@apache.org>
Authored: Wed Apr 18 14:33:04 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon May 14 23:40:48 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    | 43 +++++++++++++++++++-
 .../runtime/dispatcher/MiniDispatcher.java      |  5 ++-
 .../dispatcher/StandaloneDispatcher.java        |  7 +++-
 .../runtime/entrypoint/ClusterEntrypoint.java   |  7 +++-
 .../entrypoint/JobClusterEntrypoint.java        |  5 ++-
 .../entrypoint/SessionClusterEntrypoint.java    |  7 +++-
 .../flink/runtime/history/FsJobArchivist.java   | 42 +++++++++++++++++++
 .../flink/runtime/minicluster/MiniCluster.java  |  3 +-
 .../runtime/webmonitor/WebMonitorEndpoint.java  | 22 +++++++++-
 .../runtime/dispatcher/DispatcherTest.java      |  1 +
 .../runtime/dispatcher/MiniDispatcherTest.java  |  1 +
 11 files changed, 132 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 58ffda3..82b9291 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.client.JobSubmissionException;
@@ -32,6 +34,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -61,6 +64,8 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -124,6 +129,12 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	@Nullable
 	protected final String restAddress;
 
+	@Nullable
+	private final JsonArchivist jsonArchivist;
+
+	@Nullable
+	private final Path archivePath;
+
 	private CompletableFuture<Void> orphanedJobManagerRunnersTerminationFuture = CompletableFuture.completedFuture(null);
 
 	public Dispatcher(
@@ -140,7 +151,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			JobManagerRunnerFactory jobManagerRunnerFactory,
 			FatalErrorHandler fatalErrorHandler,
-			@Nullable String restAddress) throws Exception {
+			@Nullable String restAddress,
+			@Nullable JsonArchivist jsonArchivist) throws Exception {
 		super(rpcService, endpointId);
 
 		this.configuration = Preconditions.checkNotNull(configuration);
@@ -165,6 +177,22 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 		this.restAddress = restAddress;
 
+		this.jsonArchivist = jsonArchivist;
+
+		String configuredArchivePath = configuration.getString(JobManagerOptions.ARCHIVE_DIR);
+		if (configuredArchivePath != null) {
+			Path tmpArchivePath = null;
+			try {
+				tmpArchivePath = WebMonitorUtils.validateAndNormalizeUri(new Path(configuredArchivePath).toUri());
+			} catch (Exception e) {
+				log.warn("Failed to validate specified archive directory in '{}'. " +
+					"Jobs will not be archived for the HistoryServer.", configuredArchivePath, e);
+			}
+			archivePath = tmpArchivePath;
+		} else {
+			archivePath = null;
+		}
+
 		this.archivedExecutionGraphStore = Preconditions.checkNotNull(archivedExecutionGraphStore);
 
 		this.jobManagerRunnerFactory = Preconditions.checkNotNull(jobManagerRunnerFactory);
@@ -621,6 +649,19 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				e);
 		}
 
+		try {
+			if (jsonArchivist != null && archivePath != null) {
+				FsJobArchivist.archiveJob(archivePath, archivedExecutionGraph.getJobID(), jsonArchivist.archiveJsonWithPath(archivedExecutionGraph));
+				log.info("Archived job {} to {}", archivedExecutionGraph.getJobID(), archivePath);
+			}
+		} catch (IOException e) {
+			log.info(
+				"Could not archive completed job {}({}).",
+				archivedExecutionGraph.getJobName(),
+				archivedExecutionGraph.getJobID(),
+				e);
+		}
+
 		final JobID jobId = archivedExecutionGraph.getJobID();
 
 		removeJob(jobId, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 4361e08..38e74fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nullable;
@@ -71,6 +72,7 @@ public class MiniDispatcher extends Dispatcher {
 			JobManagerRunnerFactory jobManagerRunnerFactory,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress,
+			@Nullable JsonArchivist jsonArchivist,
 			JobGraph jobGraph,
 			JobClusterEntrypoint.ExecutionMode executionMode) throws Exception {
 		super(
@@ -87,7 +89,8 @@ public class MiniDispatcher extends Dispatcher {
 			archivedExecutionGraphStore,
 			jobManagerRunnerFactory,
 			fatalErrorHandler,
-			restAddress);
+			restAddress,
+			jsonArchivist);
 
 		this.executionMode = checkNotNull(executionMode);
 		this.jobTerminationFuture = new CompletableFuture<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 52ac7a0..5c6a7ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import javax.annotation.Nullable;
 
@@ -50,7 +51,8 @@ public class StandaloneDispatcher extends Dispatcher {
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			JobManagerRunnerFactory jobManagerRunnerFactory,
 			FatalErrorHandler fatalErrorHandler,
-			@Nullable String restAddress) throws Exception {
+			@Nullable String restAddress,
+			@Nullable JsonArchivist jsonArchivist) throws Exception {
 		super(
 			rpcService,
 			endpointId,
@@ -65,6 +67,7 @@ public class StandaloneDispatcher extends Dispatcher {
 			archivedExecutionGraphStore,
 			jobManagerRunnerFactory,
 			fatalErrorHandler,
-			restAddress);
+			restAddress,
+			jsonArchivist);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index f823ea7..933add8 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
@@ -357,7 +358,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 				metricRegistry.getMetricQueryServicePath(),
 				archivedExecutionGraphStore,
 				this,
-				webMonitorEndpoint.getRestBaseUrl());
+				webMonitorEndpoint.getRestBaseUrl(),
+				webMonitorEndpoint);
 
 			LOG.debug("Starting ResourceManager.");
 			resourceManager.start();
@@ -657,7 +659,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		@Nullable String metricQueryServicePath,
 		ArchivedExecutionGraphStore archivedExecutionGraphStore,
 		FatalErrorHandler fatalErrorHandler,
-		@Nullable String restAddress) throws Exception;
+		@Nullable String restAddress,
+		@Nullable JsonArchivist jsonArchivist) throws Exception;
 
 	protected abstract ResourceManager<?> createResourceManager(
 		Configuration configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index ea7cbe2..2a5b8ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.FlinkException;
@@ -100,7 +101,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			@Nullable String metricQueryServicePath,
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			FatalErrorHandler fatalErrorHandler,
-			@Nullable String restAddress) throws Exception {
+			@Nullable String restAddress,
+			@Nullable JsonArchivist jsonArchivist) throws Exception {
 
 		final JobGraph jobGraph = retrieveJobGraph(configuration);
 
@@ -122,6 +124,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 			fatalErrorHandler,
 			restAddress,
+			jsonArchivist,
 			jobGraph,
 			executionMode);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index fcab796..85446eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
@@ -114,7 +115,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			@Nullable String metricQueryServicePath,
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			FatalErrorHandler fatalErrorHandler,
-			@Nullable String restAddress) throws Exception {
+			@Nullable String restAddress,
+			@Nullable JsonArchivist jsonArchivist) throws Exception {
 
 		// create the default dispatcher
 		return new StandaloneDispatcher(
@@ -130,6 +132,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			archivedExecutionGraphStore,
 			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 			fatalErrorHandler,
-			restAddress);
+			restAddress,
+			jsonArchivist);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
index 3a6ea4f..1cfbf96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.history;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
@@ -67,7 +68,9 @@ public class FsJobArchivist {
 	 * @param graph  graph to archive
 	 * @return path to where the archive was written, or null if no archive was created
 	 * @throws IOException
+	 * @deprecated only kept for legacy reasons; use {@link #archiveJob(Path, JobID, Collection)} instead
 	 */
+	@Deprecated
 	public static Path archiveJob(Path rootPath, AccessExecutionGraph graph) throws IOException {
 		try {
 			FileSystem fs = rootPath.getFileSystem();
@@ -100,6 +103,45 @@ public class FsJobArchivist {
 	}
 
 	/**
+	 * Writes the given json-path pairs into a single archive file, named after the job id, below
+	 * {@code rootPath} (e.g. the directory configured via {@link JobManagerOptions#ARCHIVE_DIR}).
+	 *
+	 * @param rootPath directory to which the archive should be written
+	 * @param jobId id of the job; its string form is used as the archive file name
+	 * @param jsonToArchive collection of json-path pairs that should be archived
+	 * @return path to where the archive was written
+	 * @throws IOException if the archive could not be created or written; if writing fails after the file was created, the file is deleted before rethrowing
+	 */
+	public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive) throws IOException {
+		try {
+			FileSystem fs = rootPath.getFileSystem();
+			Path path = new Path(rootPath, jobId.toString());
+			OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
+
+			try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
+				gen.writeStartObject();
+				gen.writeArrayFieldStart(ARCHIVE);
+				for (ArchivedJson archive : jsonToArchive) {
+					gen.writeStartObject();
+					gen.writeStringField(PATH, archive.getPath());
+					gen.writeStringField(JSON, archive.getJson());
+					gen.writeEndObject();
+				}
+				gen.writeEndArray();
+				gen.writeEndObject();
+			} catch (Exception e) {
+				fs.delete(path, false);
+				throw e;
+			}
+			LOG.info("Job {} has been archived at {}.", jobId, path);
+			return path;
+		} catch (IOException e) {
+			LOG.error("Failed to archive job.", e);
+			throw e;
+		}
+	}
+
+	/**
 	 * Reads the given archive file and returns a {@link Collection} of contained {@link ArchivedJson}.
 	 *
 	 * @param file archive to extract

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index a86aa2e..09f8bf4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -370,7 +370,8 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					new MemoryArchivedExecutionGraphStore(),
 					Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 					new ShutDownFatalErrorHandler(),
-					dispatcherRestEndpoint.getRestBaseUrl());
+					dispatcherRestEndpoint.getRestBaseUrl(),
+					dispatcherRestEndpoint);
 
 				dispatcher.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 5cb57d3..f9b2fa8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -113,6 +114,8 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHead
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerStdoutFileHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.ExceptionUtils;
@@ -126,6 +129,7 @@ import javax.annotation.Nonnull;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
@@ -137,7 +141,7 @@ import java.util.concurrent.Executor;
  *
  * @param <T> type of the leader gateway
  */
-public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndpoint implements LeaderContender {
+public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndpoint implements LeaderContender, JsonArchivist {
 
 	protected final GatewayRetriever<? extends T> leaderRetriever;
 	protected final Configuration clusterConfiguration;
@@ -157,6 +161,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 
 	private boolean hasWebUI = false;
 
+	private final Collection<JsonArchivist> archivingHandlers = new ArrayList<>(16);
+
 	public WebMonitorEndpoint(
 			RestServerEndpointConfiguration endpointConfiguration,
 			GatewayRetriever<? extends T> leaderRetriever,
@@ -667,6 +673,11 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		handlers.add(Tuple2.of(TaskManagerLogFileHeaders.getInstance(), taskManagerLogFileHandler));
 		handlers.add(Tuple2.of(TaskManagerStdoutFileHeaders.getInstance(), taskManagerStdoutFileHandler));
 
+		handlers.stream()
+			.map(tuple -> tuple.f1)
+			.filter(handler -> handler instanceof JsonArchivist)
+			.forEachOrdered(handler -> archivingHandlers.add((JsonArchivist) handler));
+
 		return handlers;
 	}
 
@@ -756,4 +767,13 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		fatalErrorHandler.onFatalError(exception);
 	}
 
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { // aggregates the archives of all registered archiving handlers
+		Collection<ArchivedJson> archivedJson = new ArrayList<>(archivingHandlers.size()); // initial capacity only; a handler may contribute several entries
+		for (JsonArchivist archivist : archivingHandlers) { // archivingHandlers holds the registered handlers that implement JsonArchivist
+			Collection<ArchivedJson> subArchive = archivist.archiveJsonWithPath(graph);
+			archivedJson.addAll(subArchive);
+		}
+		return archivedJson;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 18a8ec1..d9482e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -567,6 +567,7 @@ public class DispatcherTest extends TestLogger {
 				archivedExecutionGraphStore,
 				jobManagerRunnerFactory,
 				fatalErrorHandler,
+				null,
 				null);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 6dfb243..157fca7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -258,6 +258,7 @@ public class MiniDispatcherTest extends TestLogger {
 			testingJobManagerRunnerFactory,
 			testingFatalErrorHandler,
 			null,
+			null,
 			jobGraph,
 			executionMode);
 	}