You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/15 07:53:03 UTC

[12/12] flink git commit: [FLINK-9194] Introduce HistoryServerArchivist interface

[FLINK-9194] Introduce HistoryServerArchivist interface

The HistoryServerArchivist interface encapsulates the logic for archiving an
AccessExecutionGraph to the history server. Currently this means generating
the JSON responses for all possible HTTP requests and writing them to a
target directory.

This closes #5902.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5bcecd7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5bcecd7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5bcecd7

Branch: refs/heads/release-1.5
Commit: d5bcecd77cfcd0777e3e7b23e60bd5baf8c1e273
Parents: 27e8901
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon May 14 19:58:20 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 07:51:46 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    | 65 +++++++-------------
 .../dispatcher/HistoryServerArchivist.java      | 55 +++++++++++++++++
 .../JsonResponseHistoryServerArchivist.java     | 56 +++++++++++++++++
 .../runtime/dispatcher/MiniDispatcher.java      |  5 +-
 .../dispatcher/StandaloneDispatcher.java        |  5 +-
 .../dispatcher/VoidHistoryServerArchivist.java  | 36 +++++++++++
 .../runtime/entrypoint/ClusterEntrypoint.java   |  8 ++-
 .../entrypoint/JobClusterEntrypoint.java        |  6 +-
 .../entrypoint/SessionClusterEntrypoint.java    |  6 +-
 .../flink/runtime/minicluster/MiniCluster.java  |  5 +-
 .../runtime/dispatcher/DispatcherTest.java      |  2 +-
 .../runtime/dispatcher/MiniDispatcherTest.java  |  2 +-
 12 files changed, 191 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 82b9291..4d30870 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.client.JobSubmissionException;
@@ -34,7 +32,6 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
-import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -64,8 +61,6 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -123,18 +118,14 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	private final JobManagerMetricGroup jobManagerMetricGroup;
 
+	private final HistoryServerArchivist historyServerArchivist;
+
 	@Nullable
 	private final String metricQueryServicePath;
 
 	@Nullable
 	protected final String restAddress;
 
-	@Nullable
-	private final JsonArchivist jsonArchivist;
-
-	@Nullable
-	private final Path archivePath;
-
 	private CompletableFuture<Void> orphanedJobManagerRunnersTerminationFuture = CompletableFuture.completedFuture(null);
 
 	public Dispatcher(
@@ -152,7 +143,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			JobManagerRunnerFactory jobManagerRunnerFactory,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress,
-			@Nullable JsonArchivist jsonArchivist) throws Exception {
+			HistoryServerArchivist historyServerArchivist) throws Exception {
 		super(rpcService, endpointId);
 
 		this.configuration = Preconditions.checkNotNull(configuration);
@@ -177,21 +168,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 		this.restAddress = restAddress;
 
-		this.jsonArchivist = jsonArchivist;
-
-		String configuredArchivePath = configuration.getString(JobManagerOptions.ARCHIVE_DIR);
-		if (configuredArchivePath != null) {
-			Path tmpArchivePath = null;
-			try {
-				tmpArchivePath = WebMonitorUtils.validateAndNormalizeUri(new Path(configuredArchivePath).toUri());
-			} catch (Exception e) {
-				log.warn("Failed to validate specified archive directory in '{}'. " +
-					"Jobs will not be archived for the HistoryServer.", configuredArchivePath, e);
-			}
-			archivePath = tmpArchivePath;
-		} else {
-			archivePath = null;
-		}
+		this.historyServerArchivist = Preconditions.checkNotNull(historyServerArchivist);
 
 		this.archivedExecutionGraphStore = Preconditions.checkNotNull(archivedExecutionGraphStore);
 
@@ -639,6 +616,14 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 		log.info("Job {} reached globally terminal state {}.", archivedExecutionGraph.getJobID(), archivedExecutionGraph.getState());
 
+		archiveExecutionGraph(archivedExecutionGraph);
+
+		final JobID jobId = archivedExecutionGraph.getJobID();
+
+		removeJob(jobId, true);
+	}
+
+	private void archiveExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) {
 		try {
 			archivedExecutionGraphStore.put(archivedExecutionGraph);
 		} catch (IOException e) {
@@ -649,22 +634,18 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				e);
 		}
 
-		try {
-			if (jsonArchivist != null && archivePath != null) {
-				FsJobArchivist.archiveJob(archivePath, archivedExecutionGraph.getJobID(), jsonArchivist.archiveJsonWithPath(archivedExecutionGraph));
-				log.info("Archived job {} to {}", archivedExecutionGraph.getJobID(), archivePath);
-			}
-		} catch (IOException e) {
-			log.info(
-				"Could not archive completed job {}({}).",
-				archivedExecutionGraph.getJobName(),
-				archivedExecutionGraph.getJobID(),
-				e);
-		}
-
-		final JobID jobId = archivedExecutionGraph.getJobID();
+		final CompletableFuture<Acknowledge> executionGraphFuture = historyServerArchivist.archiveExecutionGraph(archivedExecutionGraph);
 
-		removeJob(jobId, true);
+		executionGraphFuture.whenComplete(
+			(Acknowledge ignored, Throwable throwable) -> {
+				if (throwable != null) {
+					log.info(
+						"Could not archive completed job {}({}) to the history server.",
+						archivedExecutionGraph.getJobName(),
+						archivedExecutionGraph.getJobID(),
+						throwable);
+				}
+			});
 	}
 
 	protected void jobNotFinished(JobID jobId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
new file mode 100644
index 0000000..0030159
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Archivist which writes an {@link AccessExecutionGraph} to the history server.
+ */
+public interface HistoryServerArchivist {
+
+	/**
+	 * Archives the given {@link AccessExecutionGraph} on the history server.
+	 *
+	 * @param executionGraph to store on the history server
+	 * @return Future which is completed once the archiving has been completed.
+	 */
+	CompletableFuture<Acknowledge> archiveExecutionGraph(AccessExecutionGraph executionGraph);
+
+	static HistoryServerArchivist createHistoryServerArchivist(Configuration configuration, JsonArchivist jsonArchivist) {
+		final String configuredArchivePath = configuration.getString(JobManagerOptions.ARCHIVE_DIR);
+
+		if (configuredArchivePath != null) {
+			final Path archivePath = WebMonitorUtils.validateAndNormalizeUri(new Path(configuredArchivePath).toUri());
+
+			return new JsonResponseHistoryServerArchivist(jsonArchivist, archivePath);
+		} else {
+			return VoidHistoryServerArchivist.INSTANCE;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
new file mode 100644
index 0000000..be58399
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Implementation which archives an {@link AccessExecutionGraph} such that it stores
+ * the JSON responses for all possible history server requests.
+ */
+class JsonResponseHistoryServerArchivist implements HistoryServerArchivist {
+
+	private final JsonArchivist jsonArchivist;
+
+	private final Path archivePath;
+
+	JsonResponseHistoryServerArchivist(JsonArchivist jsonArchivist, Path archivePath) {
+		this.jsonArchivist = Preconditions.checkNotNull(jsonArchivist);
+		this.archivePath = Preconditions.checkNotNull(archivePath);
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> archiveExecutionGraph(AccessExecutionGraph executionGraph) {
+		try {
+			FsJobArchivist.archiveJob(archivePath, executionGraph.getJobID(), jsonArchivist.archiveJsonWithPath(executionGraph));
+			return CompletableFuture.completedFuture(Acknowledge.get());
+		} catch (IOException e) {
+			return FutureUtils.completedExceptionally(e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 38e74fb..8b497b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nullable;
@@ -72,7 +71,7 @@ public class MiniDispatcher extends Dispatcher {
 			JobManagerRunnerFactory jobManagerRunnerFactory,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress,
-			@Nullable JsonArchivist jsonArchivist,
+			HistoryServerArchivist historyServerArchivist,
 			JobGraph jobGraph,
 			JobClusterEntrypoint.ExecutionMode executionMode) throws Exception {
 		super(
@@ -90,7 +89,7 @@ public class MiniDispatcher extends Dispatcher {
 			jobManagerRunnerFactory,
 			fatalErrorHandler,
 			restAddress,
-			jsonArchivist);
+			historyServerArchivist);
 
 		this.executionMode = checkNotNull(executionMode);
 		this.jobTerminationFuture = new CompletableFuture<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 5c6a7ab..c39615c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import javax.annotation.Nullable;
 
@@ -52,7 +51,7 @@ public class StandaloneDispatcher extends Dispatcher {
 			JobManagerRunnerFactory jobManagerRunnerFactory,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress,
-			@Nullable JsonArchivist jsonArchivist) throws Exception {
+			HistoryServerArchivist historyServerArchivist) throws Exception {
 		super(
 			rpcService,
 			endpointId,
@@ -68,6 +67,6 @@ public class StandaloneDispatcher extends Dispatcher {
 			jobManagerRunnerFactory,
 			fatalErrorHandler,
 			restAddress,
-			jsonArchivist);
+			historyServerArchivist);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
new file mode 100644
index 0000000..2d34d83
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * No-op implementation of the {@link HistoryServerArchivist}.
+ */
+public enum VoidHistoryServerArchivist implements HistoryServerArchivist {
+	INSTANCE {
+		@Override
+		public CompletableFuture<Acknowledge> archiveExecutionGraph(AccessExecutionGraph executionGraph) {
+			return CompletableFuture.completedFuture(Acknowledge.get());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 933add8..a267abb 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -64,7 +65,6 @@ import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
@@ -347,6 +347,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 
 			jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress());
 
+			final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
+
 			dispatcher = createDispatcher(
 				configuration,
 				rpcService,
@@ -359,7 +361,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 				archivedExecutionGraphStore,
 				this,
 				webMonitorEndpoint.getRestBaseUrl(),
-				webMonitorEndpoint);
+				historyServerArchivist);
 
 			LOG.debug("Starting ResourceManager.");
 			resourceManager.start();
@@ -660,7 +662,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		ArchivedExecutionGraphStore archivedExecutionGraphStore,
 		FatalErrorHandler fatalErrorHandler,
 		@Nullable String restAddress,
-		@Nullable JsonArchivist jsonArchivist) throws Exception;
+		HistoryServerArchivist historyServerArchivist) throws Exception;
 
 	protected abstract ResourceManager<?> createResourceManager(
 		Configuration configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 2a5b8ea..80a9da2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -39,7 +40,6 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.FlinkException;
@@ -102,7 +102,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress,
-			@Nullable JsonArchivist jsonArchivist) throws Exception {
+			HistoryServerArchivist historyServerArchivist) throws Exception {
 
 		final JobGraph jobGraph = retrieveJobGraph(configuration);
 
@@ -124,7 +124,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 			fatalErrorHandler,
 			restAddress,
-			jsonArchivist,
+			historyServerArchivist,
 			jobGraph,
 			executionMode);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 85446eb..40eb8b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -40,7 +41,6 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
@@ -116,7 +116,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress,
-			@Nullable JsonArchivist jsonArchivist) throws Exception {
+			HistoryServerArchivist historyServerArchivist) throws Exception {
 
 		// create the default dispatcher
 		return new StandaloneDispatcher(
@@ -133,6 +133,6 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 			fatalErrorHandler,
 			restAddress,
-			jsonArchivist);
+			historyServerArchivist);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 04d5fb8..b2a1570 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -357,6 +358,8 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 
 				this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost");
 
+				final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint);
+
 				dispatcher = new StandaloneDispatcher(
 					jobManagerRpcService,
 					Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
@@ -371,7 +374,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 					new ShutDownFatalErrorHandler(),
 					dispatcherRestEndpoint.getRestBaseUrl(),
-					dispatcherRestEndpoint);
+					historyServerArchivist);
 
 				dispatcher.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index d9482e7..eff63e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -568,7 +568,7 @@ public class DispatcherTest extends TestLogger {
 				jobManagerRunnerFactory,
 				fatalErrorHandler,
 				null,
-				null);
+				VoidHistoryServerArchivist.INSTANCE);
 		}
 
 		@VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 157fca7..914531d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -258,7 +258,7 @@ public class MiniDispatcherTest extends TestLogger {
 			testingJobManagerRunnerFactory,
 			testingFatalErrorHandler,
 			null,
-			null,
+			VoidHistoryServerArchivist.INSTANCE,
 			jobGraph,
 			executionMode);
 	}