You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/15 07:52:14 UTC
[07/12] flink git commit: [FLINK-9194][history] Add archiving routine to Dispatcher
[FLINK-9194][history] Add archiving routine to Dispatcher
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd374b83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd374b83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd374b83
Branch: refs/heads/master
Commit: fd374b832f830adfa59b7f834b11c38080486f1c
Parents: 6b6603f
Author: zentol <ch...@apache.org>
Authored: Wed Apr 18 14:33:04 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon May 14 23:40:48 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/dispatcher/Dispatcher.java | 43 +++++++++++++++++++-
.../runtime/dispatcher/MiniDispatcher.java | 5 ++-
.../dispatcher/StandaloneDispatcher.java | 7 +++-
.../runtime/entrypoint/ClusterEntrypoint.java | 7 +++-
.../entrypoint/JobClusterEntrypoint.java | 5 ++-
.../entrypoint/SessionClusterEntrypoint.java | 7 +++-
.../flink/runtime/history/FsJobArchivist.java | 42 +++++++++++++++++++
.../flink/runtime/minicluster/MiniCluster.java | 3 +-
.../runtime/webmonitor/WebMonitorEndpoint.java | 22 +++++++++-
.../runtime/dispatcher/DispatcherTest.java | 1 +
.../runtime/dispatcher/MiniDispatcherTest.java | 1 +
11 files changed, 132 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 58ffda3..82b9291 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.client.JobSubmissionException;
@@ -32,6 +34,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.history.FsJobArchivist;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -61,6 +64,8 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
@@ -124,6 +129,12 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
@Nullable
protected final String restAddress;
+ @Nullable
+ private final JsonArchivist jsonArchivist;
+
+ @Nullable
+ private final Path archivePath;
+
private CompletableFuture<Void> orphanedJobManagerRunnersTerminationFuture = CompletableFuture.completedFuture(null);
public Dispatcher(
@@ -140,7 +151,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
ArchivedExecutionGraphStore archivedExecutionGraphStore,
JobManagerRunnerFactory jobManagerRunnerFactory,
FatalErrorHandler fatalErrorHandler,
- @Nullable String restAddress) throws Exception {
+ @Nullable String restAddress,
+ @Nullable JsonArchivist jsonArchivist) throws Exception {
super(rpcService, endpointId);
this.configuration = Preconditions.checkNotNull(configuration);
@@ -165,6 +177,22 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
this.restAddress = restAddress;
+ this.jsonArchivist = jsonArchivist;
+
+ String configuredArchivePath = configuration.getString(JobManagerOptions.ARCHIVE_DIR);
+ if (configuredArchivePath != null) {
+ Path tmpArchivePath = null;
+ try {
+ tmpArchivePath = WebMonitorUtils.validateAndNormalizeUri(new Path(configuredArchivePath).toUri());
+ } catch (Exception e) {
+ log.warn("Failed to validate specified archive directory in '{}'. " +
+ "Jobs will not be archived for the HistoryServer.", configuredArchivePath, e);
+ }
+ archivePath = tmpArchivePath;
+ } else {
+ archivePath = null;
+ }
+
this.archivedExecutionGraphStore = Preconditions.checkNotNull(archivedExecutionGraphStore);
this.jobManagerRunnerFactory = Preconditions.checkNotNull(jobManagerRunnerFactory);
@@ -621,6 +649,19 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
e);
}
+ try {
+ if (jsonArchivist != null && archivePath != null) {
+ FsJobArchivist.archiveJob(archivePath, archivedExecutionGraph.getJobID(), jsonArchivist.archiveJsonWithPath(archivedExecutionGraph));
+ log.info("Archived job {} to {}", archivedExecutionGraph.getJobID(), archivePath);
+ }
+ } catch (IOException e) {
+ log.info(
+ "Could not archive completed job {}({}).",
+ archivedExecutionGraph.getJobName(),
+ archivedExecutionGraph.getJobID(),
+ e);
+ }
+
final JobID jobId = archivedExecutionGraph.getJobID();
removeJob(jobId, true);
http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 4361e08..38e74fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
import javax.annotation.Nullable;
@@ -71,6 +72,7 @@ public class MiniDispatcher extends Dispatcher {
JobManagerRunnerFactory jobManagerRunnerFactory,
FatalErrorHandler fatalErrorHandler,
@Nullable String restAddress,
+ @Nullable JsonArchivist jsonArchivist,
JobGraph jobGraph,
JobClusterEntrypoint.ExecutionMode executionMode) throws Exception {
super(
@@ -87,7 +89,8 @@ public class MiniDispatcher extends Dispatcher {
archivedExecutionGraphStore,
jobManagerRunnerFactory,
fatalErrorHandler,
- restAddress);
+ restAddress,
+ jsonArchivist);
this.executionMode = checkNotNull(executionMode);
this.jobTerminationFuture = new CompletableFuture<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 52ac7a0..5c6a7ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import javax.annotation.Nullable;
@@ -50,7 +51,8 @@ public class StandaloneDispatcher extends Dispatcher {
ArchivedExecutionGraphStore archivedExecutionGraphStore,
JobManagerRunnerFactory jobManagerRunnerFactory,
FatalErrorHandler fatalErrorHandler,
- @Nullable String restAddress) throws Exception {
+ @Nullable String restAddress,
+ @Nullable JsonArchivist jsonArchivist) throws Exception {
super(
rpcService,
endpointId,
@@ -65,6 +67,7 @@ public class StandaloneDispatcher extends Dispatcher {
archivedExecutionGraphStore,
jobManagerRunnerFactory,
fatalErrorHandler,
- restAddress);
+ restAddress,
+ jsonArchivist);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index f823ea7..933add8 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
@@ -357,7 +358,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
metricRegistry.getMetricQueryServicePath(),
archivedExecutionGraphStore,
this,
- webMonitorEndpoint.getRestBaseUrl());
+ webMonitorEndpoint.getRestBaseUrl(),
+ webMonitorEndpoint);
LOG.debug("Starting ResourceManager.");
resourceManager.start();
@@ -657,7 +659,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
@Nullable String metricQueryServicePath,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
FatalErrorHandler fatalErrorHandler,
- @Nullable String restAddress) throws Exception;
+ @Nullable String restAddress,
+ @Nullable JsonArchivist jsonArchivist) throws Exception;
protected abstract ResourceManager<?> createResourceManager(
Configuration configuration,
http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index ea7cbe2..2a5b8ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.FlinkException;
@@ -100,7 +101,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
@Nullable String metricQueryServicePath,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
FatalErrorHandler fatalErrorHandler,
- @Nullable String restAddress) throws Exception {
+ @Nullable String restAddress,
+ @Nullable JsonArchivist jsonArchivist) throws Exception {
final JobGraph jobGraph = retrieveJobGraph(configuration);
@@ -122,6 +124,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
fatalErrorHandler,
restAddress,
+ jsonArchivist,
jobGraph,
executionMode);
http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index fcab796..85446eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
@@ -114,7 +115,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
@Nullable String metricQueryServicePath,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
FatalErrorHandler fatalErrorHandler,
- @Nullable String restAddress) throws Exception {
+ @Nullable String restAddress,
+ @Nullable JsonArchivist jsonArchivist) throws Exception {
// create the default dispatcher
return new StandaloneDispatcher(
@@ -130,6 +132,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
archivedExecutionGraphStore,
Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
fatalErrorHandler,
- restAddress);
+ restAddress,
+ jsonArchivist);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
index 3a6ea4f..1cfbf96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.history;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
@@ -67,7 +68,9 @@ public class FsJobArchivist {
* @param graph graph to archive
* @return path to where the archive was written, or null if no archive was created
* @throws IOException
+ * @deprecated only kept for legacy reasons
*/
+ @Deprecated
public static Path archiveJob(Path rootPath, AccessExecutionGraph graph) throws IOException {
try {
FileSystem fs = rootPath.getFileSystem();
@@ -100,6 +103,45 @@ public class FsJobArchivist {
}
/**
+ * Writes the given collection of {@link ArchivedJson} to the {@link FileSystem} pointed to by
+ * {@link JobManagerOptions#ARCHIVE_DIR}.
+ *
+ * @param rootPath directory to which the archive should be written
+ * @param jobId job id
+ * @param jsonToArchive collection of json-path pairs that should be archived
+ * @return path to where the archive was written, or null if no archive was created
+ * @throws IOException
+ */
+ public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive) throws IOException {
+ try {
+ FileSystem fs = rootPath.getFileSystem();
+ Path path = new Path(rootPath, jobId.toString());
+ OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
+
+ try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
+ gen.writeStartObject();
+ gen.writeArrayFieldStart(ARCHIVE);
+ for (ArchivedJson archive : jsonToArchive) {
+ gen.writeStartObject();
+ gen.writeStringField(PATH, archive.getPath());
+ gen.writeStringField(JSON, archive.getJson());
+ gen.writeEndObject();
+ }
+ gen.writeEndArray();
+ gen.writeEndObject();
+ } catch (Exception e) {
+ fs.delete(path, false);
+ throw e;
+ }
+ LOG.info("Job {} has been archived at {}.", jobId, path);
+ return path;
+ } catch (IOException e) {
+ LOG.error("Failed to archive job.", e);
+ throw e;
+ }
+ }
+
+ /**
* Reads the given archive file and returns a {@link Collection} of contained {@link ArchivedJson}.
*
* @param file archive to extract
http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index a86aa2e..09f8bf4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -370,7 +370,8 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
new MemoryArchivedExecutionGraphStore(),
Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
new ShutDownFatalErrorHandler(),
- dispatcherRestEndpoint.getRestBaseUrl());
+ dispatcherRestEndpoint.getRestBaseUrl(),
+ dispatcherRestEndpoint);
dispatcher.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 5cb57d3..f9b2fa8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -113,6 +114,8 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHead
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerStdoutFileHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.ExceptionUtils;
@@ -126,6 +129,7 @@ import javax.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@@ -137,7 +141,7 @@ import java.util.concurrent.Executor;
*
* @param <T> type of the leader gateway
*/
-public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndpoint implements LeaderContender {
+public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndpoint implements LeaderContender, JsonArchivist {
protected final GatewayRetriever<? extends T> leaderRetriever;
protected final Configuration clusterConfiguration;
@@ -157,6 +161,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
private boolean hasWebUI = false;
+ private final Collection<JsonArchivist> archivingHandlers = new ArrayList<>(16);
+
public WebMonitorEndpoint(
RestServerEndpointConfiguration endpointConfiguration,
GatewayRetriever<? extends T> leaderRetriever,
@@ -667,6 +673,11 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
handlers.add(Tuple2.of(TaskManagerLogFileHeaders.getInstance(), taskManagerLogFileHandler));
handlers.add(Tuple2.of(TaskManagerStdoutFileHeaders.getInstance(), taskManagerStdoutFileHandler));
+ handlers.stream()
+ .map(tuple -> tuple.f1)
+ .filter(handler -> handler instanceof JsonArchivist)
+ .forEachOrdered(handler -> archivingHandlers.add((JsonArchivist) handler));
+
return handlers;
}
@@ -756,4 +767,13 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
fatalErrorHandler.onFatalError(exception);
}
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ Collection<ArchivedJson> archivedJson = new ArrayList<>(archivingHandlers.size());
+ for (JsonArchivist archivist : archivingHandlers) {
+ Collection<ArchivedJson> subArchive = archivist.archiveJsonWithPath(graph);
+ archivedJson.addAll(subArchive);
+ }
+ return archivedJson;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 18a8ec1..d9482e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -567,6 +567,7 @@ public class DispatcherTest extends TestLogger {
archivedExecutionGraphStore,
jobManagerRunnerFactory,
fatalErrorHandler,
+ null,
null);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 6dfb243..157fca7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -258,6 +258,7 @@ public class MiniDispatcherTest extends TestLogger {
testingJobManagerRunnerFactory,
testingFatalErrorHandler,
null,
+ null,
jobGraph,
executionMode);
}