You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/15 07:52:52 UTC

[01/12] flink git commit: [hotfix] Make field JobMaster#resourceManagerLeaderRetriever final

Repository: flink
Updated Branches:
  refs/heads/release-1.5 2d34b9030 -> 64117e8e6


[hotfix] Make field JobMaster#resourceManagerLeaderRetriever final


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64117e8e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64117e8e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64117e8e

Branch: refs/heads/release-1.5
Commit: 64117e8e64c981590e9109e47eb4ee5b266f6a8d
Parents: ee41e95
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon May 14 17:30:55 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 07:51:46 2018 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/runtime/jobmaster/JobMaster.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/64117e8e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index aff3280..f1dbbb4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -189,7 +189,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	// --------- ResourceManager --------
 
-	private LeaderRetrievalService resourceManagerLeaderRetriever;
+	private final LeaderRetrievalService resourceManagerLeaderRetriever;
 
 	// --------- TaskManagers --------
 


[12/12] flink git commit: [FLINK-9194] Introduce HistoryServerArchivist interface

Posted by tr...@apache.org.
[FLINK-9194] Introduce HistoryServerArchivist interface

The HistoryServerArchivist interface encapsulates the logic for archiving an
AccessExecutionGraph to the history server. Currently this means generating
the JSON responses for all possible HTTP requests and writing them to a
target directory.

This closes #5902.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5bcecd7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5bcecd7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5bcecd7

Branch: refs/heads/release-1.5
Commit: d5bcecd77cfcd0777e3e7b23e60bd5baf8c1e273
Parents: 27e8901
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon May 14 19:58:20 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 07:51:46 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    | 65 +++++++-------------
 .../dispatcher/HistoryServerArchivist.java      | 55 +++++++++++++++++
 .../JsonResponseHistoryServerArchivist.java     | 56 +++++++++++++++++
 .../runtime/dispatcher/MiniDispatcher.java      |  5 +-
 .../dispatcher/StandaloneDispatcher.java        |  5 +-
 .../dispatcher/VoidHistoryServerArchivist.java  | 36 +++++++++++
 .../runtime/entrypoint/ClusterEntrypoint.java   |  8 ++-
 .../entrypoint/JobClusterEntrypoint.java        |  6 +-
 .../entrypoint/SessionClusterEntrypoint.java    |  6 +-
 .../flink/runtime/minicluster/MiniCluster.java  |  5 +-
 .../runtime/dispatcher/DispatcherTest.java      |  2 +-
 .../runtime/dispatcher/MiniDispatcherTest.java  |  2 +-
 12 files changed, 191 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 82b9291..4d30870 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.client.JobSubmissionException;
@@ -34,7 +32,6 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
-import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -64,8 +61,6 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -123,18 +118,14 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	private final JobManagerMetricGroup jobManagerMetricGroup;
 
+	private final HistoryServerArchivist historyServerArchivist;
+
 	@Nullable
 	private final String metricQueryServicePath;
 
 	@Nullable
 	protected final String restAddress;
 
-	@Nullable
-	private final JsonArchivist jsonArchivist;
-
-	@Nullable
-	private final Path archivePath;
-
 	private CompletableFuture<Void> orphanedJobManagerRunnersTerminationFuture = CompletableFuture.completedFuture(null);
 
 	public Dispatcher(
@@ -152,7 +143,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			JobManagerRunnerFactory jobManagerRunnerFactory,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress,
-			@Nullable JsonArchivist jsonArchivist) throws Exception {
+			HistoryServerArchivist historyServerArchivist) throws Exception {
 		super(rpcService, endpointId);
 
 		this.configuration = Preconditions.checkNotNull(configuration);
@@ -177,21 +168,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 		this.restAddress = restAddress;
 
-		this.jsonArchivist = jsonArchivist;
-
-		String configuredArchivePath = configuration.getString(JobManagerOptions.ARCHIVE_DIR);
-		if (configuredArchivePath != null) {
-			Path tmpArchivePath = null;
-			try {
-				tmpArchivePath = WebMonitorUtils.validateAndNormalizeUri(new Path(configuredArchivePath).toUri());
-			} catch (Exception e) {
-				log.warn("Failed to validate specified archive directory in '{}'. " +
-					"Jobs will not be archived for the HistoryServer.", configuredArchivePath, e);
-			}
-			archivePath = tmpArchivePath;
-		} else {
-			archivePath = null;
-		}
+		this.historyServerArchivist = Preconditions.checkNotNull(historyServerArchivist);
 
 		this.archivedExecutionGraphStore = Preconditions.checkNotNull(archivedExecutionGraphStore);
 
@@ -639,6 +616,14 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 		log.info("Job {} reached globally terminal state {}.", archivedExecutionGraph.getJobID(), archivedExecutionGraph.getState());
 
+		archiveExecutionGraph(archivedExecutionGraph);
+
+		final JobID jobId = archivedExecutionGraph.getJobID();
+
+		removeJob(jobId, true);
+	}
+
+	private void archiveExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) {
 		try {
 			archivedExecutionGraphStore.put(archivedExecutionGraph);
 		} catch (IOException e) {
@@ -649,22 +634,18 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				e);
 		}
 
-		try {
-			if (jsonArchivist != null && archivePath != null) {
-				FsJobArchivist.archiveJob(archivePath, archivedExecutionGraph.getJobID(), jsonArchivist.archiveJsonWithPath(archivedExecutionGraph));
-				log.info("Archived job {} to {}", archivedExecutionGraph.getJobID(), archivePath);
-			}
-		} catch (IOException e) {
-			log.info(
-				"Could not archive completed job {}({}).",
-				archivedExecutionGraph.getJobName(),
-				archivedExecutionGraph.getJobID(),
-				e);
-		}
-
-		final JobID jobId = archivedExecutionGraph.getJobID();
+		final CompletableFuture<Acknowledge> executionGraphFuture = historyServerArchivist.archiveExecutionGraph(archivedExecutionGraph);
 
-		removeJob(jobId, true);
+		executionGraphFuture.whenComplete(
+			(Acknowledge ignored, Throwable throwable) -> {
+				if (throwable != null) {
+					log.info(
+						"Could not archive completed job {}({}) to the history server.",
+						archivedExecutionGraph.getJobName(),
+						archivedExecutionGraph.getJobID(),
+						throwable);
+				}
+			});
 	}
 
 	protected void jobNotFinished(JobID jobId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
new file mode 100644
index 0000000..0030159
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Archives an {@link AccessExecutionGraph} to the history server.
+ */
+public interface HistoryServerArchivist {
+
+	/**
+	 * Archives the given {@link AccessExecutionGraph} on the history server.
+	 *
+	 * @param executionGraph to store on the history server
+	 * @return Future which is completed once the archiving has been completed.
+	 */
+	CompletableFuture<Acknowledge> archiveExecutionGraph(AccessExecutionGraph executionGraph);
+
+	static HistoryServerArchivist createHistoryServerArchivist(Configuration configuration, JsonArchivist jsonArchivist) {
+		final String configuredArchivePath = configuration.getString(JobManagerOptions.ARCHIVE_DIR);
+
+		if (configuredArchivePath != null) {
+			final Path archivePath = WebMonitorUtils.validateAndNormalizeUri(new Path(configuredArchivePath).toUri());
+
+			return new JsonResponseHistoryServerArchivist(jsonArchivist, archivePath);
+		} else {
+			return VoidHistoryServerArchivist.INSTANCE;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
new file mode 100644
index 0000000..be58399
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Implementation which archives an {@link AccessExecutionGraph} such that it stores
+ * the JSON responses for all possible history server requests.
+ */
+class JsonResponseHistoryServerArchivist implements HistoryServerArchivist {
+
+	private final JsonArchivist jsonArchivist;
+
+	private final Path archivePath;
+
+	JsonResponseHistoryServerArchivist(JsonArchivist jsonArchivist, Path archivePath) {
+		this.jsonArchivist = Preconditions.checkNotNull(jsonArchivist);
+		this.archivePath = Preconditions.checkNotNull(archivePath);
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> archiveExecutionGraph(AccessExecutionGraph executionGraph) {
+		try {
+			FsJobArchivist.archiveJob(archivePath, executionGraph.getJobID(), jsonArchivist.archiveJsonWithPath(executionGraph));
+			return CompletableFuture.completedFuture(Acknowledge.get());
+		} catch (IOException e) {
+			return FutureUtils.completedExceptionally(e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 38e74fb..8b497b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nullable;
@@ -72,7 +71,7 @@ public class MiniDispatcher extends Dispatcher {
 			JobManagerRunnerFactory jobManagerRunnerFactory,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress,
-			@Nullable JsonArchivist jsonArchivist,
+			HistoryServerArchivist historyServerArchivist,
 			JobGraph jobGraph,
 			JobClusterEntrypoint.ExecutionMode executionMode) throws Exception {
 		super(
@@ -90,7 +89,7 @@ public class MiniDispatcher extends Dispatcher {
 			jobManagerRunnerFactory,
 			fatalErrorHandler,
 			restAddress,
-			jsonArchivist);
+			historyServerArchivist);
 
 		this.executionMode = checkNotNull(executionMode);
 		this.jobTerminationFuture = new CompletableFuture<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 5c6a7ab..c39615c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import javax.annotation.Nullable;
 
@@ -52,7 +51,7 @@ public class StandaloneDispatcher extends Dispatcher {
 			JobManagerRunnerFactory jobManagerRunnerFactory,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress,
-			@Nullable JsonArchivist jsonArchivist) throws Exception {
+			HistoryServerArchivist historyServerArchivist) throws Exception {
 		super(
 			rpcService,
 			endpointId,
@@ -68,6 +67,6 @@ public class StandaloneDispatcher extends Dispatcher {
 			jobManagerRunnerFactory,
 			fatalErrorHandler,
 			restAddress,
-			jsonArchivist);
+			historyServerArchivist);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
new file mode 100644
index 0000000..2d34d83
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * No-op implementation of the {@link HistoryServerArchivist}.
+ */
+public enum VoidHistoryServerArchivist implements HistoryServerArchivist {
+	INSTANCE {
+		@Override
+		public CompletableFuture<Acknowledge> archiveExecutionGraph(AccessExecutionGraph executionGraph) {
+			return CompletableFuture.completedFuture(Acknowledge.get());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 933add8..a267abb 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -64,7 +65,6 @@ import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
@@ -347,6 +347,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 
 			jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress());
 
+			final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
+
 			dispatcher = createDispatcher(
 				configuration,
 				rpcService,
@@ -359,7 +361,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 				archivedExecutionGraphStore,
 				this,
 				webMonitorEndpoint.getRestBaseUrl(),
-				webMonitorEndpoint);
+				historyServerArchivist);
 
 			LOG.debug("Starting ResourceManager.");
 			resourceManager.start();
@@ -660,7 +662,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		ArchivedExecutionGraphStore archivedExecutionGraphStore,
 		FatalErrorHandler fatalErrorHandler,
 		@Nullable String restAddress,
-		@Nullable JsonArchivist jsonArchivist) throws Exception;
+		HistoryServerArchivist historyServerArchivist) throws Exception;
 
 	protected abstract ResourceManager<?> createResourceManager(
 		Configuration configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 2a5b8ea..80a9da2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -39,7 +40,6 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.FlinkException;
@@ -102,7 +102,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress,
-			@Nullable JsonArchivist jsonArchivist) throws Exception {
+			HistoryServerArchivist historyServerArchivist) throws Exception {
 
 		final JobGraph jobGraph = retrieveJobGraph(configuration);
 
@@ -124,7 +124,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 			fatalErrorHandler,
 			restAddress,
-			jsonArchivist,
+			historyServerArchivist,
 			jobGraph,
 			executionMode);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 85446eb..40eb8b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -40,7 +41,6 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
@@ -116,7 +116,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress,
-			@Nullable JsonArchivist jsonArchivist) throws Exception {
+			HistoryServerArchivist historyServerArchivist) throws Exception {
 
 		// create the default dispatcher
 		return new StandaloneDispatcher(
@@ -133,6 +133,6 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 			fatalErrorHandler,
 			restAddress,
-			jsonArchivist);
+			historyServerArchivist);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 04d5fb8..b2a1570 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -357,6 +358,8 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 
 				this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost");
 
+				final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint);
+
 				dispatcher = new StandaloneDispatcher(
 					jobManagerRpcService,
 					Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
@@ -371,7 +374,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 					new ShutDownFatalErrorHandler(),
 					dispatcherRestEndpoint.getRestBaseUrl(),
-					dispatcherRestEndpoint);
+					historyServerArchivist);
 
 				dispatcher.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index d9482e7..eff63e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -568,7 +568,7 @@ public class DispatcherTest extends TestLogger {
 				jobManagerRunnerFactory,
 				fatalErrorHandler,
 				null,
-				null);
+				VoidHistoryServerArchivist.INSTANCE);
 		}
 
 		@VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/d5bcecd7/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 157fca7..914531d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -258,7 +258,7 @@ public class MiniDispatcherTest extends TestLogger {
 			testingJobManagerRunnerFactory,
 			testingFatalErrorHandler,
 			null,
-			null,
+			VoidHistoryServerArchivist.INSTANCE,
 			jobGraph,
 			executionMode);
 	}


[04/12] flink git commit: [FLINK-9246][HS] Adjust HistoryServer for job overview changes

Posted by tr...@apache.org.
[FLINK-9246][HS] Adjust HistoryServer for job overview changes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca51c410
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca51c410
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca51c410

Branch: refs/heads/release-1.5
Commit: ca51c410638619739193c61888001c5b14033cea
Parents: 2d34b90
Author: hzyuqi1 <hz...@corp.netease.com>
Authored: Tue Apr 24 10:25:10 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 07:51:46 2018 +0200

----------------------------------------------------------------------
 .../history/HistoryServerArchiveFetcher.java        | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ca51c410/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 450436f..413473b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.rest.handler.legacy.JobsOverviewHandler;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.FileUtils;
 
@@ -162,7 +163,7 @@ class HistoryServerArchiveFetcher {
 									String json = archive.getJson();
 
 									File target;
-									if (path.equals("/joboverview")) {
+									if (path.equals(JobsOverviewHeaders.URL)) {
 										target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
 									} else {
 										target = new File(webDir, path + JSON_FILE_ENDING);
@@ -211,7 +212,7 @@ class HistoryServerArchiveFetcher {
 						}
 					}
 					if (updateOverview) {
-						updateJobOverview(webDir);
+						updateJobOverview(webOverviewDir, webDir);
 					}
 				}
 			} catch (Exception e) {
@@ -230,19 +231,16 @@ class HistoryServerArchiveFetcher {
 	 *
 	 * <p>For the display in the HistoryServer WebFrontend we have to combine these overviews.
 	 */
-	private static void updateJobOverview(File webDir) {
-		File webOverviewDir = new File(webDir, "overviews");
-		try (JsonGenerator gen = jacksonFactory.createGenerator(HistoryServer.createOrGetFile(webDir, "joboverview"))) {
+	private static void updateJobOverview(File webOverviewDir, File webDir) {
+		try (JsonGenerator gen = jacksonFactory.createGenerator(HistoryServer.createOrGetFile(webDir, JobsOverviewHeaders.URL))) {
 			gen.writeStartObject();
-			gen.writeArrayFieldStart("running");
-			gen.writeEndArray();
-			gen.writeArrayFieldStart("finished");
+			gen.writeArrayFieldStart("jobs");
 
 			File[] overviews = new File(webOverviewDir.getPath()).listFiles();
 			if (overviews != null) {
 				for (File overview : overviews) {
 					JsonNode root = mapper.readTree(overview);
-					JsonNode finished = root.get("finished");
+					JsonNode finished = root.get("jobs");
 					JsonNode job = finished.get(0);
 					mapper.writeTree(gen, job);
 				}


[10/12] flink git commit: [hotfix] Add import for linked component in ArchivedJson

Posted by tr...@apache.org.
[hotfix] Add import for linked component in ArchivedJson


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/27e89019
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/27e89019
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/27e89019

Branch: refs/heads/release-1.5
Commit: 27e8901930d2293f296cc9305bdbc3a97ee6fa31
Parents: 8d8b3ec
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon May 14 19:14:06 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 07:51:46 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/webmonitor/history/ArchivedJson.java   | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/27e89019/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
index 9200248..a9ddce5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.history;
 
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.util.Preconditions;


[08/12] flink git commit: [FLINK-9194] Add support for legacy history server formats

Posted by tr...@apache.org.
[FLINK-9194] Add support for legacy history server formats


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d8b3ec8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d8b3ec8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d8b3ec8

Branch: refs/heads/release-1.5
Commit: 8d8b3ec8881bbe3838c4596fb50988a0142e6bbb
Parents: 7ba41bc
Author: zentol <ch...@apache.org>
Authored: Tue May 8 13:13:59 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 07:51:46 2018 +0200

----------------------------------------------------------------------
 .../history/HistoryServerArchiveFetcher.java    | 49 +++++++++++
 .../webmonitor/history/HistoryServerTest.java   | 88 +++++++++++++++++++-
 2 files changed, 135 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8d8b3ec8/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index ac19197..5acbe54 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -23,7 +23,9 @@ import org.apache.flink.configuration.HistoryServerOptions;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.handler.legacy.JobsOverviewHandler;
@@ -33,6 +35,7 @@ import org.apache.flink.util.FileUtils;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.slf4j.Logger;
@@ -41,10 +44,12 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.StringWriter;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -168,6 +173,9 @@ class HistoryServerArchiveFetcher {
 									File target;
 									if (path.equals(JobsOverviewHeaders.URL)) {
 										target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
+									} else if (path.equals("/joboverview")) { // legacy path
+										json = convertLegacyJobOverview(json);
+										target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
 									} else {
 										target = new File(webDir, path + JSON_FILE_ENDING);
 									}
@@ -225,6 +233,47 @@ class HistoryServerArchiveFetcher {
 		}
 	}
 
+	private static String convertLegacyJobOverview(String legacyOverview) throws IOException {
+		JsonNode root = mapper.readTree(legacyOverview);
+		JsonNode finishedJobs = root.get("finished");
+		JsonNode job = finishedJobs.get(0);
+
+		JobID jobId = JobID.fromHexString(job.get("jid").asText());
+		String name = job.get("name").asText();
+		JobStatus state = JobStatus.valueOf(job.get("state").asText());
+
+		long startTime = job.get("start-time").asLong();
+		long endTime = job.get("end-time").asLong();
+		long duration = job.get("duration").asLong();
+		long lastMod = job.get("last-modification").asLong();
+
+		JsonNode tasks = job.get("tasks");
+		int numTasks = tasks.get("total").asInt();
+		int pending = tasks.get("pending").asInt();
+		int running = tasks.get("running").asInt();
+		int finished = tasks.get("finished").asInt();
+		int canceling = tasks.get("canceling").asInt();
+		int canceled = tasks.get("canceled").asInt();
+		int failed = tasks.get("failed").asInt();
+
+		int[] tasksPerState = new int[ExecutionState.values().length];
+		// pending is a mix of CREATED/SCHEDULED/DEPLOYING
+		// to maintain the correct number of task states we have to pick one of them
+		tasksPerState[ExecutionState.SCHEDULED.ordinal()] = pending;
+		tasksPerState[ExecutionState.RUNNING.ordinal()] = running;
+		tasksPerState[ExecutionState.FINISHED.ordinal()] = finished;
+		tasksPerState[ExecutionState.CANCELING.ordinal()] = canceling;
+		tasksPerState[ExecutionState.CANCELED.ordinal()] = canceled;
+		tasksPerState[ExecutionState.FAILED.ordinal()] = failed;
+
+		JobDetails jobDetails = new JobDetails(jobId, name, startTime, endTime, duration, state, lastMod, tasksPerState, numTasks);
+		MultipleJobsDetails multipleJobsDetails = new MultipleJobsDetails(Collections.singleton(jobDetails));
+
+		StringWriter sw = new StringWriter();
+		mapper.writeValue(sw, multipleJobsDetails);
+		return sw.toString();
+	}
+
 	/**
 	 * This method replicates the JSON response that would be given by the {@link JobsOverviewHandler} when
 	 * listing both running and finished jobs.

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8b3ec8/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 580d80f..9407af2 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -18,15 +18,19 @@
 
 package org.apache.flink.runtime.webmonitor.history;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HistoryServerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.commons.io.IOUtils;
@@ -38,12 +42,18 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
+import java.io.StringWriter;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.nio.file.Path;
+import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils.JACKSON_FACTORY;
+
 /**
  * Tests for the HistoryServer.
  */
@@ -88,6 +98,7 @@ public class HistoryServerTest extends TestLogger {
 		for (int x = 0; x < numJobs; x++) {
 			runJob();
 		}
+		createLegacyArchive(jmDirectory.toPath());
 
 		CountDownLatch numFinishedPolls = new CountDownLatch(1);
 
@@ -99,7 +110,7 @@ public class HistoryServerTest extends TestLogger {
 
 		// the job is archived asynchronously after env.execute() returns
 		File[] archives = jmDirectory.listFiles();
-		while (archives == null || archives.length != numJobs) {
+		while (archives == null || archives.length != numJobs + 1) {
 			Thread.sleep(50);
 			archives = jmDirectory.listFiles();
 		}
@@ -114,7 +125,7 @@ public class HistoryServerTest extends TestLogger {
 			String response = getFromHTTP(baseUrl + JobsOverviewHeaders.URL);
 			MultipleJobsDetails overview = mapper.readValue(response, MultipleJobsDetails.class);
 
-			Assert.assertEquals(numJobs, overview.getJobs().size());
+			Assert.assertEquals(numJobs + 1, overview.getJobs().size());
 		} finally {
 			hs.stop();
 		}
@@ -143,4 +154,77 @@ public class HistoryServerTest extends TestLogger {
 
 		return IOUtils.toString(is, connection.getContentEncoding() != null ? connection.getContentEncoding() : "UTF-8");
 	}
+
+	private static void createLegacyArchive(Path directory) throws IOException {
+		JobID jobID = JobID.generate();
+
+		StringWriter sw = new StringWriter();
+		try (JsonGenerator gen = JACKSON_FACTORY.createGenerator(sw)) {
+			try (JsonObject root = new JsonObject(gen)) {
+				try (JsonArray finished = new JsonArray(gen, "finished")) {
+					try (JsonObject job = new JsonObject(gen)) {
+						gen.writeStringField("jid", jobID.toString());
+						gen.writeStringField("name", "testjob");
+						gen.writeStringField("state", JobStatus.FINISHED.name());
+
+						gen.writeNumberField("start-time", 0L);
+						gen.writeNumberField("end-time", 1L);
+						gen.writeNumberField("duration", 1L);
+						gen.writeNumberField("last-modification", 1L);
+
+						try (JsonObject tasks = new JsonObject(gen, "tasks")) {
+							gen.writeNumberField("total", 0);
+
+							gen.writeNumberField("pending", 0);
+							gen.writeNumberField("running", 0);
+							gen.writeNumberField("finished", 0);
+							gen.writeNumberField("canceling", 0);
+							gen.writeNumberField("canceled", 0);
+							gen.writeNumberField("failed", 0);
+						}
+					}
+				}
+			}
+		}
+		String json = sw.toString();
+
+		ArchivedJson archivedJson = new ArchivedJson("/joboverview", json);
+
+		FsJobArchivist.archiveJob(new org.apache.flink.core.fs.Path(directory.toUri()), jobID, Collections.singleton(archivedJson));
+	}
+
+	private static final class JsonObject implements AutoCloseable {
+
+		private final JsonGenerator gen;
+
+		JsonObject(JsonGenerator gen) throws IOException {
+			this.gen = gen;
+			gen.writeStartObject();
+		}
+
+		private JsonObject(JsonGenerator gen, String name) throws IOException {
+			this.gen = gen;
+			gen.writeObjectFieldStart(name);
+		}
+
+		@Override
+		public void close() throws IOException {
+			gen.writeEndObject();
+		}
+	}
+
+	private static final class JsonArray implements AutoCloseable {
+
+		private final JsonGenerator gen;
+
+		JsonArray(JsonGenerator gen, String name) throws IOException {
+			this.gen = gen;
+			gen.writeArrayFieldStart(name);
+		}
+
+		@Override
+		public void close() throws IOException {
+			gen.writeEndArray();
+		}
+	}
 }


[11/12] flink git commit: [FLINK-9358] Avoid NPE when closing an unestablished ResourceManager connection

Posted by tr...@apache.org.
[FLINK-9358] Avoid NPE when closing an unestablished ResourceManager connection

An NPE occurred when trying to disconnect an unestablished ResourceManager connection.
In order to fix this problem, we now check whether the connection has been established
or not.

This closes #6011.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee41e95b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee41e95b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee41e95b

Branch: refs/heads/release-1.5
Commit: ee41e95b20f8164ccec7ceda8d40baf3efae17f6
Parents: f5be783
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon May 14 14:14:45 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 07:51:46 2018 +0200

----------------------------------------------------------------------
 .../EstablishedResourceManagerConnection.java   | 59 ++++++++++++++
 .../flink/runtime/jobmaster/JobMaster.java      | 83 +++++++++++---------
 .../flink/runtime/jobmaster/JobMasterTest.java  | 48 +++++++++++
 3 files changed, 155 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ee41e95b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
new file mode 100644
index 0000000..46c1b4b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Class which contains the connection details of an established
+ * connection with the ResourceManager.
+ */
+class EstablishedResourceManagerConnection {
+
+	private final ResourceManagerGateway resourceManagerGateway;
+
+	private final ResourceManagerId resourceManagerId;
+
+	private final ResourceID resourceManagerResourceID;
+
+	EstablishedResourceManagerConnection(
+			@Nonnull ResourceManagerGateway resourceManagerGateway,
+			@Nonnull ResourceManagerId resourceManagerId,
+			@Nonnull ResourceID resourceManagerResourceID) {
+		this.resourceManagerGateway = resourceManagerGateway;
+		this.resourceManagerId = resourceManagerId;
+		this.resourceManagerResourceID = resourceManagerResourceID;
+	}
+
+	public ResourceManagerGateway getResourceManagerGateway() {
+		return resourceManagerGateway;
+	}
+
+	public ResourceManagerId getResourceManagerId() {
+		return resourceManagerId;
+	}
+
+	public ResourceID getResourceManagerResourceID() {
+		return resourceManagerResourceID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee41e95b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f30c119..aff3280 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -191,9 +191,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	private LeaderRetrievalService resourceManagerLeaderRetriever;
 
-	@Nullable
-	private ResourceManagerConnection resourceManagerConnection;
-
 	// --------- TaskManagers --------
 
 	private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;
@@ -211,6 +208,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	@Nullable
 	private String lastInternalSavepoint;
 
+	@Nullable
+	private ResourceManagerConnection resourceManagerConnection;
+
+	@Nullable
+	private EstablishedResourceManagerConnection establishedResourceManagerConnection;
+
 	// ------------------------------------------------------------------------
 
 	public JobMaster(
@@ -290,6 +293,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
 		this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
 		this.jobStatusListener = null;
+
+		this.resourceManagerConnection = null;
+		this.establishedResourceManagerConnection = null;
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -881,12 +887,16 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			final ResourceManagerId resourceManagerId,
 			final Exception cause) {
 
-		if (resourceManagerConnection != null
-				&& resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId)) {
+		if (isConnectingToResourceManager(resourceManagerId)) {
 			closeResourceManagerConnection(cause);
 		}
 	}
 
+	private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerId) {
+		return resourceManagerConnection != null
+				&& resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId);
+	}
+
 	@Override
 	public void heartbeatFromTaskManager(final ResourceID resourceID, AccumulatorReport accumulatorReport) {
 		taskManagerHeartbeatManager.receiveHeartbeat(resourceID, accumulatorReport);
@@ -1238,11 +1248,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 					return;
 				}
 
-				closeResourceManagerConnection(new Exception(
-					"ResourceManager leader changed to new address " + resourceManagerAddress));
-
 				log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
 					resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+
+				closeResourceManagerConnection(new Exception(
+					"ResourceManager leader changed to new address " + resourceManagerAddress));
 			} else {
 				log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
 					resourceManagerConnection.getTargetAddress());
@@ -1277,9 +1287,16 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 			final ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
 
+			final ResourceID resourceManagerResourceId = success.getResourceManagerResourceId();
+
+			establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
+				resourceManagerGateway,
+				success.getResourceManagerId(),
+				resourceManagerResourceId);
+
 			slotPoolGateway.connectToResourceManager(resourceManagerGateway);
 
-			resourceManagerHeartbeatManager.monitorTarget(success.getResourceManagerResourceId(), new HeartbeatTarget<Void>() {
+			resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<Void>() {
 				@Override
 				public void receiveHeartbeat(ResourceID resourceID, Void payload) {
 					resourceManagerGateway.heartbeatFromJobManager(resourceID);
@@ -1297,22 +1314,31 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	}
 
 	private void closeResourceManagerConnection(Exception cause) {
-		if (resourceManagerConnection != null) {
-			if (log.isDebugEnabled()) {
-				log.debug("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerResourceID(), cause);
-			} else {
-				log.info("Close ResourceManager connection {}: {}.", resourceManagerConnection.getResourceManagerResourceID(), cause.getMessage());
-			}
-
-			resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerResourceID());
-
-			ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
-			resourceManagerGateway.disconnectJobManager(resourceManagerConnection.getJobID(), cause);
+		if (establishedResourceManagerConnection != null) {
+			dissolveResourceManagerConnection(establishedResourceManagerConnection, cause);
+			establishedResourceManagerConnection = null;
+		}
 
+		if (resourceManagerConnection != null) {
+			// stop a potentially ongoing registration process
 			resourceManagerConnection.close();
 			resourceManagerConnection = null;
 		}
+	}
+
+	private void dissolveResourceManagerConnection(EstablishedResourceManagerConnection establishedResourceManagerConnection, Exception cause) {
+		final ResourceID resourceManagerResourceID = establishedResourceManagerConnection.getResourceManagerResourceID();
 
+		if (log.isDebugEnabled()) {
+			log.debug("Close ResourceManager connection {}.", resourceManagerResourceID, cause);
+		} else {
+			log.info("Close ResourceManager connection {}: {}.", resourceManagerResourceID, cause.getMessage());
+		}
+
+		resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerResourceID);
+
+		ResourceManagerGateway resourceManagerGateway = establishedResourceManagerConnection.getResourceManagerGateway();
+		resourceManagerGateway.disconnectJobManager(jobGraph.getJobID(), cause);
 		slotPoolGateway.disconnectResourceManager();
 	}
 
@@ -1473,8 +1499,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 		private final JobMasterId jobMasterId;
 
-		private ResourceID resourceManagerResourceID;
-
 		ResourceManagerConnection(
 				final Logger log,
 				final JobID jobID,
@@ -1498,7 +1522,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 					getTargetAddress(), getTargetLeaderId()) {
 				@Override
 				protected CompletableFuture<RegistrationResponse> invokeRegistration(
-						ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) throws Exception {
+						ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) {
 					Time timeout = Time.milliseconds(timeoutMillis);
 
 					return gateway.registerJobManager(
@@ -1513,24 +1537,13 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 		@Override
 		protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
-			runAsync(() -> {
-				resourceManagerResourceID = success.getResourceManagerResourceId();
-				establishResourceManagerConnection(success);
-			});
+			runAsync(() -> establishResourceManagerConnection(success));
 		}
 
 		@Override
 		protected void onRegistrationFailure(final Throwable failure) {
 			handleJobMasterError(failure);
 		}
-
-		public ResourceID getResourceManagerResourceID() {
-			return resourceManagerResourceID;
-		}
-
-		public JobID getJobID() {
-			return jobID;
-		}
 	}
 
 	//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ee41e95b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 2f61681..c0c9162 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
@@ -361,6 +362,53 @@ public class JobMasterTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that we can close an unestablished ResourceManager connection.
+	 */
+	@Test
+	public void testCloseUnestablishedResourceManagerConnection() throws Exception {
+		final JobMaster jobMaster = createJobMaster(
+			JobMasterConfiguration.fromConfiguration(configuration),
+			jobGraph,
+			haServices,
+			new TestingJobManagerSharedServicesBuilder().build());
+
+		try {
+			jobMaster.start(JobMasterId.generate(), testingTimeout).get();
+			final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+			final String firstResourceManagerAddress = "address1";
+			final String secondResourceManagerAddress = "address2";
+
+			final TestingResourceManagerGateway firstResourceManagerGateway = new TestingResourceManagerGateway();
+			final TestingResourceManagerGateway secondResourceManagerGateway = new TestingResourceManagerGateway();
+
+			rpcService.registerGateway(firstResourceManagerAddress, firstResourceManagerGateway);
+			rpcService.registerGateway(secondResourceManagerAddress, secondResourceManagerGateway);
+
+			final OneShotLatch firstJobManagerRegistration = new OneShotLatch();
+			final OneShotLatch secondJobManagerRegistration = new OneShotLatch();
+
+			firstResourceManagerGateway.setRegisterJobManagerConsumer(
+				jobMasterIdResourceIDStringJobIDTuple4 -> firstJobManagerRegistration.trigger());
+
+			secondResourceManagerGateway.setRegisterJobManagerConsumer(
+				jobMasterIdResourceIDStringJobIDTuple4 -> secondJobManagerRegistration.trigger());
+
+			rmLeaderRetrievalService.notifyListener(firstResourceManagerAddress, resourceManagerId.toUUID());
+
+			// wait until we have seen the first registration attempt
+			firstJobManagerRegistration.await();
+
+			// this should stop the connection attempts towards the first RM
+			rmLeaderRetrievalService.notifyListener(secondResourceManagerAddress, resourceManagerId.toUUID());
+
+			// check that we start registering at the second RM
+			secondJobManagerRegistration.await();
+		} finally {
+			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+		}
+	}
+
 	private File createSavepoint(long savepointId) throws IOException {
 		final File savepointFile = temporaryFolder.newFile();
 		final SavepointV2 savepoint = new SavepointV2(savepointId, Collections.emptyList(), Collections.emptyList());


[07/12] flink git commit: [FLINK-9194][history] Add convenience ArchivedJson constructor

Posted by tr...@apache.org.
[FLINK-9194][history] Add convenience ArchivedJson constructor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d5b00ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d5b00ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d5b00ec

Branch: refs/heads/release-1.5
Commit: 4d5b00ec5bc5b736f838610bb10dfb99ef5fa4d2
Parents: c2e3d23
Author: zentol <ch...@apache.org>
Authored: Wed Apr 18 14:33:16 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 07:51:46 2018 +0200

----------------------------------------------------------------------
 .../runtime/webmonitor/history/ArchivedJson.java     | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4d5b00ec/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
index a15dd52..9200248 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
@@ -18,8 +18,14 @@
 
 package org.apache.flink.runtime.webmonitor.history;
 
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.StringWriter;
 import java.util.Objects;
 
 /**
@@ -30,6 +36,8 @@ import java.util.Objects;
  */
 public class ArchivedJson {
 
+	private static final ObjectMapper MAPPER = RestMapperUtils.getStrictObjectMapper();
+
 	private final String path;
 	private final String json;
 
@@ -38,6 +46,13 @@ public class ArchivedJson {
 		this.json = Preconditions.checkNotNull(json);
 	}
 
+	public ArchivedJson(String path, ResponseBody json) throws IOException {
+		this.path = Preconditions.checkNotNull(path);
+		StringWriter sw = new StringWriter();
+		MAPPER.writeValue(sw, Preconditions.checkNotNull(json));
+		this.json = sw.toString();
+	}
+
 	public String getPath() {
 		return path;
 	}


[05/12] flink git commit: [hotfix][history] Read/Write MultipleJobsDetails instead of manual JSON

Posted by tr...@apache.org.
[hotfix][history] Read/Write MultipleJobsDetails instead of manual JSON


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5a181e17
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5a181e17
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5a181e17

Branch: refs/heads/release-1.5
Commit: 5a181e1744205e90d956c9b6cd0a1cbe9a4fc8dd
Parents: ca51c41
Author: zentol <ch...@apache.org>
Authored: Tue Apr 24 10:26:32 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 07:51:46 2018 +0200

----------------------------------------------------------------------
 .../history/HistoryServerArchiveFetcher.java     | 19 ++++++++-----------
 .../webmonitor/history/HistoryServerTest.java    |  8 +++-----
 2 files changed, 11 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5a181e17/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 413473b..ac19197 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -24,6 +24,8 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.handler.legacy.JobsOverviewHandler;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
@@ -31,7 +33,6 @@ import org.apache.flink.util.FileUtils;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.slf4j.Logger;
@@ -42,6 +43,8 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -233,21 +236,15 @@ class HistoryServerArchiveFetcher {
 	 */
 	private static void updateJobOverview(File webOverviewDir, File webDir) {
 		try (JsonGenerator gen = jacksonFactory.createGenerator(HistoryServer.createOrGetFile(webDir, JobsOverviewHeaders.URL))) {
-			gen.writeStartObject();
-			gen.writeArrayFieldStart("jobs");
-
 			File[] overviews = new File(webOverviewDir.getPath()).listFiles();
 			if (overviews != null) {
+				Collection<JobDetails> allJobs = new ArrayList<>(overviews.length);
 				for (File overview : overviews) {
-					JsonNode root = mapper.readTree(overview);
-					JsonNode finished = root.get("jobs");
-					JsonNode job = finished.get(0);
-					mapper.writeTree(gen, job);
+					MultipleJobsDetails subJobs = mapper.readValue(overview, MultipleJobsDetails.class);
+					allJobs.addAll(subJobs.getJobs());
 				}
+				mapper.writeValue(gen, new MultipleJobsDetails(allJobs));
 			}
-
-			gen.writeEndArray();
-			gen.writeEndObject();
 		} catch (IOException ioe) {
 			LOG.error("Failed to update job overview.", ioe);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5a181e17/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index de63b43..a16f6fb 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -27,11 +27,11 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.ArchiveMessages;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import akka.actor.ActorRef;
@@ -94,11 +94,9 @@ public class HistoryServerTest extends TestLogger {
 
 			ObjectMapper mapper = new ObjectMapper();
 			String response = getFromHTTP(baseUrl + JobsOverviewHeaders.URL);
-			JsonNode overview = mapper.readTree(response);
+			MultipleJobsDetails overview = mapper.readValue(response, MultipleJobsDetails.class);
 
-			String jobID = overview.get("jobs").get(0).get("jid").asText();
-			JsonNode jobDetails = mapper.readTree(getFromHTTP(baseUrl + "/jobs/" + jobID));
-			Assert.assertNotNull(jobDetails.get("jid"));
+			Assert.assertEquals(1, overview.getJobs().size());
 		} finally {
 			hs.stop();
 		}


[03/12] flink git commit: [FLINK-9194][history] Rework and extend the HistoryServer test

Posted by tr...@apache.org.
[FLINK-9194][history] Rework and extend the HistoryServer test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ba41bcd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ba41bcd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ba41bcd

Branch: refs/heads/release-1.5
Commit: 7ba41bcda2749388c9a6e99137b1c8476eeae0d1
Parents: 0d56793
Author: zentol <ch...@apache.org>
Authored: Mon Apr 23 15:13:41 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 07:51:46 2018 +0200

----------------------------------------------------------------------
 .../webmonitor/history/HistoryServerTest.java   | 100 ++++++++++++-------
 1 file changed, 63 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ba41bcd/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index a16f6fb..580d80f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -21,25 +21,19 @@ package org.apache.flink.runtime.webmonitor.history;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HistoryServerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.ArchiveMessages;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.TestActorRef;
 import org.apache.commons.io.IOUtils;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.Rule;
+import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
@@ -50,43 +44,67 @@ import java.net.URL;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
-
 /**
  * Tests for the HistoryServer.
  */
 public class HistoryServerTest extends TestLogger {
 
-	@Rule
-	public TemporaryFolder tmpDir = new TemporaryFolder();
-
-	@Test
-	public void testFullArchiveLifecycle() throws Exception {
-		ArchivedExecutionGraph graph = (ArchivedExecutionGraph) ArchivedJobGenerationUtils.getTestJob();
-
-		File jmDirectory = tmpDir.newFolder("jm");
-		File hsDirectory = tmpDir.newFolder("hs");
-
-		Configuration config = new Configuration();
-		config.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
+	@ClassRule
+	public static final TemporaryFolder TMP = new TemporaryFolder();
+
+	private MiniClusterResource cluster;
+	private File jmDirectory;
+	private File hsDirectory;
+
+	@Before
+	public void setUp() throws Exception {
+		jmDirectory = TMP.newFolder("jm");
+		hsDirectory = TMP.newFolder("hs");
+
+		Configuration clusterConfig = new Configuration();
+		clusterConfig.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
+
+		cluster = new MiniClusterResource(
+			new MiniClusterResource.MiniClusterResourceConfiguration(
+				clusterConfig,
+				1,
+				1
+			),
+			MiniClusterResource.MiniClusterType.NEW
+		);
+		cluster.before();
+	}
 
-		config.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
-		config.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath());
+	@After
+	public void tearDown() {
+		if (cluster != null) {
+			cluster.after();
+		}
+	}
 
-		config.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
+	@Test
+	public void testHistoryServerIntegration() throws Exception {
+		final int numJobs = 2;
+		for (int x = 0; x < numJobs; x++) {
+			runJob();
+		}
 
-		ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(config);
-		Option<Path> archivePath = Option.apply(new Path(jmDirectory.toURI().toString()));
+		CountDownLatch numFinishedPolls = new CountDownLatch(1);
 
-		ActorRef memoryArchivist = TestActorRef.apply(JobManager.getArchiveProps(MemoryArchivist.class, 1, archivePath), actorSystem);
-		memoryArchivist.tell(new ArchiveMessages.ArchiveExecutionGraph(graph.getJobID(), graph), null);
+		Configuration historyServerConfig = new Configuration();
+		historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
+		historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath());
 
-		File archive = new File(jmDirectory, graph.getJobID().toString());
-		Assert.assertTrue(archive.exists());
+		historyServerConfig.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
 
-		CountDownLatch numFinishedPolls = new CountDownLatch(1);
+		// the job is archived asynchronously after env.execute() returns
+		File[] archives = jmDirectory.listFiles();
+		while (archives == null || archives.length != numJobs) {
+			Thread.sleep(50);
+			archives = jmDirectory.listFiles();
+		}
 
-		HistoryServer hs = new HistoryServer(config, numFinishedPolls);
+		HistoryServer hs = new HistoryServer(historyServerConfig, numFinishedPolls);
 		try {
 			hs.start();
 			String baseUrl = "http://localhost:" + hs.getWebPort();
@@ -96,12 +114,20 @@ public class HistoryServerTest extends TestLogger {
 			String response = getFromHTTP(baseUrl + JobsOverviewHeaders.URL);
 			MultipleJobsDetails overview = mapper.readValue(response, MultipleJobsDetails.class);
 
-			Assert.assertEquals(1, overview.getJobs().size());
+			Assert.assertEquals(numJobs, overview.getJobs().size());
 		} finally {
 			hs.stop();
 		}
 	}
 
+	private static void runJob() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.fromElements(1, 2, 3)
+			.print();
+
+		env.execute();
+	}
+
 	public static String getFromHTTP(String url) throws Exception {
 		URL u = new URL(url);
 		HttpURLConnection connection = (HttpURLConnection) u.openConnection();


[09/12] flink git commit: [FLINK-9194][history] Adjust handlers

Posted by tr...@apache.org.
[FLINK-9194][history] Adjust handlers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d567931
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d567931
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d567931

Branch: refs/heads/release-1.5
Commit: 0d567931024b84c2503ac78821f3e872a2cc53b8
Parents: 4d5b00e
Author: zentol <ch...@apache.org>
Authored: Wed Apr 18 14:34:25 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 07:51:46 2018 +0200

----------------------------------------------------------------------
 .../handler/job/JobAccumulatorsHandler.java     | 22 ++++-
 .../rest/handler/job/JobConfigHandler.java      | 21 ++++-
 .../rest/handler/job/JobDetailsHandler.java     | 21 ++++-
 .../rest/handler/job/JobExceptionsHandler.java  | 21 ++++-
 .../rest/handler/job/JobPlanHandler.java        | 21 ++++-
 .../handler/job/JobVertexDetailsHandler.java    | 27 +++++-
 .../job/JobVertexTaskManagersHandler.java       | 29 ++++++-
 .../rest/handler/job/JobsOverviewHandler.java   | 19 ++++-
 ...taskExecutionAttemptAccumulatorsHandler.java | 50 ++++++++++-
 .../SubtaskExecutionAttemptDetailsHandler.java  | 59 ++++++++++++-
 .../rest/handler/job/SubtasksTimesHandler.java  | 27 +++++-
 .../checkpoints/CheckpointConfigHandler.java    | 28 ++++++-
 .../CheckpointStatisticDetailsHandler.java      | 33 +++++++-
 .../CheckpointingStatisticsHandler.java         | 26 +++++-
 .../TaskCheckpointStatisticDetailsHandler.java  | 87 ++++++++++++++------
 15 files changed, 447 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0d567931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
index 964aee3..edb529c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
@@ -28,13 +28,19 @@ import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValue
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
 import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedValue;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -44,7 +50,7 @@ import java.util.concurrent.Executor;
 /**
  * Request handler that returns the aggregated accumulators of a job.
  */
-public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAccumulatorsInfo, JobAccumulatorsMessageParameters> {
+public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAccumulatorsInfo, JobAccumulatorsMessageParameters> implements JsonArchivist {
 
 	public JobAccumulatorsHandler(
 			CompletableFuture<String> localRestAddress,
@@ -66,7 +72,6 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAcc
 
 	@Override
 	protected JobAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobAccumulatorsMessageParameters> request, AccessExecutionGraph graph) throws RestHandlerException {
-		JobAccumulatorsInfo accumulatorsInfo;
 		List<Boolean> queryParams = request.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class);
 
 		final boolean includeSerializedValue;
@@ -76,6 +81,18 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAcc
 			includeSerializedValue = false;
 		}
 
+		return createJobAccumulatorsInfo(graph, includeSerializedValue);
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		ResponseBody json = createJobAccumulatorsInfo(graph, true);
+		String path = getMessageHeaders().getTargetRestEndpointURL()
+			.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		return Collections.singleton(new ArchivedJson(path, json));
+	}
+
+	private static JobAccumulatorsInfo createJobAccumulatorsInfo(AccessExecutionGraph graph, boolean includeSerializedValue) {
 		StringifiedAccumulatorResult[] stringifiedAccs = graph.getAccumulatorResultsStringified();
 		List<JobAccumulatorsInfo.UserTaskAccumulator> userTaskAccumulators = new ArrayList<>(stringifiedAccs.length);
 
@@ -87,6 +104,7 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAcc
 					acc.getValue()));
 		}
 
+		JobAccumulatorsInfo accumulatorsInfo;
 		if (includeSerializedValue) {
 			Map<String, SerializedValue<OptionalFailure<Object>>> serializedUserTaskAccumulators = graph.getAccumulatorsSerialized();
 			accumulatorsInfo = new JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators, serializedUserTaskAccumulators);

http://git-wip-us.apache.org/repos/asf/flink/blob/0d567931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
index 7154246..3231668 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
@@ -25,11 +25,18 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobConfigInfo;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -37,7 +44,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler serving the job configuration.
  */
-public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInfo, JobMessageParameters> {
+public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInfo, JobMessageParameters> implements JsonArchivist {
 
 	public JobConfigHandler(
 			CompletableFuture<String> localRestAddress,
@@ -60,6 +67,18 @@ public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInf
 
 	@Override
 	protected JobConfigInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) {
+		return createJobConfigInfo(executionGraph);
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		ResponseBody json = createJobConfigInfo(graph);
+		String path = getMessageHeaders().getTargetRestEndpointURL()
+			.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		return Collections.singleton(new ArchivedJson(path, json));
+	}
+
+	private static JobConfigInfo createJobConfigInfo(AccessExecutionGraph executionGraph) {
 		final ArchivedExecutionConfig executionConfig = executionGraph.getArchivedExecutionConfig();
 		final JobConfigInfo.ExecutionConfigInfo executionConfigInfo;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0d567931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 82f24d3..d1383eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -32,16 +32,24 @@ import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -50,7 +58,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler returning the details for the specified job.
  */
-public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters> {
+public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters> implements JsonArchivist {
 
 	private final MetricFetcher<?> metricFetcher;
 
@@ -79,7 +87,18 @@ public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsI
 	protected JobDetailsInfo handleRequest(
 			HandlerRequest<EmptyRequestBody, JobMessageParameters> request,
 			AccessExecutionGraph executionGraph) throws RestHandlerException {
+		return createJobDetailsInfo(executionGraph, metricFetcher);
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		ResponseBody json = createJobDetailsInfo(graph, null);
+		String path = getMessageHeaders().getTargetRestEndpointURL()
+			.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		return Collections.singleton(new ArchivedJson(path, json));
+	}
 
+	private static JobDetailsInfo createJobDetailsInfo(AccessExecutionGraph executionGraph, @Nullable MetricFetcher<?> metricFetcher) {
 		final long now = System.currentTimeMillis();
 		final long startTime = executionGraph.getStatusTimestamp(JobStatus.CREATED);
 		final long endTime = executionGraph.getState().isGloballyTerminalState() ?

http://git-wip-us.apache.org/repos/asf/flink/blob/0d567931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 63dc604..8ec1af0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -27,14 +27,21 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -43,7 +50,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler serving the job exceptions.
  */
-public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExceptionsInfo, JobMessageParameters> {
+public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExceptionsInfo, JobMessageParameters> implements JsonArchivist {
 
 	static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
 
@@ -68,6 +75,18 @@ public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExcep
 
 	@Override
 	protected JobExceptionsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) {
+		return createJobExceptionsInfo(executionGraph); // same builder that archiveJsonWithPath uses
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		// Single entry: the job's exceptions, stored under the job-specific endpoint URL.
+		final String exceptionsPath = getMessageHeaders().getTargetRestEndpointURL().replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		final ResponseBody exceptionsJson = createJobExceptionsInfo(graph);
+		return Collections.singletonList(new ArchivedJson(exceptionsPath, exceptionsJson));
+	}
+
+	private static JobExceptionsInfo createJobExceptionsInfo(AccessExecutionGraph executionGraph) {
 		ErrorInfo rootException = executionGraph.getFailureInfo();
 		String rootExceptionMessage = null;
 		Long rootTimestamp = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/0d567931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
index e7a30fb..6eb2573 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
@@ -23,12 +23,19 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -36,7 +43,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler serving the job execution plan.
  */
-public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, JobMessageParameters> {
+public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, JobMessageParameters> implements JsonArchivist {
 
 	public JobPlanHandler(
 		CompletableFuture<String> localRestAddress,
@@ -59,6 +66,18 @@ public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, J
 
 	@Override
 	protected JobPlanInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+		return createJobPlanInfo(executionGraph); // same builder that archiveJsonWithPath uses
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		// Single entry: the job's JSON plan, stored under the job-specific endpoint URL.
+		final String planPath = getMessageHeaders().getTargetRestEndpointURL().replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		final ResponseBody planJson = createJobPlanInfo(graph);
+		return Collections.singleton(new ArchivedJson(planPath, planJson));
+	}
+
+	private static JobPlanInfo createJobPlanInfo(AccessExecutionGraph executionGraph) { // wraps the graph's JSON plan; shared by handleRequest and archiveJsonWithPath
 		return new JobPlanInfo(executionGraph.getJsonPlan());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d567931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
index b4693a5..034f01d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
@@ -36,12 +36,19 @@ import org.apache.flink.runtime.rest.messages.JobVertexDetailsInfo;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -50,7 +57,7 @@ import java.util.concurrent.Executor;
 /**
  * Request handler for the job vertex details.
  */
-public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVertexDetailsInfo, JobVertexMessageParameters> {
+public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVertexDetailsInfo, JobVertexMessageParameters> implements JsonArchivist {
 	private final MetricFetcher<? extends RestfulGateway> metricFetcher;
 
 	public JobVertexDetailsHandler(
@@ -85,6 +92,24 @@ public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVe
 			throw new NotFoundException(String.format("JobVertex %s not found", jobVertexID));
 		}
 
+		return createJobVertexDetailsInfo(jobVertex, jobID, metricFetcher);
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		// One archive entry per job vertex; the job id part of the URL is resolved once up front.
+		final String jobUrl = getMessageHeaders().getTargetRestEndpointURL().replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		final Collection<? extends AccessExecutionJobVertex> jobVertices = graph.getAllVertices().values();
+		final List<ArchivedJson> entries = new ArrayList<>(jobVertices.size());
+		for (AccessExecutionJobVertex jobVertex : jobVertices) {
+			final String vertexUrl = jobUrl.replace(':' + JobVertexIdPathParameter.KEY, jobVertex.getJobVertexId().toString());
+			final ResponseBody vertexJson = createJobVertexDetailsInfo(jobVertex, graph.getJobID(), null);
+			entries.add(new ArchivedJson(vertexUrl, vertexJson));
+		}
+		return entries;
+	}
+
+	private static JobVertexDetailsInfo createJobVertexDetailsInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher<?> metricFetcher) {
 		List<JobVertexDetailsInfo.VertexTaskDetail> subtasks = new ArrayList<>();
 		final long now = System.currentTimeMillis();
 		int num = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/0d567931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index 24650a3..efb6fc0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -38,13 +38,20 @@ import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobVertexTaskManagersInfo;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -55,7 +62,7 @@ import java.util.concurrent.Executor;
  * A request handler that provides the details of a job vertex, including id, name, and the
  * runtime and metrics of all its subtasks aggregated by TaskManager.
  */
-public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters> {
+public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters> implements JsonArchivist {
 	private MetricFetcher<?> metricFetcher;
 
 	public JobVertexTaskManagersHandler(
@@ -83,6 +90,24 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
 			throw new NotFoundException(String.format("JobVertex %s not found", jobVertexID));
 		}
 
+		return createJobVertexTaskManagersInfo(jobVertex, jobID, metricFetcher);
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		// One archive entry per job vertex; the job id part of the URL is resolved once up front.
+		final String jobUrl = getMessageHeaders().getTargetRestEndpointURL().replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		final Collection<? extends AccessExecutionJobVertex> jobVertices = graph.getAllVertices().values();
+		final List<ArchivedJson> entries = new ArrayList<>(jobVertices.size());
+		for (AccessExecutionJobVertex jobVertex : jobVertices) {
+			final String vertexUrl = jobUrl.replace(':' + JobVertexIdPathParameter.KEY, jobVertex.getJobVertexId().toString());
+			final ResponseBody vertexJson = createJobVertexTaskManagersInfo(jobVertex, graph.getJobID(), null);
+			entries.add(new ArchivedJson(vertexUrl, vertexJson));
+		}
+		return entries;
+	}
+
+	private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher<?> metricFetcher) {
 		// Build a map that groups tasks by TaskManager
 		Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
 		for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
@@ -173,6 +198,6 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
 				statusCounts));
 		}
 
-		return new JobVertexTaskManagersInfo(jobVertexID, jobVertex.getName(), now, taskManagersInfoList);
+		return new JobVertexTaskManagersInfo(jobVertex.getJobVertexId(), jobVertex.getName(), now, taskManagersInfoList);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d567931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
index 94bdbd2..6d2b1e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
@@ -19,25 +19,34 @@
 package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import javax.annotation.Nonnull;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * Overview handler for jobs.
  */
-public class JobsOverviewHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
+public class JobsOverviewHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> implements JsonArchivist {
 
 	public JobsOverviewHandler(
 			CompletableFuture<String> localRestAddress,
@@ -57,4 +66,12 @@ public class JobsOverviewHandler extends AbstractRestHandler<RestfulGateway, Emp
 	protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
 		return gateway.requestMultipleJobDetails(timeout);
 	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		// The archived overview contains exactly one entry: the details of this job.
+		final ResponseBody overviewJson = new MultipleJobsDetails(Collections.singleton(WebMonitorUtils.createDetailsForJob(graph)));
+		final String overviewPath = getMessageHeaders().getTargetRestEndpointURL().replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		return Collections.singletonList(new ArchivedJson(overviewPath, overviewJson));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d567931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
index e3b1719..e335238 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -21,18 +21,31 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
 import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptPathParameter;
 import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsInfo;
 import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -40,7 +53,10 @@ import java.util.concurrent.Executor;
 /**
  * Request handler for the subtask execution attempt accumulators.
  */
-public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptAccumulatorsInfo, SubtaskAttemptMessageParameters> {
+public class SubtaskExecutionAttemptAccumulatorsHandler
+	extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptAccumulatorsInfo, SubtaskAttemptMessageParameters>
+	implements JsonArchivist {
+
 	/**
 	 * Instantiates a new Abstract job vertex handler.
 	 *
@@ -68,7 +84,39 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 	protected SubtaskExecutionAttemptAccumulatorsInfo handleRequest(
 			HandlerRequest<EmptyRequestBody, SubtaskAttemptMessageParameters> request,
 			AccessExecution execution) throws RestHandlerException {
+		return createAccumulatorInfo(execution); // same builder that archiveJsonWithPath uses for each attempt
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		// Path placeholders are filled progressively: job id once, vertex id per vertex, subtask index per subtask.
+		final String jobUrl = getMessageHeaders().getTargetRestEndpointURL()
+			.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		final List<ArchivedJson> archive = new ArrayList<>(16);
+		for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
+			final String vertexUrl = jobUrl.replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString());
+			for (AccessExecutionVertex subtask : task.getTaskVertices()) {
+				final String subtaskUrl = vertexUrl
+					.replace(':' + SubtaskIndexPathParameter.KEY, String.valueOf(subtask.getParallelSubtaskIndex()));
+				final AccessExecution currentAttempt = subtask.getCurrentExecutionAttempt();
+				final ResponseBody curAttemptJson = createAccumulatorInfo(currentAttempt);
+				archive.add(new ArchivedJson(subtaskUrl.replace(':' + SubtaskAttemptPathParameter.KEY,
+					String.valueOf(currentAttempt.getAttemptNumber())), curAttemptJson));
+				for (int x = 0; x < currentAttempt.getAttemptNumber(); x++) {
+					final AccessExecution attempt = subtask.getPriorExecutionAttempt(x);
+					// NOTE(review): a prior attempt may be null if it was evicted from a bounded history; skip it instead of failing with an NPE
+					if (attempt != null) {
+						final ResponseBody json = createAccumulatorInfo(attempt);
+						archive.add(new ArchivedJson(subtaskUrl.replace(':' + SubtaskAttemptPathParameter.KEY,
+							String.valueOf(attempt.getAttemptNumber())), json));
+					}
+				}
+			}
+		}
+		return archive;
+	}
 
+	private static SubtaskExecutionAttemptAccumulatorsInfo createAccumulatorInfo(AccessExecution execution) {
 		final StringifiedAccumulatorResult[] accs = execution.getUserAccumulatorsStringified();
 		final ArrayList<UserAccumulator> userAccumulatorList = new ArrayList<>(accs.length);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0d567931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
index b781ee7..bae80c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
@@ -21,6 +21,9 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -31,12 +34,23 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
 import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptPathParameter;
 import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -44,7 +58,9 @@ import java.util.concurrent.Executor;
 /**
  * Handler of specific sub task execution attempt.
  */
-public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters> {
+public class SubtaskExecutionAttemptDetailsHandler
+	extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters>
+	implements JsonArchivist {
 
 	private final MetricFetcher<?> metricFetcher;
 
@@ -79,11 +95,48 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 			HandlerRequest<EmptyRequestBody, SubtaskAttemptMessageParameters> request,
 			AccessExecution execution) throws RestHandlerException {
 
-		final MutableIOMetrics ioMetrics = new MutableIOMetrics();
-
 		final JobID jobID = request.getPathParameter(JobIDPathParameter.class);
 		final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
 
+		return createDetailsInfo(execution, jobID, jobVertexID, metricFetcher);
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		// Path placeholders are filled progressively: job id once, vertex id per vertex, subtask index per subtask.
+		final String jobUrl = getMessageHeaders().getTargetRestEndpointURL()
+			.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		final List<ArchivedJson> archive = new ArrayList<>(16);
+		for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
+			final String vertexUrl = jobUrl.replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString());
+			for (AccessExecutionVertex subtask : task.getTaskVertices()) {
+				final String subtaskUrl = vertexUrl
+					.replace(':' + SubtaskIndexPathParameter.KEY, String.valueOf(subtask.getParallelSubtaskIndex()));
+				final AccessExecution currentAttempt = subtask.getCurrentExecutionAttempt();
+				final ResponseBody curAttemptJson = createDetailsInfo(currentAttempt, graph.getJobID(), task.getJobVertexId(), null);
+				archive.add(new ArchivedJson(subtaskUrl.replace(':' + SubtaskAttemptPathParameter.KEY,
+					String.valueOf(currentAttempt.getAttemptNumber())), curAttemptJson));
+				for (int x = 0; x < currentAttempt.getAttemptNumber(); x++) {
+					final AccessExecution attempt = subtask.getPriorExecutionAttempt(x);
+					// NOTE(review): a prior attempt may be null if it was evicted from a bounded history; skip it instead of failing with an NPE
+					if (attempt != null) {
+						final ResponseBody json = createDetailsInfo(attempt, graph.getJobID(), task.getJobVertexId(), null);
+						archive.add(new ArchivedJson(subtaskUrl.replace(':' + SubtaskAttemptPathParameter.KEY,
+							String.valueOf(attempt.getAttemptNumber())), json));
+					}
+				}
+			}
+		}
+		return archive;
+	}
+
+	private static SubtaskExecutionAttemptDetailsInfo createDetailsInfo(
+			AccessExecution execution,
+			JobID jobID,
+			JobVertexID jobVertexID,
+			@Nullable MetricFetcher<?> metricFetcher) {
+		final MutableIOMetrics ioMetrics = new MutableIOMetrics();
+
 		ioMetrics.addIOMetrics(
 			execution,
 			metricFetcher,

http://git-wip-us.apache.org/repos/asf/flink/blob/0d567931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
index 29c0f93..5026951 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
@@ -20,19 +20,27 @@ package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.SubtasksTimesInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,7 +50,7 @@ import java.util.concurrent.Executor;
 /**
  * Request handler for the subtasks times info.
  */
-public class SubtasksTimesHandler extends AbstractJobVertexHandler<SubtasksTimesInfo, JobVertexMessageParameters>  {
+public class SubtasksTimesHandler extends AbstractJobVertexHandler<SubtasksTimesInfo, JobVertexMessageParameters> implements JsonArchivist {
 	public SubtasksTimesHandler(
 			CompletableFuture<String> localRestAddress,
 			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -63,7 +71,24 @@ public class SubtasksTimesHandler extends AbstractJobVertexHandler<SubtasksTimes
 
 	@Override
 	protected SubtasksTimesInfo handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, AccessExecutionJobVertex jobVertex) {
+		return createSubtaskTimesInfo(jobVertex); // same builder that archiveJsonWithPath uses per vertex
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		// One archive entry per job vertex; the job id part of the URL is resolved once up front.
+		final String jobUrl = getMessageHeaders().getTargetRestEndpointURL().replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		final Collection<? extends AccessExecutionJobVertex> jobVertices = graph.getAllVertices().values();
+		final List<ArchivedJson> entries = new ArrayList<>(jobVertices.size());
+		for (AccessExecutionJobVertex jobVertex : jobVertices) {
+			final String vertexUrl = jobUrl.replace(':' + JobVertexIdPathParameter.KEY, jobVertex.getJobVertexId().toString());
+			final ResponseBody vertexJson = createSubtaskTimesInfo(jobVertex);
+			entries.add(new ArchivedJson(vertexUrl, vertexJson));
+		}
+		return entries;
+	}
 
+	private static SubtasksTimesInfo createSubtaskTimesInfo(AccessExecutionJobVertex jobVertex) {
 		final String id = jobVertex.getJobVertexId().toString();
 		final String name = jobVertex.getName();
 		final long now = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/flink/blob/0d567931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
index b88183e..3a03575 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
@@ -27,14 +27,23 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -42,7 +51,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler which serves the checkpoint configuration.
  */
-public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters> {
+public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters> implements JsonArchivist {
 
 	public CheckpointConfigHandler(
 			CompletableFuture<String> localRestAddress,
@@ -64,6 +73,27 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<Check
 
 	@Override
 	protected CheckpointConfigInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+		return createCheckpointConfigInfo(executionGraph);
+	}
+
+	/**
+	 * Archives the checkpoint configuration of the given graph. If the configuration cannot be
+	 * created, an error response carrying the exception message is archived instead.
+	 */
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		final String path = CheckpointConfigHeaders.getInstance().getTargetRestEndpointURL()
+			.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		ResponseBody body;
+		try {
+			body = createCheckpointConfigInfo(graph);
+		} catch (RestHandlerException e) {
+			body = new ErrorResponseBody(e.getMessage());
+		}
+		return Collections.singletonList(new ArchivedJson(path, body));
+	}
+
+	private static CheckpointConfigInfo createCheckpointConfigInfo(AccessExecutionGraph executionGraph) throws RestHandlerException {
 		final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = executionGraph.getCheckpointCoordinatorConfiguration();
 
 		if (checkpointCoordinatorConfiguration == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0d567931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
index 2816336..8f2d713 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
@@ -20,15 +20,28 @@ package org.apache.flink.runtime.rest.handler.job.checkpoints;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointMessageParameters;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -36,7 +49,7 @@ import java.util.concurrent.Executor;
 /**
  * REST handler which returns the details for a checkpoint.
  */
-public class CheckpointStatisticDetailsHandler extends AbstractCheckpointHandler<CheckpointStatistics, CheckpointMessageParameters> {
+public class CheckpointStatisticDetailsHandler extends AbstractCheckpointHandler<CheckpointStatistics, CheckpointMessageParameters> implements JsonArchivist {
 
 	public CheckpointStatisticDetailsHandler(
 			CompletableFuture<String> localRestAddress,
@@ -62,4 +75,27 @@ public class CheckpointStatisticDetailsHandler extends AbstractCheckpointHandler
 	protected CheckpointStatistics handleCheckpointRequest(HandlerRequest<EmptyRequestBody, CheckpointMessageParameters> ignored, AbstractCheckpointStats checkpointStats) {
 		return CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true);
 	}
+
+	/**
+	 * Archives the statistics of every checkpoint in the history of the given graph. Returns an
+	 * empty collection if the graph has no checkpoint statistics snapshot.
+	 */
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		final CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+		if (snapshot == null) {
+			return Collections.emptyList();
+		}
+
+		final CheckpointStatsHistory history = snapshot.getHistory();
+		final List<ArchivedJson> archivedJsons = new ArrayList<>(history.getCheckpoints().size());
+		for (AbstractCheckpointStats checkpointStats : history.getCheckpoints()) {
+			final String checkpointPath = getMessageHeaders().getTargetRestEndpointURL()
+				.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
+				.replace(':' + CheckpointIdPathParameter.KEY, String.valueOf(checkpointStats.getCheckpointId()));
+			final ResponseBody statistics = CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true);
+			archivedJsons.add(new ArchivedJson(checkpointPath, statistics));
+		}
+		return archivedJsons;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d567931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
index b9db367..d4345ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
@@ -32,17 +32,25 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
 import org.apache.flink.runtime.rest.messages.checkpoints.MinMaxAvgStatistics;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -51,7 +59,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler which serves the checkpoint statistics.
  */
-public class CheckpointingStatisticsHandler extends AbstractExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters> {
+public class CheckpointingStatisticsHandler extends AbstractExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters> implements JsonArchivist {
 
 	public CheckpointingStatisticsHandler(
 			CompletableFuture<String> localRestAddress,
@@ -66,7 +74,27 @@ public class CheckpointingStatisticsHandler extends AbstractExecutionGraphHandle
 
 	@Override
 	protected CheckpointingStatistics handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+		return createCheckpointingStatistics(executionGraph);
+	}
+
+	/**
+	 * Archives the checkpointing statistics of the given graph. If they cannot be created, an error
+	 * response carrying the exception message is archived instead.
+	 */
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		final String path = getMessageHeaders().getTargetRestEndpointURL()
+			.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		ResponseBody statistics;
+		try {
+			statistics = createCheckpointingStatistics(graph);
+		} catch (RestHandlerException e) {
+			statistics = new ErrorResponseBody(e.getMessage());
+		}
+		return Collections.singletonList(new ArchivedJson(path, statistics));
+	}
 
+	private static CheckpointingStatistics createCheckpointingStatistics(AccessExecutionGraph executionGraph) throws RestHandlerException {
 		final CheckpointStatsSnapshot checkpointStatsSnapshot = executionGraph.getCheckpointStatsSnapshot();
 
 		if (checkpointStatsSnapshot == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0d567931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
index cff3bf0..2084c50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
@@ -20,26 +20,36 @@ package org.apache.flink.runtime.rest.handler.job.checkpoints;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter;
 import org.apache.flink.runtime.rest.messages.checkpoints.MinMaxAvgStatistics;
 import org.apache.flink.runtime.rest.messages.checkpoints.SubtaskCheckpointStatistics;
 import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointMessageParameters;
 import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsWithSubtaskDetails;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -48,7 +58,9 @@ import java.util.concurrent.Executor;
 /**
  * REST handler which serves checkpoint statistics for subtasks.
  */
-public class TaskCheckpointStatisticDetailsHandler extends AbstractCheckpointHandler<TaskCheckpointStatisticsWithSubtaskDetails, TaskCheckpointMessageParameters> {
+public class TaskCheckpointStatisticDetailsHandler
+	extends AbstractCheckpointHandler<TaskCheckpointStatisticsWithSubtaskDetails, TaskCheckpointMessageParameters>
+	implements JsonArchivist {
 
 	public TaskCheckpointStatisticDetailsHandler(
 			CompletableFuture<String> localRestAddress,
@@ -79,30 +91,64 @@ public class TaskCheckpointStatisticDetailsHandler extends AbstractCheckpointHan
 
 		final TaskStateStats taskStatistics = checkpointStats.getTaskStateStats(jobVertexId);
 
-		if (taskStatistics != null) {
-
-			final TaskCheckpointStatisticsWithSubtaskDetails.Summary summary = createSummary(
-				taskStatistics.getSummaryStats(),
-				checkpointStats.getTriggerTimestamp());
-
-			final List<SubtaskCheckpointStatistics> subtaskCheckpointStatistics = createSubtaskCheckpointStatistics(
-				taskStatistics.getSubtaskStats(),
-				checkpointStats.getTriggerTimestamp());
-
-			return new TaskCheckpointStatisticsWithSubtaskDetails(
-				checkpointStats.getCheckpointId(),
-				checkpointStats.getStatus(),
-				taskStatistics.getLatestAckTimestamp(),
-				taskStatistics.getStateSize(),
-				taskStatistics.getEndToEndDuration(checkpointStats.getTriggerTimestamp()),
-				taskStatistics.getAlignmentBuffered(),
-				taskStatistics.getNumberOfSubtasks(),
-				taskStatistics.getNumberOfAcknowledgedSubtasks(),
-				summary,
-				subtaskCheckpointStatistics);
-		} else {
-			throw new RestHandlerException("There is no checkpoint statistics for task " + jobVertexId + '.', HttpResponseStatus.NOT_FOUND);
+		if (taskStatistics == null) {
+			throw new NotFoundException("There is no checkpoint statistics for task " + jobVertexId + '.');
+		}
+
+		return createCheckpointDetails(checkpointStats, taskStatistics);
+	}
+
+	/**
+	 * Archives the subtask details of every task of every checkpoint in the history of the given
+	 * graph. Returns an empty collection if the graph has no checkpoint statistics snapshot.
+	 */
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot();
+		if (stats == null) {
+			return Collections.emptyList();
+		}
+		CheckpointStatsHistory history = stats.getHistory();
+		List<ArchivedJson> archive = new ArrayList<>(history.getCheckpoints().size());
+		for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
+			for (TaskStateStats taskStats : checkpoint.getAllTaskStateStats()) {
+				ResponseBody json = createCheckpointDetails(checkpoint, taskStats);
+				// the job ID must fill the JobIDPathParameter placeholder; keying it on JobVertexIdPathParameter
+				// would put the job ID into the vertex slot and leave the job placeholder unresolved
+				String path = getMessageHeaders().getTargetRestEndpointURL()
+					.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
+					.replace(':' + CheckpointIdPathParameter.KEY, String.valueOf(checkpoint.getCheckpointId()))
+					.replace(':' + JobVertexIdPathParameter.KEY, taskStats.getJobVertexId().toString());
+				archive.add(new ArchivedJson(path, json));
+			}
 		}
+		return archive;
+	}
+
+	/**
+	 * Creates the checkpoint statistics of the given task, including its summary and the per-subtask
+	 * details, relative to the trigger timestamp of the given checkpoint.
+	 */
+	private static TaskCheckpointStatisticsWithSubtaskDetails createCheckpointDetails(AbstractCheckpointStats checkpointStats, TaskStateStats taskStatistics) {
+		final TaskCheckpointStatisticsWithSubtaskDetails.Summary summary = createSummary(
+			taskStatistics.getSummaryStats(),
+			checkpointStats.getTriggerTimestamp());
+
+		final List<SubtaskCheckpointStatistics> subtaskCheckpointStatistics = createSubtaskCheckpointStatistics(
+			taskStatistics.getSubtaskStats(),
+			checkpointStats.getTriggerTimestamp());
+
+		return new TaskCheckpointStatisticsWithSubtaskDetails(
+			checkpointStats.getCheckpointId(),
+			checkpointStats.getStatus(),
+			taskStatistics.getLatestAckTimestamp(),
+			taskStatistics.getStateSize(),
+			taskStatistics.getEndToEndDuration(checkpointStats.getTriggerTimestamp()),
+			taskStatistics.getAlignmentBuffered(),
+			taskStatistics.getNumberOfSubtasks(),
+			taskStatistics.getNumberOfAcknowledgedSubtasks(),
+			summary,
+			subtaskCheckpointStatistics);
 	}
 
 	private static TaskCheckpointStatisticsWithSubtaskDetails.Summary createSummary(TaskStateStats.TaskStateStatsSummary taskStatisticsSummary, long triggerTimestamp) {


[02/12] flink git commit: [FLINK-9304] Timer service shutdown should not stop if interrupted

Posted by tr...@apache.org.
[FLINK-9304] Timer service shutdown should not stop if interrupted

This closes #5962.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5be783b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5be783b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5be783b

Branch: refs/heads/release-1.5
Commit: f5be783bddd8b767dbd9380a8bfc89319be729ad
Parents: d5bcecd
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon May 7 11:55:35 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 07:51:46 2018 +0200

----------------------------------------------------------------------
 .../runtime/tasks/ProcessingTimeService.java    |  11 ++
 .../streaming/runtime/tasks/StreamTask.java     |  44 +++---
 .../tasks/SystemProcessingTimeService.java      |  32 +++++
 .../tasks/TestProcessingTimeService.java        |   6 +
 .../tasks/SystemProcessingTimeServiceTest.java  | 133 ++++++++++++++-----
 5 files changed, 168 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f5be783b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index 2516299..4515ce2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -96,6 +96,18 @@ public abstract class ProcessingTimeService {
 	public abstract void shutdownService();
 
 	/**
+	 * Shuts down and cleans up the timer service provider and then waits for all pending timers to complete
+	 * or until the timeout is exceeded. Any further call to {@link #registerTimer(long, ProcessingTimeCallback)}
+	 * will result in a hard exception. This call cannot be interrupted: an interrupt received while waiting
+	 * does not end the wait early, and the interrupted status of the calling thread is restored before the
+	 * method returns.
+	 *
+	 * @param timeoutMs timeout for blocking on the service shutdown in milliseconds.
+	 * @return true iff the shutdown was completed within the timeout.
+	 */
+	public abstract boolean shutdownServiceUninterruptible(long timeoutMs);
+
+	/**
 	 * Shuts down and clean up the timer service provider hard and immediately. This does wait
 	 * for all timers to complete or until the time limit is exceeded. Any call to
 	 * {@link #registerTimer(long, ProcessingTimeCallback)} will result in a hard exception after calling this method.

http://git-wip-us.apache.org/repos/asf/flink/blob/f5be783b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6790949..2cc8886 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -73,7 +73,6 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -348,30 +347,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			// clean up everything we initialized
 			isRunning = false;
 
-			// clear the interrupted status so that we can wait for the following resource shutdowns to complete
-			Thread.interrupted();
-
 			// stop all timers and threads
-			if (timerService != null && !timerService.isTerminated()) {
-				try {
-
-					final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
-						getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
-
-					// wait for a reasonable time for all pending timer threads to finish
-					boolean timerShutdownComplete =
-						timerService.shutdownAndAwaitPending(timeoutMs, TimeUnit.MILLISECONDS);
-
-					if (!timerShutdownComplete) {
-						LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
-							"timers. Will continue with shutdown procedure.", timeoutMs);
-					}
-				}
-				catch (Throwable t) {
-					// catch and log the exception to not replace the original exception
-					LOG.error("Could not shut down timer service", t);
-				}
-			}
+			tryShutdownTimerService();
 
 			// stop all asynchronous checkpoint threads
 			try {
@@ -706,6 +683,32 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 	}
 
+	/**
+	 * Shuts down the timer service, if one exists and is not yet terminated, using the configured
+	 * {@code TASK_CANCELLATION_TIMEOUT_TIMERS} as the timeout. Failures of the shutdown are logged, not rethrown.
+	 */
+	private void tryShutdownTimerService() {
+		if (timerService == null || timerService.isTerminated()) {
+			return;
+		}
+
+		try {
+			final long timeoutMs = getEnvironment()
+				.getTaskManagerInfo()
+				.getConfiguration()
+				.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
+
+			final boolean shutdownComplete = timerService.shutdownServiceUninterruptible(timeoutMs);
+			if (!shutdownComplete) {
+				LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
+					"timers. Will continue with shutdown procedure.", timeoutMs);
+			}
+		} catch (Throwable t) {
+			// log instead of rethrowing so that an exception which is already in flight is not replaced
+			LOG.error("Could not shut down timer service", t);
+		}
+	}
+
 	private void checkpointState(
 			CheckpointMetaData checkpointMetaData,
 			CheckpointOptions checkpointOptions,

http://git-wip-us.apache.org/repos/asf/flink/blob/f5be783b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index be8b23c..4e4208f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -18,10 +18,15 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
 
+import java.time.Duration;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.Delayed;
@@ -41,6 +46,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class SystemProcessingTimeService extends ProcessingTimeService {
 
+	private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);
+
 	private static final int STATUS_ALIVE = 0;
 	private static final int STATUS_QUIESCED = 1;
 	private static final int STATUS_SHUTDOWN = 2;
@@ -197,6 +204,35 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
 		return timerService.awaitTermination(time, timeUnit);
 	}
 
+	/**
+	 * Shuts the service down and keeps waiting for pending timers until they have completed or the
+	 * timeout has elapsed. An {@link InterruptedException} thrown while waiting does not end the wait;
+	 * the interrupt is only remembered and re-asserted on the calling thread before returning.
+	 */
+	@Override
+	public boolean shutdownServiceUninterruptible(long timeoutMs) {
+		final Deadline deadline = Deadline.fromNow(Duration.ofMillis(timeoutMs));
+		boolean terminated = false;
+		boolean interrupted = false;
+
+		do {
+			try {
+				// wait for a reasonable time for all pending timer threads to finish
+				terminated = shutdownAndAwaitPending(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			} catch (InterruptedException e) {
+				// do not give up: note the interrupt and retry with whatever time is left
+				interrupted = true;
+				LOG.trace("Intercepted attempt to interrupt timer service shutdown.", e);
+			}
+		} while (deadline.hasTimeLeft() && !terminated);
+
+		if (interrupted) {
+			// hand the interrupt back to the caller now that waiting is over
+			Thread.currentThread().interrupt();
+		}
+		return terminated;
+	}
+
 	// safety net to destroy the thread pool
 	@Override
 	protected void finalize() throws Throwable {

http://git-wip-us.apache.org/repos/asf/flink/blob/f5be783b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 2081f19..f4a5f37 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -135,6 +135,12 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 	}
 
 	@Override
+	public boolean shutdownServiceUninterruptible(long timeoutMs) {
+		shutdownService(); // like shutdownAndAwaitPending below: delegates to shutdownService() and ignores timeoutMs
+		return true; // always reports a completed shutdown
+	}
+
+	@Override
 	public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException {
 		shutdownService();
 		return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/f5be783b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 01fd778..cfcaf72 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
@@ -449,41 +450,11 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
 	public void testShutdownAndWaitPending() {
 
 		final Object lock = new Object();
-		final OneShotLatch waitUntilTimerStarted = new OneShotLatch();
-		final OneShotLatch blockUntilTerminationInterrupts = new OneShotLatch();
 		final OneShotLatch blockUntilTriggered = new OneShotLatch();
-		final AtomicBoolean check = new AtomicBoolean(true);
-
-		final SystemProcessingTimeService timeService = new SystemProcessingTimeService(
-			(message, exception) -> {
-			},
-			lock);
-
-		timeService.scheduleAtFixedRate(
-			timestamp -> {
-
-				waitUntilTimerStarted.trigger();
-
-				try {
-					blockUntilTerminationInterrupts.await();
-					check.set(false);
-				} catch (InterruptedException ignore) {
-				}
-
-				try {
-					blockUntilTriggered.await();
-				} catch (InterruptedException ignore) {
-					check.set(false);
-				}
-			},
-			0L,
-			10L);
+		final AtomicBoolean timerExecutionFinished = new AtomicBoolean(false);
 
-		try {
-			waitUntilTimerStarted.await();
-		} catch (InterruptedException e) {
-			Assert.fail();
-		}
+		final SystemProcessingTimeService timeService =
+			createBlockingSystemProcessingTimeService(lock, blockUntilTriggered, timerExecutionFinished);
 
 		Assert.assertFalse(timeService.isTerminated());
 
@@ -504,7 +475,101 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
 			Assert.fail("Unexpected interruption.");
 		}
 
-		Assert.assertTrue(check.get());
+		Assert.assertTrue(timerExecutionFinished.get());
+		Assert.assertTrue(timeService.isTerminated());
+	}
+
+	@Test
+	public void testShutdownServiceUninterruptible() {
+		final Object lock = new Object();
+		final OneShotLatch blockUntilTriggered = new OneShotLatch();
+		final AtomicBoolean timerFinished = new AtomicBoolean(false);
+
+		final SystemProcessingTimeService timeService =
+			createBlockingSystemProcessingTimeService(lock, blockUntilTriggered, timerFinished);
+
+		Assert.assertFalse(timeService.isTerminated());
+
+		final Thread interruptTarget = Thread.currentThread();
+		final AtomicBoolean runInterrupts = new AtomicBoolean(true);
+		final Thread interruptCallerThread = new Thread(() -> {
+			while (runInterrupts.get()) {
+				interruptTarget.interrupt();
+				try {
+					Thread.sleep(1);
+				} catch (InterruptedException ignore) {
+				}
+			}
+		});
+
+		interruptCallerThread.start();
+
+		final long timeoutMs = 50L;
+		final long startTime = System.nanoTime();
+		Assert.assertFalse(timeService.isTerminated());
+		// check that termination did not succeed (because of blocking timer execution)
+		Assert.assertFalse(timeService.shutdownServiceUninterruptible(timeoutMs));
+		// check that termination flag was set.
 		Assert.assertTrue(timeService.isTerminated());
+		// check that the blocked timer is still in flight.
+		Assert.assertFalse(timerFinished.get());
+		// check that we waited until timeout
+		Assert.assertTrue((System.nanoTime() - startTime) >= (1_000_000L * timeoutMs));
+
+		runInterrupts.set(false);
+
+		do {
+			try {
+				interruptCallerThread.join();
+			} catch (InterruptedException ignore) {
+			}
+		} while (interruptCallerThread.isAlive());
+
+		blockUntilTriggered.trigger();
+		Assert.assertTrue(timeService.shutdownServiceUninterruptible(timeoutMs));
+		Assert.assertTrue(timerFinished.get());
+	}
+
+	private static SystemProcessingTimeService createBlockingSystemProcessingTimeService(
+		final Object lock,
+		final OneShotLatch blockUntilTriggered,
+		final AtomicBoolean check) {
+
+		final OneShotLatch waitUntilTimerStarted = new OneShotLatch();
+
+		Preconditions.checkState(!check.get());
+
+		final SystemProcessingTimeService timeService = new SystemProcessingTimeService(
+			(message, exception) -> {
+			},
+			lock);
+
+		timeService.scheduleAtFixedRate(
+			timestamp -> {
+
+				waitUntilTimerStarted.trigger();
+
+				boolean unblocked = false;
+
+				while (!unblocked) {
+					try {
+						blockUntilTriggered.await();
+						unblocked = true;
+					} catch (InterruptedException ignore) {
+					}
+				}
+
+				check.set(true);
+			},
+			0L,
+			10L);
+
+		try {
+			waitUntilTimerStarted.await();
+		} catch (InterruptedException e) {
+			Assert.fail("Problem while starting up service.");
+		}
+
+		return timeService;
 	}
 }


[06/12] flink git commit: [FLINK-9194][history] Add archiving routine to Dispatcher

Posted by tr...@apache.org.
[FLINK-9194][history] Add archiving routine to Dispatcher


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2e3d239
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2e3d239
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2e3d239

Branch: refs/heads/release-1.5
Commit: c2e3d239249df09f0c8fd63df33ec8a72c8bd4f4
Parents: 5a181e1
Author: zentol <ch...@apache.org>
Authored: Wed Apr 18 14:33:04 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 07:51:46 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    | 43 +++++++++++++++++++-
 .../runtime/dispatcher/MiniDispatcher.java      |  5 ++-
 .../dispatcher/StandaloneDispatcher.java        |  7 +++-
 .../runtime/entrypoint/ClusterEntrypoint.java   |  7 +++-
 .../entrypoint/JobClusterEntrypoint.java        |  5 ++-
 .../entrypoint/SessionClusterEntrypoint.java    |  7 +++-
 .../flink/runtime/history/FsJobArchivist.java   | 42 +++++++++++++++++++
 .../flink/runtime/minicluster/MiniCluster.java  |  3 +-
 .../runtime/webmonitor/WebMonitorEndpoint.java  | 22 +++++++++-
 .../runtime/dispatcher/DispatcherTest.java      |  1 +
 .../runtime/dispatcher/MiniDispatcherTest.java  |  1 +
 11 files changed, 132 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c2e3d239/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 58ffda3..82b9291 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.client.JobSubmissionException;
@@ -32,6 +34,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -61,6 +64,8 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -124,6 +129,12 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	@Nullable
 	protected final String restAddress;
 
+	@Nullable
+	private final JsonArchivist jsonArchivist;
+
+	@Nullable
+	private final Path archivePath;
+
 	private CompletableFuture<Void> orphanedJobManagerRunnersTerminationFuture = CompletableFuture.completedFuture(null);
 
 	public Dispatcher(
@@ -140,7 +151,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			JobManagerRunnerFactory jobManagerRunnerFactory,
 			FatalErrorHandler fatalErrorHandler,
-			@Nullable String restAddress) throws Exception {
+			@Nullable String restAddress,
+			@Nullable JsonArchivist jsonArchivist) throws Exception {
 		super(rpcService, endpointId);
 
 		this.configuration = Preconditions.checkNotNull(configuration);
@@ -165,6 +177,22 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 		this.restAddress = restAddress;
 
+		this.jsonArchivist = jsonArchivist;
+
+		String configuredArchivePath = configuration.getString(JobManagerOptions.ARCHIVE_DIR);
+		if (configuredArchivePath != null) {
+			Path tmpArchivePath = null;
+			try {
+				tmpArchivePath = WebMonitorUtils.validateAndNormalizeUri(new Path(configuredArchivePath).toUri());
+			} catch (Exception e) {
+				log.warn("Failed to validate specified archive directory in '{}'. " +
+					"Jobs will not be archived for the HistoryServer.", configuredArchivePath, e);
+			}
+			archivePath = tmpArchivePath;
+		} else {
+			archivePath = null;
+		}
+
 		this.archivedExecutionGraphStore = Preconditions.checkNotNull(archivedExecutionGraphStore);
 
 		this.jobManagerRunnerFactory = Preconditions.checkNotNull(jobManagerRunnerFactory);
@@ -621,6 +649,19 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				e);
 		}
 
+		try {
+			if (jsonArchivist != null && archivePath != null) {
+				FsJobArchivist.archiveJob(archivePath, archivedExecutionGraph.getJobID(), jsonArchivist.archiveJsonWithPath(archivedExecutionGraph));
+				log.info("Archived job {} to {}", archivedExecutionGraph.getJobID(), archivePath);
+			}
+		} catch (IOException e) {
+			log.info(
+				"Could not archive completed job {}({}).",
+				archivedExecutionGraph.getJobName(),
+				archivedExecutionGraph.getJobID(),
+				e);
+		}
+
 		final JobID jobId = archivedExecutionGraph.getJobID();
 
 		removeJob(jobId, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/c2e3d239/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 4361e08..38e74fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nullable;
@@ -71,6 +72,7 @@ public class MiniDispatcher extends Dispatcher {
 			JobManagerRunnerFactory jobManagerRunnerFactory,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress,
+			@Nullable JsonArchivist jsonArchivist,
 			JobGraph jobGraph,
 			JobClusterEntrypoint.ExecutionMode executionMode) throws Exception {
 		super(
@@ -87,7 +89,8 @@ public class MiniDispatcher extends Dispatcher {
 			archivedExecutionGraphStore,
 			jobManagerRunnerFactory,
 			fatalErrorHandler,
-			restAddress);
+			restAddress,
+			jsonArchivist);
 
 		this.executionMode = checkNotNull(executionMode);
 		this.jobTerminationFuture = new CompletableFuture<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/c2e3d239/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 52ac7a0..5c6a7ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import javax.annotation.Nullable;
 
@@ -50,7 +51,8 @@ public class StandaloneDispatcher extends Dispatcher {
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			JobManagerRunnerFactory jobManagerRunnerFactory,
 			FatalErrorHandler fatalErrorHandler,
-			@Nullable String restAddress) throws Exception {
+			@Nullable String restAddress,
+			@Nullable JsonArchivist jsonArchivist) throws Exception {
 		super(
 			rpcService,
 			endpointId,
@@ -65,6 +67,7 @@ public class StandaloneDispatcher extends Dispatcher {
 			archivedExecutionGraphStore,
 			jobManagerRunnerFactory,
 			fatalErrorHandler,
-			restAddress);
+			restAddress,
+			jsonArchivist);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c2e3d239/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index f823ea7..933add8 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
@@ -357,7 +358,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 				metricRegistry.getMetricQueryServicePath(),
 				archivedExecutionGraphStore,
 				this,
-				webMonitorEndpoint.getRestBaseUrl());
+				webMonitorEndpoint.getRestBaseUrl(),
+				webMonitorEndpoint);
 
 			LOG.debug("Starting ResourceManager.");
 			resourceManager.start();
@@ -657,7 +659,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		@Nullable String metricQueryServicePath,
 		ArchivedExecutionGraphStore archivedExecutionGraphStore,
 		FatalErrorHandler fatalErrorHandler,
-		@Nullable String restAddress) throws Exception;
+		@Nullable String restAddress,
+		@Nullable JsonArchivist jsonArchivist) throws Exception;
 
 	protected abstract ResourceManager<?> createResourceManager(
 		Configuration configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/c2e3d239/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index ea7cbe2..2a5b8ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.FlinkException;
@@ -100,7 +101,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			@Nullable String metricQueryServicePath,
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			FatalErrorHandler fatalErrorHandler,
-			@Nullable String restAddress) throws Exception {
+			@Nullable String restAddress,
+			@Nullable JsonArchivist jsonArchivist) throws Exception {
 
 		final JobGraph jobGraph = retrieveJobGraph(configuration);
 
@@ -122,6 +124,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 			fatalErrorHandler,
 			restAddress,
+			jsonArchivist,
 			jobGraph,
 			executionMode);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c2e3d239/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index fcab796..85446eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
@@ -114,7 +115,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			@Nullable String metricQueryServicePath,
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			FatalErrorHandler fatalErrorHandler,
-			@Nullable String restAddress) throws Exception {
+			@Nullable String restAddress,
+			@Nullable JsonArchivist jsonArchivist) throws Exception {
 
 		// create the default dispatcher
 		return new StandaloneDispatcher(
@@ -130,6 +132,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			archivedExecutionGraphStore,
 			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 			fatalErrorHandler,
-			restAddress);
+			restAddress,
+			jsonArchivist);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c2e3d239/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
index 3a6ea4f..1cfbf96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.history;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
@@ -67,7 +68,9 @@ public class FsJobArchivist {
 	 * @param graph  graph to archive
 	 * @return path to where the archive was written, or null if no archive was created
 	 * @throws IOException
+	 * @deprecated only kept for legacy reasons; use {@link #archiveJob(Path, JobID, Collection)} instead
 	 */
+	@Deprecated
 	public static Path archiveJob(Path rootPath, AccessExecutionGraph graph) throws IOException {
 		try {
 			FileSystem fs = rootPath.getFileSystem();
@@ -100,6 +103,45 @@ public class FsJobArchivist {
 	}
 
 	/**
+	 * Writes the given archived JSON of the job identified by {@code jobId} to the {@link FileSystem}
+	 * pointed to by {@link JobManagerOptions#ARCHIVE_DIR}.
+	 *
+	 * @param rootPath directory to which the archive should be written to
+	 * @param jobId  job id
+	 * @param jsonToArchive collection of json-path pairs that should be archived
+	 * @return path to where the archive was written
+	 * @throws IOException if the archive file could not be created or written
+	 */
+	public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive) throws IOException {
+		try {
+			FileSystem fs = rootPath.getFileSystem();
+			Path path = new Path(rootPath, jobId.toString());
+			OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
+
+			try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
+				gen.writeStartObject();
+				gen.writeArrayFieldStart(ARCHIVE);
+				for (ArchivedJson archive : jsonToArchive) {
+					gen.writeStartObject();
+					gen.writeStringField(PATH, archive.getPath());
+					gen.writeStringField(JSON, archive.getJson());
+					gen.writeEndObject();
+				}
+				gen.writeEndArray();
+				gen.writeEndObject();
+			} catch (Exception e) {
+				fs.delete(path, false);
+				throw e;
+			}
+			LOG.info("Job {} has been archived at {}.", jobId, path);
+			return path;
+		} catch (IOException e) {
+			LOG.error("Failed to archive job.", e);
+			throw e;
+		}
+	}
+
+	/**
 	 * Reads the given archive file and returns a {@link Collection} of contained {@link ArchivedJson}.
 	 *
 	 * @param file archive to extract

http://git-wip-us.apache.org/repos/asf/flink/blob/c2e3d239/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 0c1644f..04d5fb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -370,7 +370,8 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					new MemoryArchivedExecutionGraphStore(),
 					Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 					new ShutDownFatalErrorHandler(),
-					dispatcherRestEndpoint.getRestBaseUrl());
+					dispatcherRestEndpoint.getRestBaseUrl(),
+					dispatcherRestEndpoint);
 
 				dispatcher.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c2e3d239/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 5cb57d3..f9b2fa8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -113,6 +114,8 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHead
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerStdoutFileHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.ExceptionUtils;
@@ -126,6 +129,7 @@ import javax.annotation.Nonnull;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
@@ -137,7 +141,7 @@ import java.util.concurrent.Executor;
  *
  * @param <T> type of the leader gateway
  */
-public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndpoint implements LeaderContender {
+public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndpoint implements LeaderContender, JsonArchivist {
 
 	protected final GatewayRetriever<? extends T> leaderRetriever;
 	protected final Configuration clusterConfiguration;
@@ -157,6 +161,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 
 	private boolean hasWebUI = false;
 
+	private final Collection<JsonArchivist> archivingHandlers = new ArrayList<>(16);
+
 	public WebMonitorEndpoint(
 			RestServerEndpointConfiguration endpointConfiguration,
 			GatewayRetriever<? extends T> leaderRetriever,
@@ -667,6 +673,11 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		handlers.add(Tuple2.of(TaskManagerLogFileHeaders.getInstance(), taskManagerLogFileHandler));
 		handlers.add(Tuple2.of(TaskManagerStdoutFileHeaders.getInstance(), taskManagerStdoutFileHandler));
 
+		handlers.stream()
+			.map(tuple -> tuple.f1)
+			.filter(handler -> handler instanceof JsonArchivist)
+			.forEachOrdered(handler -> archivingHandlers.add((JsonArchivist) handler));
+
 		return handlers;
 	}
 
@@ -756,4 +767,13 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		fatalErrorHandler.onFatalError(exception);
 	}
 
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		Collection<ArchivedJson> archivedJson = new ArrayList<>(archivingHandlers.size());
+		for (JsonArchivist archivist : archivingHandlers) {
+			Collection<ArchivedJson> subArchive = archivist.archiveJsonWithPath(graph);
+			archivedJson.addAll(subArchive);
+		}
+		return archivedJson;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c2e3d239/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 18a8ec1..d9482e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -567,6 +567,7 @@ public class DispatcherTest extends TestLogger {
 				archivedExecutionGraphStore,
 				jobManagerRunnerFactory,
 				fatalErrorHandler,
+				null,
 				null);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c2e3d239/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 6dfb243..157fca7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -258,6 +258,7 @@ public class MiniDispatcherTest extends TestLogger {
 			testingJobManagerRunnerFactory,
 			testingFatalErrorHandler,
 			null,
+			null,
 			jobGraph,
 			executionMode);
 	}