You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/03/02 17:27:39 UTC

[1/2] flink git commit: [FLINK-5941] Integrate Archiver pattern into handlers

Repository: flink
Updated Branches:
  refs/heads/master 243ef69bf -> 7fe0eb477


http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
index 939f439..0076d42 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
@@ -21,12 +21,34 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+
 public class SubtasksTimesHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new SubtasksTimesHandler.SubtasksTimesJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/subtasktimes", archive.getPath());
+		compareSubtaskTimes(originalTask, originalAttempt, archive.getJson());
+	}
+
 	@Test
 	public void testGetPaths() {
 		SubtasksTimesHandler handler = new SubtasksTimesHandler(null);
@@ -41,6 +63,10 @@ public class SubtasksTimesHandlerTest {
 		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
 		String json = SubtasksTimesHandler.createSubtaskTimesJson(originalTask);
 
+		compareSubtaskTimes(originalTask, originalAttempt, json);
+	}
+	
+	private static void compareSubtaskTimes(AccessExecutionJobVertex originalTask, AccessExecution originalAttempt, String json) throws IOException {
 		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
 
 		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
index e570e18..9d339f5 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
@@ -20,14 +20,20 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
@@ -38,6 +44,37 @@ import static org.mockito.Mockito.when;
 public class CheckpointConfigHandlerTest {
 
 	@Test
+	public void testArchiver() throws IOException {
+		JsonArchivist archivist = new CheckpointConfigHandler.CheckpointConfigJsonArchivist();
+		GraphAndSettings graphAndSettings = createGraphAndSettings(true, true);
+
+		AccessExecutionGraph graph = graphAndSettings.graph;
+		when(graph.getJobID()).thenReturn(new JobID());
+		JobSnapshottingSettings settings = graphAndSettings.snapshottingSettings;
+		ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings;
+		
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph);
+		Assert.assertEquals(1, archives.size());
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + graph.getJobID() + "/checkpoints/config", archive.getPath());
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(archive.getJson());
+
+		Assert.assertEquals("exactly_once", rootNode.get("mode").asText());
+		Assert.assertEquals(settings.getCheckpointInterval(), rootNode.get("interval").asLong());
+		Assert.assertEquals(settings.getCheckpointTimeout(), rootNode.get("timeout").asLong());
+		Assert.assertEquals(settings.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong());
+		Assert.assertEquals(settings.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt());
+
+		JsonNode externalizedNode = rootNode.get("externalization");
+		Assert.assertNotNull(externalizedNode);
+		Assert.assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean());
+		Assert.assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean());
+
+	}
+
+	@Test
 	public void testGetPaths() {
 		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
 		String[] paths = handler.getPaths();
@@ -50,26 +87,10 @@ public class CheckpointConfigHandlerTest {
 	 */
 	@Test
 	public void testSimpleConfig() throws Exception {
-		long interval = 18231823L;
-		long timeout = 996979L;
-		long minPause = 119191919L;
-		int maxConcurrent = 12929329;
-		ExternalizedCheckpointSettings externalized = ExternalizedCheckpointSettings.none();
+		GraphAndSettings graphAndSettings = createGraphAndSettings(false, true);
 
-		JobSnapshottingSettings settings = new JobSnapshottingSettings(
-			Collections.<JobVertexID>emptyList(),
-			Collections.<JobVertexID>emptyList(),
-			Collections.<JobVertexID>emptyList(),
-			interval,
-			timeout,
-			minPause,
-			maxConcurrent,
-			externalized,
-			null,
-			true);
-
-		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		when(graph.getJobSnapshottingSettings()).thenReturn(settings);
+		AccessExecutionGraph graph = graphAndSettings.graph;
+		JobSnapshottingSettings settings = graphAndSettings.snapshottingSettings;
 
 		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
 		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
@@ -78,10 +99,10 @@ public class CheckpointConfigHandlerTest {
 		JsonNode rootNode = mapper.readTree(json);
 
 		assertEquals("exactly_once", rootNode.get("mode").asText());
-		assertEquals(interval, rootNode.get("interval").asLong());
-		assertEquals(timeout, rootNode.get("timeout").asLong());
-		assertEquals(minPause, rootNode.get("min_pause").asLong());
-		assertEquals(maxConcurrent, rootNode.get("max_concurrent").asInt());
+		assertEquals(settings.getCheckpointInterval(), rootNode.get("interval").asLong());
+		assertEquals(settings.getCheckpointTimeout(), rootNode.get("timeout").asLong());
+		assertEquals(settings.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong());
+		assertEquals(settings.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt());
 
 		JsonNode externalizedNode = rootNode.get("externalization");
 		assertNotNull(externalizedNode);
@@ -93,20 +114,9 @@ public class CheckpointConfigHandlerTest {
 	 */
 	@Test
 	public void testAtLeastOnce() throws Exception {
-		JobSnapshottingSettings settings = new JobSnapshottingSettings(
-			Collections.<JobVertexID>emptyList(),
-			Collections.<JobVertexID>emptyList(),
-			Collections.<JobVertexID>emptyList(),
-			996979L,
-			1818L,
-			1212L,
-			12,
-			ExternalizedCheckpointSettings.none(),
-			null,
-			false); // at least once
+		GraphAndSettings graphAndSettings = createGraphAndSettings(false, false);
 
-		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		when(graph.getJobSnapshottingSettings()).thenReturn(settings);
+		AccessExecutionGraph graph = graphAndSettings.graph;
 
 		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
 		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
@@ -122,30 +132,60 @@ public class CheckpointConfigHandlerTest {
 	 */
 	@Test
 	public void testEnabledExternalizedCheckpointSettings() throws Exception {
-		ExternalizedCheckpointSettings externalizedSettings = ExternalizedCheckpointSettings.externalizeCheckpoints(true);
+		GraphAndSettings graphAndSettings = createGraphAndSettings(true, false);
+
+		AccessExecutionGraph graph = graphAndSettings.graph;
+		ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings;
+
+		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode externalizedNode = mapper.readTree(json).get("externalization");
+		assertNotNull(externalizedNode);
+		assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean());
+		assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean());
+	}
+
+	private static GraphAndSettings createGraphAndSettings(boolean externalized, boolean exactlyOnce) {
+		long interval = 18231823L;
+		long timeout = 996979L;
+		long minPause = 119191919L;
+		int maxConcurrent = 12929329;
+		ExternalizedCheckpointSettings externalizedSetting = externalized
+			? ExternalizedCheckpointSettings.externalizeCheckpoints(true)
+			: ExternalizedCheckpointSettings.none();
 
 		JobSnapshottingSettings settings = new JobSnapshottingSettings(
 			Collections.<JobVertexID>emptyList(),
 			Collections.<JobVertexID>emptyList(),
 			Collections.<JobVertexID>emptyList(),
-			996979L,
-			1818L,
-			1212L,
-			12,
-			externalizedSettings,
+			interval,
+			timeout,
+			minPause,
+			maxConcurrent,
+			externalizedSetting,
 			null,
-			false); // at least once
+			exactlyOnce);
 
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getJobSnapshottingSettings()).thenReturn(settings);
 
-		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
-		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+		return new GraphAndSettings(graph, settings, externalizedSetting);
+	}
 
-		ObjectMapper mapper = new ObjectMapper();
-		JsonNode externalizedNode = mapper.readTree(json).get("externalization");
-		assertNotNull(externalizedNode);
-		assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean());
-		assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean());
+	private static class GraphAndSettings {
+		public final AccessExecutionGraph graph;
+		public final JobSnapshottingSettings snapshottingSettings;
+		public final ExternalizedCheckpointSettings externalizedSettings;
+
+		public GraphAndSettings(
+				AccessExecutionGraph graph,
+				JobSnapshottingSettings snapshottingSettings,
+				ExternalizedCheckpointSettings externalizedSettings) {
+			this.graph = graph;
+			this.snapshottingSettings = snapshottingSettings;
+			this.externalizedSettings = externalizedSettings;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
index ca9b606..770b032 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
@@ -32,12 +33,19 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
@@ -50,6 +58,44 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class CheckpointStatsDetailsHandlerTest {
+	
+	@Test
+	public void testArchiver() throws IOException {
+		JsonArchivist archivist = new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist();
+
+		CompletedCheckpointStats completedCheckpoint = createCompletedCheckpoint();
+		FailedCheckpointStats failedCheckpoint = createFailedCheckpoint();
+		List<AbstractCheckpointStats> checkpoints = new ArrayList<>();
+		checkpoints.add(failedCheckpoint);
+		checkpoints.add(completedCheckpoint);
+		
+		CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
+		when(history.getCheckpoints()).thenReturn(checkpoints);
+		CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
+		when(snapshot.getHistory()).thenReturn(history);
+
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
+		when(graph.getJobID()).thenReturn(new JobID());
+
+		ObjectMapper mapper = new ObjectMapper();
+		
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph);
+		Assert.assertEquals(2, archives.size());
+		
+		Iterator<ArchivedJson> iterator = archives.iterator();
+		ArchivedJson archive1 = iterator.next();
+		Assert.assertEquals(
+			"/jobs/" + graph.getJobID() + "/checkpoints/details/" + failedCheckpoint.getCheckpointId(),
+			archive1.getPath());
+		compareFailedCheckpoint(failedCheckpoint, mapper.readTree(archive1.getJson()));
+
+		ArchivedJson archive2 = iterator.next();
+		Assert.assertEquals(
+			"/jobs/" + graph.getJobID() + "/checkpoints/details/" + completedCheckpoint.getCheckpointId(),
+			archive2.getPath());
+		compareCompletedCheckpoint(completedCheckpoint, mapper.readTree(archive2.getJson()));
+	}
 
 	@Test
 	public void testGetPaths() {
@@ -146,8 +192,7 @@ public class CheckpointStatsDetailsHandlerTest {
 		assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt());
 		assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt());
 
-		verifyTaskNode(task1, rootNode);
-		verifyTaskNode(task2, rootNode);
+		verifyTaskNodes(taskStats, rootNode);
 	}
 
 	/**
@@ -155,6 +200,32 @@ public class CheckpointStatsDetailsHandlerTest {
 	 */
 	@Test
 	public void testCheckpointDetailsRequestCompletedCheckpoint() throws Exception {
+		CompletedCheckpointStats checkpoint = createCompletedCheckpoint();
+
+		JsonNode rootNode = triggerRequest(checkpoint);
+
+		compareCompletedCheckpoint(checkpoint, rootNode);
+
+		verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode);
+	}
+
+	/**
+	 * Tests a checkpoint details request for a failed checkpoint.
+	 */
+	@Test
+	public void testCheckpointDetailsRequestFailedCheckpoint() throws Exception {
+		FailedCheckpointStats checkpoint = createFailedCheckpoint();
+
+		JsonNode rootNode = triggerRequest(checkpoint);
+
+		compareFailedCheckpoint(checkpoint, rootNode);
+
+		verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static CompletedCheckpointStats createCompletedCheckpoint() {
 		CompletedCheckpointStats checkpoint = mock(CompletedCheckpointStats.class);
 		when(checkpoint.getCheckpointId()).thenReturn(1818213L);
 		when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
@@ -177,8 +248,10 @@ public class CheckpointStatsDetailsHandlerTest {
 
 		when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
 
-		JsonNode rootNode = triggerRequest(checkpoint);
+		return checkpoint;
+	}
 
+	private static void compareCompletedCheckpoint(CompletedCheckpointStats checkpoint, JsonNode rootNode) {
 		assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
 		assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
 		assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean());
@@ -191,18 +264,11 @@ public class CheckpointStatsDetailsHandlerTest {
 		assertEquals(checkpoint.getExternalPath(), rootNode.get("external_path").asText());
 		assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt());
 		assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt());
-
-		verifyTaskNode(task1, rootNode);
-		verifyTaskNode(task2, rootNode);
 	}
 
-	/**
-	 * Tests a checkpoint details request for a failed checkpoint.
-	 */
-	@Test
-	public void testCheckpointDetailsRequestFailedCheckpoint() throws Exception {
+	private static FailedCheckpointStats createFailedCheckpoint() {
 		FailedCheckpointStats checkpoint = mock(FailedCheckpointStats.class);
-		when(checkpoint.getCheckpointId()).thenReturn(1818213L);
+		when(checkpoint.getCheckpointId()).thenReturn(1818214L);
 		when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
 		when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
 		when(checkpoint.getTriggerTimestamp()).thenReturn(1818L);
@@ -223,8 +289,10 @@ public class CheckpointStatsDetailsHandlerTest {
 
 		when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
 
-		JsonNode rootNode = triggerRequest(checkpoint);
+		return checkpoint;
+	}
 
+	private static void compareFailedCheckpoint(FailedCheckpointStats checkpoint, JsonNode rootNode) {
 		assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
 		assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
 		assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean());
@@ -237,13 +305,8 @@ public class CheckpointStatsDetailsHandlerTest {
 		assertEquals(checkpoint.getFailureMessage(), rootNode.get("failure_message").asText());
 		assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt());
 		assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt());
-
-		verifyTaskNode(task1, rootNode);
-		verifyTaskNode(task2, rootNode);
 	}
 
-	// ------------------------------------------------------------------------
-
 	private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception {
 		CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
 		when(history.getCheckpointById(anyLong())).thenReturn(checkpoint);
@@ -262,16 +325,18 @@ public class CheckpointStatsDetailsHandlerTest {
 		return mapper.readTree(json);
 	}
 
-	private static void verifyTaskNode(TaskStateStats task, JsonNode parentNode) {
-		long duration = ThreadLocalRandom.current().nextInt(128);
-
-		JsonNode taskNode = parentNode.get("tasks").get(task.getJobVertexId().toString());
-		assertEquals(task.getLatestAckTimestamp(), taskNode.get("latest_ack_timestamp").asLong());
-		assertEquals(task.getStateSize(), taskNode.get("state_size").asLong());
-		assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), taskNode.get("end_to_end_duration").asLong());
-		assertEquals(task.getAlignmentBuffered(), taskNode.get("alignment_buffered").asLong());
-		assertEquals(task.getNumberOfSubtasks(), taskNode.get("num_subtasks").asInt());
-		assertEquals(task.getNumberOfAcknowledgedSubtasks(), taskNode.get("num_acknowledged_subtasks").asInt());
+	private static void verifyTaskNodes(Collection<TaskStateStats> tasks, JsonNode parentNode) {
+		for (TaskStateStats task : tasks) {
+			long duration = ThreadLocalRandom.current().nextInt(128);
+
+			JsonNode taskNode = parentNode.get("tasks").get(task.getJobVertexId().toString());
+			assertEquals(task.getLatestAckTimestamp(), taskNode.get("latest_ack_timestamp").asLong());
+			assertEquals(task.getStateSize(), taskNode.get("state_size").asLong());
+			assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), taskNode.get("end_to_end_duration").asLong());
+			assertEquals(task.getAlignmentBuffered(), taskNode.get("alignment_buffered").asLong());
+			assertEquals(task.getNumberOfSubtasks(), taskNode.get("num_subtasks").asInt());
+			assertEquals(task.getNumberOfAcknowledgedSubtasks(), taskNode.get("num_acknowledged_subtasks").asInt());
+		}
 	}
 
 	private static TaskStateStats createTaskStateStats() {

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
index ab7c7a3..1e4a255 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
@@ -34,10 +35,15 @@ import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
 import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -51,6 +57,32 @@ import static org.mockito.Mockito.when;
 public class CheckpointStatsHandlerTest {
 
 	@Test
+	public void testArchiver() throws IOException {
+		JsonArchivist archivist = new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist();
+		TestCheckpointStats testCheckpointStats = createTestCheckpointStats();
+		when(testCheckpointStats.graph.getJobID()).thenReturn(new JobID());
+		
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(testCheckpointStats.graph);
+		Assert.assertEquals(3, archives.size());
+
+		ObjectMapper mapper = new ObjectMapper();
+		
+		Iterator<ArchivedJson> iterator = archives.iterator();
+		ArchivedJson archive1 = iterator.next();
+		Assert.assertEquals("/jobs/" + testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + testCheckpointStats.inProgress.getCheckpointId(), archive1.getPath());
+		compareInProgressCheckpoint(testCheckpointStats.inProgress, mapper.readTree(archive1.getJson()));
+
+		ArchivedJson archive2 = iterator.next();
+		Assert.assertEquals("/jobs/" + testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + testCheckpointStats.completedSavepoint.getCheckpointId(), archive2.getPath());
+		compareCompletedSavepoint(testCheckpointStats.completedSavepoint, mapper.readTree(archive2.getJson()));
+		
+		ArchivedJson archive3 = iterator.next();
+		Assert.assertEquals("/jobs/" + testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + testCheckpointStats.failed.getCheckpointId(), archive3.getPath());
+		compareFailedCheckpoint(testCheckpointStats.failed, mapper.readTree(archive3.getJson()));
+	}
+	
+
+	@Test
 	public void testGetPaths() {
 		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
 		String[] paths = handler.getPaths();
@@ -63,6 +95,18 @@ public class CheckpointStatsHandlerTest {
 	 */
 	@Test
 	public void testCheckpointStatsRequest() throws Exception {
+		TestCheckpointStats testCheckpointStats = createTestCheckpointStats();
+
+		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
+		String json = handler.handleRequest(testCheckpointStats.graph, Collections.<String, String>emptyMap());
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(json);
+
+		compareCheckpointStats(testCheckpointStats, rootNode);
+	}
+
+	private static TestCheckpointStats createTestCheckpointStats() {
 		// Counts
 		CheckpointStatsCounts counts = mock(CheckpointStatsCounts.class);
 		when(counts.getNumberOfRestoredCheckpoints()).thenReturn(123123123L);
@@ -104,7 +148,7 @@ public class CheckpointStatsHandlerTest {
 		when(latestCompleted.getExternalPath()).thenReturn("latest-completed-external-path");
 
 		CompletedCheckpointStats latestSavepoint = mock(CompletedCheckpointStats.class);
-		when(latestSavepoint.getCheckpointId()).thenReturn(1992139L);
+		when(latestSavepoint.getCheckpointId()).thenReturn(1992140L);
 		when(latestSavepoint.getTriggerTimestamp()).thenReturn(1919191900L);
 		when(latestSavepoint.getLatestAckTimestamp()).thenReturn(1977791901L);
 		when(latestSavepoint.getStateSize()).thenReturn(111939272822L);
@@ -133,7 +177,7 @@ public class CheckpointStatsHandlerTest {
 		List<AbstractCheckpointStats> checkpoints = new ArrayList<>();
 
 		PendingCheckpointStats inProgress = mock(PendingCheckpointStats.class);
-		when(inProgress.getCheckpointId()).thenReturn(1992139L);
+		when(inProgress.getCheckpointId()).thenReturn(1992141L);
 		when(inProgress.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
 		when(inProgress.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
 		when(inProgress.getTriggerTimestamp()).thenReturn(1919191900L);
@@ -189,12 +233,15 @@ public class CheckpointStatsHandlerTest {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
-		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
-		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
-
-		ObjectMapper mapper = new ObjectMapper();
-		JsonNode rootNode = mapper.readTree(json);
+		return new TestCheckpointStats(
+			graph, counts, stateSizeSummary, durationSummary, alignmentBufferedSummary, summary,
+			latestCompleted, latestSavepoint, latestFailed, latestRestored, inProgress, 
+			completedSavepoint, failed, history, snapshot
+		);
+	}
 
+	private static void compareCheckpointStats(TestCheckpointStats checkpointStats, JsonNode rootNode) {
+		CheckpointStatsCounts counts = checkpointStats.counts;
 		JsonNode countNode = rootNode.get("counts");
 		assertEquals(counts.getNumberOfRestoredCheckpoints(), countNode.get("restored").asLong());
 		assertEquals(counts.getTotalNumberOfCheckpoints(), countNode.get("total").asLong());
@@ -202,22 +249,26 @@ public class CheckpointStatsHandlerTest {
 		assertEquals(counts.getNumberOfCompletedCheckpoints(), countNode.get("completed").asLong());
 		assertEquals(counts.getNumberOfFailedCheckpoints(), countNode.get("failed").asLong());
 
+		MinMaxAvgStats stateSizeSummary = checkpointStats.stateSizeSummary;
 		JsonNode summaryNode = rootNode.get("summary");
 		JsonNode sizeSummaryNode = summaryNode.get("state_size");
 		assertEquals(stateSizeSummary.getMinimum(), sizeSummaryNode.get("min").asLong());
 		assertEquals(stateSizeSummary.getMaximum(), sizeSummaryNode.get("max").asLong());
 		assertEquals(stateSizeSummary.getAverage(), sizeSummaryNode.get("avg").asLong());
 
+		MinMaxAvgStats durationSummary = checkpointStats.durationSummary;
 		JsonNode durationSummaryNode = summaryNode.get("end_to_end_duration");
 		assertEquals(durationSummary.getMinimum(), durationSummaryNode.get("min").asLong());
 		assertEquals(durationSummary.getMaximum(), durationSummaryNode.get("max").asLong());
 		assertEquals(durationSummary.getAverage(), durationSummaryNode.get("avg").asLong());
 
+		MinMaxAvgStats alignmentBufferedSummary = checkpointStats.alignmentBufferedSummary;
 		JsonNode alignmentBufferedNode = summaryNode.get("alignment_buffered");
 		assertEquals(alignmentBufferedSummary.getMinimum(), alignmentBufferedNode.get("min").asLong());
 		assertEquals(alignmentBufferedSummary.getMaximum(), alignmentBufferedNode.get("max").asLong());
 		assertEquals(alignmentBufferedSummary.getAverage(), alignmentBufferedNode.get("avg").asLong());
 
+		CompletedCheckpointStats latestCompleted = checkpointStats.latestCompleted;
 		JsonNode latestNode = rootNode.get("latest");
 		JsonNode latestCheckpointNode = latestNode.get("completed");
 		assertEquals(latestCompleted.getCheckpointId(), latestCheckpointNode.get("id").asLong());
@@ -228,6 +279,7 @@ public class CheckpointStatsHandlerTest {
 		assertEquals(latestCompleted.getAlignmentBuffered(), latestCheckpointNode.get("alignment_buffered").asLong());
 		assertEquals(latestCompleted.getExternalPath(), latestCheckpointNode.get("external_path").asText());
 
+		CompletedCheckpointStats latestSavepoint = checkpointStats.latestSavepoint;
 		JsonNode latestSavepointNode = latestNode.get("savepoint");
 		assertEquals(latestSavepoint.getCheckpointId(), latestSavepointNode.get("id").asLong());
 		assertEquals(latestSavepoint.getTriggerTimestamp(), latestSavepointNode.get("trigger_timestamp").asLong());
@@ -237,6 +289,7 @@ public class CheckpointStatsHandlerTest {
 		assertEquals(latestSavepoint.getAlignmentBuffered(), latestSavepointNode.get("alignment_buffered").asLong());
 		assertEquals(latestSavepoint.getExternalPath(), latestSavepointNode.get("external_path").asText());
 
+		FailedCheckpointStats latestFailed = checkpointStats.latestFailed;
 		JsonNode latestFailedNode = latestNode.get("failed");
 		assertEquals(latestFailed.getCheckpointId(), latestFailedNode.get("id").asLong());
 		assertEquals(latestFailed.getTriggerTimestamp(), latestFailedNode.get("trigger_timestamp").asLong());
@@ -247,6 +300,7 @@ public class CheckpointStatsHandlerTest {
 		assertEquals(latestFailed.getFailureTimestamp(), latestFailedNode.get("failure_timestamp").asLong());
 		assertEquals(latestFailed.getFailureMessage(), latestFailedNode.get("failure_message").asText());
 
+		RestoredCheckpointStats latestRestored = checkpointStats.latestRestored;
 		JsonNode latestRestoredNode = latestNode.get("restored");
 		assertEquals(latestRestored.getCheckpointId(), latestRestoredNode.get("id").asLong());
 		assertEquals(latestRestored.getRestoreTimestamp(), latestRestoredNode.get("restore_timestamp").asLong());
@@ -259,6 +313,25 @@ public class CheckpointStatsHandlerTest {
 		assertTrue(it.hasNext());
 		JsonNode inProgressNode = it.next();
 
+		PendingCheckpointStats inProgress = checkpointStats.inProgress;
+		compareInProgressCheckpoint(inProgress, inProgressNode);
+
+		assertTrue(it.hasNext());
+		JsonNode completedSavepointNode = it.next();
+
+		CompletedCheckpointStats completedSavepoint = checkpointStats.completedSavepoint;
+		compareCompletedSavepoint(completedSavepoint, completedSavepointNode);
+
+		assertTrue(it.hasNext());
+		JsonNode failedNode = it.next();
+
+		FailedCheckpointStats failed = checkpointStats.failed;
+		compareFailedCheckpoint(failed, failedNode);
+
+		assertFalse(it.hasNext());
+	}
+
+	private static void compareInProgressCheckpoint(PendingCheckpointStats inProgress, JsonNode inProgressNode) {
 		assertEquals(inProgress.getCheckpointId(), inProgressNode.get("id").asLong());
 		assertEquals(inProgress.getStatus().toString(), inProgressNode.get("status").asText());
 		assertEquals(inProgress.getProperties().isSavepoint(), inProgressNode.get("is_savepoint").asBoolean());
@@ -269,10 +342,9 @@ public class CheckpointStatsHandlerTest {
 		assertEquals(inProgress.getAlignmentBuffered(), inProgressNode.get("alignment_buffered").asLong());
 		assertEquals(inProgress.getNumberOfSubtasks(), inProgressNode.get("num_subtasks").asInt());
 		assertEquals(inProgress.getNumberOfAcknowledgedSubtasks(), inProgressNode.get("num_acknowledged_subtasks").asInt());
+	}
 
-		assertTrue(it.hasNext());
-		JsonNode completedSavepointNode = it.next();
-
+	private static void compareCompletedSavepoint(CompletedCheckpointStats completedSavepoint, JsonNode completedSavepointNode) {
 		assertEquals(completedSavepoint.getCheckpointId(), completedSavepointNode.get("id").asLong());
 		assertEquals(completedSavepoint.getStatus().toString(), completedSavepointNode.get("status").asText());
 		assertEquals(completedSavepoint.getProperties().isSavepoint(), completedSavepointNode.get("is_savepoint").asBoolean());
@@ -286,10 +358,9 @@ public class CheckpointStatsHandlerTest {
 
 		assertEquals(completedSavepoint.getExternalPath(), completedSavepointNode.get("external_path").asText());
 		assertEquals(completedSavepoint.isDiscarded(), completedSavepointNode.get("discarded").asBoolean());
+	}
 
-		assertTrue(it.hasNext());
-		JsonNode failedNode = it.next();
-
+	private static void compareFailedCheckpoint(FailedCheckpointStats failed, JsonNode failedNode) {
 		assertEquals(failed.getCheckpointId(), failedNode.get("id").asLong());
 		assertEquals(failed.getStatus().toString(), failedNode.get("status").asText());
 		assertEquals(failed.getProperties().isSavepoint(), failedNode.get("is_savepoint").asBoolean());
@@ -303,7 +374,56 @@ public class CheckpointStatsHandlerTest {
 
 		assertEquals(failed.getFailureTimestamp(), failedNode.get("failure_timestamp").asLong());
 		assertEquals(failed.getFailureMessage(), failedNode.get("failure_message").asText());
-
-		assertFalse(it.hasNext());
+	}
+	
+	private static class TestCheckpointStats {
+		public final AccessExecutionGraph graph;
+		public final CheckpointStatsCounts counts;
+		public final MinMaxAvgStats stateSizeSummary;
+		public final MinMaxAvgStats durationSummary;
+		public final MinMaxAvgStats alignmentBufferedSummary;
+		public final CompletedCheckpointStatsSummary summary;
+		public final CompletedCheckpointStats latestCompleted;
+		public final CompletedCheckpointStats latestSavepoint;
+		public final FailedCheckpointStats latestFailed;
+		public final RestoredCheckpointStats latestRestored;
+		public final PendingCheckpointStats inProgress;
+		public final CompletedCheckpointStats completedSavepoint;
+		public final FailedCheckpointStats failed;
+		public final CheckpointStatsHistory history;
+		public final CheckpointStatsSnapshot snapshot;
+
+		public TestCheckpointStats(
+				AccessExecutionGraph graph,
+				CheckpointStatsCounts counts,
+				MinMaxAvgStats stateSizeSummary,
+				MinMaxAvgStats durationSummary,
+				MinMaxAvgStats alignmentBufferedSummary,
+				CompletedCheckpointStatsSummary summary,
+				CompletedCheckpointStats latestCompleted,
+				CompletedCheckpointStats latestSavepoint,
+				FailedCheckpointStats latestFailed,
+				RestoredCheckpointStats latestRestored,
+				PendingCheckpointStats inProgress,
+				CompletedCheckpointStats completedSavepoint,
+				FailedCheckpointStats failed,
+				CheckpointStatsHistory history,
+				CheckpointStatsSnapshot snapshot) {
+			this.graph = graph;
+			this.counts = counts;
+			this.stateSizeSummary = stateSizeSummary;
+			this.durationSummary = durationSummary;
+			this.alignmentBufferedSummary = alignmentBufferedSummary;
+			this.summary = summary;
+			this.latestCompleted = latestCompleted;
+			this.latestSavepoint = latestSavepoint;
+			this.latestFailed = latestFailed;
+			this.latestRestored = latestRestored;
+			this.inProgress = inProgress;
+			this.completedSavepoint = completedSavepoint;
+			this.failed = failed;
+			this.history = history;
+			this.snapshot = snapshot;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
index 26433fa..bbab621 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -31,9 +32,13 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -55,6 +60,42 @@ import static org.mockito.Mockito.when;
 public class CheckpointStatsSubtaskDetailsHandlerTest {
 
 	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist();
+		ObjectMapper mapper = new ObjectMapper();
+
+		PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class);
+		when(checkpoint.getCheckpointId()).thenReturn(1992139L);
+		when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
+		when(checkpoint.getTriggerTimestamp()).thenReturn(0L); // ack timestamp = duration
+
+		TaskStateStats task = createTaskStateStats(1237);
+		when(checkpoint.getAllTaskStateStats()).thenReturn(Collections.singletonList(task));
+
+		CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
+		when(history.getCheckpoints()).thenReturn(Collections.<AbstractCheckpointStats>singletonList(checkpoint));
+		CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
+		when(snapshot.getHistory()).thenReturn(history);
+
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
+		when(graph.getJobID()).thenReturn(new JobID());
+		
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph);
+		Assert.assertEquals(1, archives.size());
+		
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals(
+			"/jobs/" + graph.getJobID() + "/checkpoints/details/" + checkpoint.getCheckpointId() + "/subtasks/" + task.getJobVertexId(),
+			archive.getPath());
+		JsonNode rootNode = mapper.readTree(archive.getJson());
+		assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
+		assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
+
+		verifyTaskNode(rootNode, task, checkpoint.getTriggerTimestamp());
+	}
+
+	@Test
 	public void testGetPaths() {
 		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
 		String[] paths = handler.getPaths();

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
index 0340d87..ed339ed 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
@@ -105,7 +105,7 @@ public class ArchivedJobGenerationUtils {
 		originalAttempt = new ArchivedExecutionBuilder()
 			.setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
 			.setParallelSubtaskIndex(1)
-			.setAttemptNumber(3)
+			.setAttemptNumber(0)
 			.setAssignedResourceLocation(location)
 			.setUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2})
 			.setState(ExecutionState.FINISHED)

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
new file mode 100644
index 0000000..22e011c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A simple container for a handler's JSON response and the REST URL for which the response would've been returned.
+ * 
+ * These are created by {@link JsonArchivist}s, and used by the {@link MemoryArchivist} to create a directory structure
+ * resembling the REST API.
+ */
+public class ArchivedJson {
+	private final String path;
+	private final String json;
+	
+	public ArchivedJson(String path, String json) {
+		this.path = Preconditions.checkNotNull(path);
+		this.json = Preconditions.checkNotNull(json);
+	}
+
+	public String getPath() {
+		return path;
+	}
+
+	public String getJson() {
+		return json;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
new file mode 100644
index 0000000..a87cc47
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for all classes that want to participate in the archiving of job-related JSON responses.
+ * 
+ * Note that all JsonArchivists that are to be used for the history server must be added
+ * to {@link WebRuntimeMonitor#getArchivers()}.
+ */
+public interface JsonArchivist {
+
+	/**
+	 * Returns a {@link Collection} of {@link ArchivedJson}s containing JSON responses and their respective REST URL
+	 * for a given job.
+	 *
+	 * The collection should contain one entry for every response that could be generated for the given
+	 * job, for example one entry for each task. The REST URLs should be unique and must not contain placeholders.
+	 *
+	 * @param graph AccessExecutionGraph for which the responses should be generated
+	 *
+	 * @return Collection containing an ArchivedJson for every response that could be generated for the given job
+	 * @throws IOException thrown if the JSON generation fails
+	 */
+	Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException;
+}


[2/2] flink git commit: [FLINK-5941] Integrate Archiver pattern into handlers

Posted by ch...@apache.org.
[FLINK-5941] Integrate Archiver pattern into handlers

This closes #3444.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fe0eb47
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fe0eb47
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fe0eb47

Branch: refs/heads/master
Commit: 7fe0eb477df52cfd7254695a67d41f3cba34ef0a
Parents: 243ef69
Author: zentol <ch...@apache.org>
Authored: Mon Feb 20 16:30:35 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Mar 2 18:27:15 2017 +0100

----------------------------------------------------------------------
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  42 ++++++
 .../handlers/CurrentJobsOverviewHandler.java    |  26 ++++
 .../handlers/JobAccumulatorsHandler.java        |  15 ++
 .../webmonitor/handlers/JobConfigHandler.java   |  16 ++
 .../webmonitor/handlers/JobDetailsHandler.java  |  20 +++
 .../handlers/JobExceptionsHandler.java          |  15 ++
 .../webmonitor/handlers/JobPlanHandler.java     |  16 ++
 .../handlers/JobVertexAccumulatorsHandler.java  |  22 +++
 .../handlers/JobVertexDetailsHandler.java       |  22 +++
 .../handlers/JobVertexTaskManagersHandler.java  |  20 +++
 ...taskExecutionAttemptAccumulatorsHandler.java |  40 +++++
 .../SubtaskExecutionAttemptDetailsHandler.java  |  47 ++++++
 .../SubtasksAllAccumulatorsHandler.java         |  22 +++
 .../handlers/SubtasksTimesHandler.java          |  22 +++
 .../checkpoints/CheckpointConfigHandler.java    |  15 ++
 .../CheckpointStatsDetailsHandler.java          |  28 ++++
 .../CheckpointStatsDetailsSubtasksHandler.java  |  31 ++++
 .../checkpoints/CheckpointStatsHandler.java     |  15 ++
 .../CurrentJobsOverviewHandlerTest.java         |  32 +++-
 .../handlers/JobAccumulatorsHandlerTest.java    |  23 +++
 .../handlers/JobConfigHandlerTest.java          |  21 +++
 .../handlers/JobDetailsHandlerTest.java         |  28 ++++
 .../handlers/JobExceptionsHandlerTest.java      |  23 +++
 .../webmonitor/handlers/JobPlanHandlerTest.java |  20 +++
 .../JobVertexAccumulatorsHandlerTest.java       |  25 ++++
 .../handlers/JobVertexDetailsHandlerTest.java   |  25 ++++
 .../JobVertexTaskManagersHandlerTest.java       |  26 ++++
 ...ExecutionAttemptAccumulatorsHandlerTest.java |  33 ++++
 ...btaskExecutionAttemptDetailsHandlerTest.java |  40 +++++
 .../SubtasksAllAccumulatorsHandlerTest.java     |  25 ++++
 .../handlers/SubtasksTimesHandlerTest.java      |  26 ++++
 .../CheckpointConfigHandlerTest.java            | 140 ++++++++++-------
 .../CheckpointStatsDetailsHandlerTest.java      | 121 +++++++++++----
 .../checkpoints/CheckpointStatsHandlerTest.java | 150 +++++++++++++++++--
 ...heckpointStatsSubtaskDetailsHandlerTest.java |  41 +++++
 .../utils/ArchivedJobGenerationUtils.java       |   2 +-
 .../webmonitor/history/ArchivedJson.java        |  45 ++++++
 .../webmonitor/history/JsonArchivist.java       |  46 ++++++
 38 files changed, 1231 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index dddc69d..e604ce8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -37,6 +37,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
@@ -77,6 +78,7 @@ import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsC
 import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsSubtasksHandler;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.metrics.JobManagerMetricsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
@@ -424,6 +426,46 @@ public class WebRuntimeMonitor implements WebMonitor {
 		LOG.info("Web frontend listening at " + address + ':' + port);
 	}
 
+	/**
+	 * Returns an array of all {@link JsonArchivist}s that are relevant for the history server.
+	 * 
+	 * This method is static to allow easier access from the {@link MemoryArchivist}. Requiring a reference
+	 * would imply that the WebRuntimeMonitor is always created before the archivist, which may not hold for all
+	 * deployment modes.
+	 * 
+	 * Similarly, no handler implements the JsonArchivist interface itself but instead contains a separate implementing
+	 * class; otherwise we would either instantiate several handlers even though their main functionality isn't
+	 * required, or yet again require that the WebRuntimeMonitor is started before the archivist.
+	 * 
+	 * @return array of all JsonArchivists relevant for the history server
+	 */
+	public static JsonArchivist[] getArchivers() {
+		JsonArchivist[] archivists = new JsonArchivist[]{
+			new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(),
+
+			new JobPlanHandler.JobPlanJsonArchivist(),
+			new JobConfigHandler.JobConfigJsonArchivist(),
+			new JobExceptionsHandler.JobExceptionsJsonArchivist(),
+			new JobDetailsHandler.JobDetailsJsonArchivist(),
+			new JobAccumulatorsHandler.JobAccumulatorsJsonArchivist(),
+
+			new CheckpointStatsHandler.CheckpointStatsJsonArchivist(),
+			new CheckpointConfigHandler.CheckpointConfigJsonArchivist(),
+			new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist(),
+			new CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist(),
+				
+			new JobVertexDetailsHandler.JobVertexDetailsJsonArchivist(),
+			new SubtasksTimesHandler.SubtasksTimesJsonArchivist(),
+			new JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist(),
+			new JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist(),
+			new SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist(),
+			
+			new SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist(),
+			new SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist()
+			};
+		return archivists;
+	}
+
 	@Override
 	public void start(String jobManagerAkkaUrl) throws Exception {
 		LOG.info("Starting with JobManager {} on port {}", jobManagerAkkaUrl, getServerPort());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index 00cf138..60a2b27 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -20,16 +20,22 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -121,6 +127,26 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 		}
 	}
 
+	public static class CurrentJobsOverviewJsonArchivist implements JsonArchivist {
+
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			StringWriter writer = new StringWriter();
+			try (JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer)) {
+				gen.writeStartObject();
+				gen.writeArrayFieldStart("running");
+				gen.writeEndArray();
+				gen.writeArrayFieldStart("finished");
+				writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
+				gen.writeEndArray();
+				gen.writeEndObject();
+			}
+			String json = writer.toString();
+			String path = ALL_JOBS_REST_PATH;
+			return Collections.singleton(new ArchivedJson(path, json));
+		}
+	}
+
 	public static void writeJobDetailOverviewAsJson(JobDetails details, JsonGenerator gen, long now) throws IOException {
 		gen.writeStartObject();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
index dfc654e..c403aa2 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
@@ -22,9 +22,13 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -48,6 +52,17 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
 		return createJobAccumulatorsJson(graph);
 	}
 
+	public static class JobAccumulatorsJsonArchivist implements JsonArchivist {
+
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			String json = createJobAccumulatorsJson(graph);
+			String path = JOB_ACCUMULATORS_REST_PATH
+				.replace(":jobid", graph.getJobID().toString());
+			return Collections.singletonList(new ArchivedJson(path, json));
+		}
+	}
+
 	public static String createJobAccumulatorsJson(AccessExecutionGraph graph) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 7d72235..2b96456 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -26,6 +26,11 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import java.util.Collection;
+import java.util.Collections;
 
 /**
  * Request handler that returns the execution config of a job.
@@ -48,6 +53,17 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
 		return createJobConfigJson(graph);
 	}
 
+	public static class JobConfigJsonArchivist implements JsonArchivist {
+
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			String json = createJobConfigJson(graph);
+			String path = JOB_CONFIG_REST_PATH
+				.replace(":jobid", graph.getJobID().toString());
+			return Collections.singletonList(new ArchivedJson(path, json));
+		}
+	}
+
 	public static String createJobConfigJson(AccessExecutionGraph graph) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 6d1f82f..029a4b5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -27,12 +27,16 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 
 /**
@@ -67,6 +71,22 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 		return createJobDetailsJson(graph, fetcher);
 	}
 
+	/** Archives the job details JSON under JOB_DETAILS_REST_PATH and JOB_DETAILS_VERTICES_REST_PATH. */
+	public static class JobDetailsJsonArchivist implements JsonArchivist {
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			String json = createJobDetailsJson(graph, null);
+			// Both paths carry the same ":jobid" placeholder, so resolve the id only once.
+			String jobId = graph.getJobID().toString();
+			String path1 = JOB_DETAILS_REST_PATH.replace(":jobid", jobId);
+			String path2 = JOB_DETAILS_VERTICES_REST_PATH.replace(":jobid", jobId);
+			// Diamond instead of the raw ArrayList type: avoids an unchecked-conversion warning.
+			Collection<ArchivedJson> archives = new ArrayList<>();
+			archives.add(new ArchivedJson(path1, json));
+			archives.add(new ArchivedJson(path2, json));
+			return archives;
+		}
+	}
+
 	public static String createJobDetailsJson(AccessExecutionGraph graph, @Nullable MetricFetcher fetcher) throws IOException {
 		final StringWriter writer = new StringWriter();
 		final JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index 0cce61f..81cdc83 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -23,10 +23,14 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -52,6 +56,17 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 		return createJobExceptionsJson(graph);
 	}
 
+	/** Archives the job exceptions JSON under {@code JOB_EXCEPTIONS_REST_PATH} with ":jobid" resolved. */
+	public static class JobExceptionsJsonArchivist implements JsonArchivist {
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			final String exceptionsJson = createJobExceptionsJson(graph);
+			final String jobId = graph.getJobID().toString();
+			final String archivePath = JOB_EXCEPTIONS_REST_PATH.replace(":jobid", jobId);
+			return Collections.singletonList(new ArchivedJson(archivePath, exceptionsJson));
+		}
+	}
+
 	public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
index becc2e1..885d04e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
@@ -20,7 +20,12 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -43,4 +48,15 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
 	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
 		return graph.getJsonPlan();
 	}
+
+	/** Archives the graph's JSON plan under {@code JOB_PLAN_REST_PATH} with ":jobid" resolved. */
+	public static class JobPlanJsonArchivist implements JsonArchivist {
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			final String jobId = graph.getJobID().toString();
+			final String archivePath = JOB_PLAN_REST_PATH.replace(":jobid", jobId);
+			final String planJson = graph.getJsonPlan();
+			return Collections.singletonList(new ArchivedJson(archivePath, planJson));
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index ca0488b..2532a1e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -21,11 +21,17 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 
@@ -47,6 +53,22 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
 		return createVertexAccumulatorsJson(jobVertex);
 	}
 
+	/** Archives one accumulators JSON per job vertex, each under its own vertex-specific REST path. */
+	public static class JobVertexAccumulatorsJsonArchivist implements JsonArchivist {
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			final List<ArchivedJson> result = new ArrayList<>();
+			for (AccessExecutionJobVertex vertex : graph.getAllVertices().values()) {
+				final String accumulatorsJson = createVertexAccumulatorsJson(vertex);
+				final String jobId = graph.getJobID().toString();
+				final String vertexId = vertex.getJobVertexId().toString();
+				final String archivePath = JOB_VERTEX_ACCUMULATORS_REST_PATH.replace(":jobid", jobId).replace(":vertexid", vertexId);
+				result.add(new ArchivedJson(archivePath, accumulatorsJson));
+			}
+			return result;
+		}
+	}
+
 	public static String createVertexAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 6e7e47c..d9a1131 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -21,16 +21,22 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -58,6 +64,22 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 		return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
 	}
 
+	/** Archives one details JSON per job vertex (created without a MetricFetcher) under its REST path. */
+	public static class JobVertexDetailsJsonArchivist implements JsonArchivist {
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			final List<ArchivedJson> result = new ArrayList<>();
+			for (AccessExecutionJobVertex vertex : graph.getAllVertices().values()) {
+				final String jobId = graph.getJobID().toString();
+				final String detailsJson = createVertexDetailsJson(vertex, jobId, null);
+				final String vertexId = vertex.getJobVertexId().toString();
+				final String archivePath = JOB_VERTEX_DETAILS_REST_PATH.replace(":jobid", jobId).replace(":vertexid", vertexId);
+				result.add(new ArchivedJson(archivePath, detailsJson));
+			}
+			return result;
+		}
+	}
+
 	public static String createVertexDetailsJson(
 			AccessExecutionJobVertex jobVertex,
 			String jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index 4fa54bd..3878722 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
@@ -32,6 +35,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,6 +65,22 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 		return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
 	}
 
+	/** Archives one by-task-manager details JSON per job vertex (no MetricFetcher) under its REST path. */
+	public static class JobVertexTaskManagersJsonArchivist implements JsonArchivist {
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			final List<ArchivedJson> result = new ArrayList<>();
+			for (AccessExecutionJobVertex vertex : graph.getAllVertices().values()) {
+				final String jobId = graph.getJobID().toString();
+				final String byTaskManagerJson = createVertexDetailsByTaskManagerJson(vertex, jobId, null);
+				final String vertexId = vertex.getJobVertexId().toString();
+				final String archivePath = JOB_VERTEX_TASKMANAGERS_REST_PATH.replace(":jobid", jobId).replace(":vertexid", vertexId);
+				result.add(new ArchivedJson(archivePath, byTaskManagerJson));
+			}
+			return result;
+		}
+	}
+
 	public static String createVertexDetailsByTaskManagerJson(
 			AccessExecutionJobVertex jobVertex,
 			String jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index a63016c..9026a22 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -21,10 +21,18 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -49,6 +57,38 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 		return createAttemptAccumulatorsJson(execAttempt);
 	}
 		
+	/**
+	 * Archives the accumulators JSON of the current and of every prior execution attempt
+	 * of each subtask, one entry per attempt.
+	 */
+	public static class SubtaskExecutionAttemptAccumulatorsJsonArchivist implements JsonArchivist {
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			final List<ArchivedJson> result = new ArrayList<>();
+			for (AccessExecutionJobVertex vertex : graph.getAllVertices().values()) {
+				for (AccessExecutionVertex subtask : vertex.getTaskVertices()) {
+					final String currentJson = createAttemptAccumulatorsJson(subtask.getCurrentExecutionAttempt());
+					// Resolve everything except ":attempt" once; the attempt number is filled in per entry.
+					final String subtaskPath = SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH
+						.replace(":jobid", graph.getJobID().toString())
+						.replace(":vertexid", vertex.getJobVertexId().toString())
+						.replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex()));
+					final String currentNumber = String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber());
+					result.add(new ArchivedJson(subtaskPath.replace(":attempt", currentNumber), currentJson));
+
+					// Prior attempts are looked up by index 0 .. (current attempt number - 1).
+					for (int i = 0; i < subtask.getCurrentExecutionAttempt().getAttemptNumber(); i++) {
+						final AccessExecution prior = subtask.getPriorExecutionAttempt(i);
+						final String priorJson = createAttemptAccumulatorsJson(prior);
+						final String priorNumber = String.valueOf(prior.getAttemptNumber());
+						result.add(new ArchivedJson(subtaskPath.replace(":attempt", priorNumber), priorJson));
+					}
+				}
+			}
+			return result;
+		}
+	}
+
 	public static String createAttemptAccumulatorsJson(AccessExecution execAttempt) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index 5af6af9..078f54a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -22,16 +22,26 @@ import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler.SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH;
+
 /**
  * Request handler providing details about a single task execution attempt.
  */
@@ -56,6 +66,43 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 		return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher);
 	}
 
+	/**
+	 * Archives the details JSON of every execution attempt of each subtask; the current attempt is also stored under the current-attempt path.
+	 */
+	public static class SubtaskExecutionAttemptDetailsJsonArchivist implements JsonArchivist {
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			final List<ArchivedJson> result = new ArrayList<>();
+			for (AccessExecutionJobVertex vertex : graph.getAllVertices().values()) {
+				for (AccessExecutionVertex subtask : vertex.getTaskVertices()) {
+					final String jobId = graph.getJobID().toString();
+					final String vertexId = vertex.getJobVertexId().toString();
+					final String subtaskNum = String.valueOf(subtask.getParallelSubtaskIndex());
+					final String currentJson = createAttemptDetailsJson(subtask.getCurrentExecutionAttempt(), jobId, vertexId, null);
+					final String currentPath = SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH
+						.replace(":jobid", jobId)
+						.replace(":vertexid", vertexId)
+						.replace(":subtasknum", subtaskNum);
+					// Attempt-numbered path with only ":attempt" left to fill in per entry.
+					final String numberedPath = SUBTASK_ATTEMPT_DETAILS_REST_PATH
+						.replace(":jobid", jobId)
+						.replace(":vertexid", vertexId)
+						.replace(":subtasknum", subtaskNum);
+					final String currentNumber = String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber());
+					result.add(new ArchivedJson(currentPath, currentJson));
+					result.add(new ArchivedJson(numberedPath.replace(":attempt", currentNumber), currentJson));
+					for (int i = 0; i < subtask.getCurrentExecutionAttempt().getAttemptNumber(); i++) {
+						final AccessExecution prior = subtask.getPriorExecutionAttempt(i);
+						final String priorJson = createAttemptDetailsJson(prior, jobId, vertexId, null);
+						final String priorNumber = String.valueOf(prior.getAttemptNumber());
+						result.add(new ArchivedJson(numberedPath.replace(":attempt", priorNumber), priorJson));
+					}
+				}
+			}
+			return result;
+		}
+	}
+
 	public static String createAttemptDetailsJson(
 			AccessExecution execAttempt,
 			String jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 10a8773..6c3bc18 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -21,13 +21,19 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -51,6 +57,22 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
 		return createSubtasksAccumulatorsJson(jobVertex);
 	}
 
+	/** Archives one subtasks-accumulators JSON per job vertex under its vertex-specific REST path. */
+	public static class SubtasksAllAccumulatorsJsonArchivist implements JsonArchivist {
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			final List<ArchivedJson> result = new ArrayList<>();
+			for (AccessExecutionJobVertex vertex : graph.getAllVertices().values()) {
+				final String accumulatorsJson = createSubtasksAccumulatorsJson(vertex);
+				final String jobId = graph.getJobID().toString();
+				final String vertexId = vertex.getJobVertexId().toString();
+				final String archivePath = SUBTASKS_ALL_ACCUMULATORS_REST_PATH.replace(":jobid", jobId).replace(":vertexid", vertexId);
+				result.add(new ArchivedJson(archivePath, accumulatorsJson));
+			}
+			return result;
+		}
+	}
+
 	public static String createSubtasksAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index 08bd722..adefa80 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -21,13 +21,19 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -52,6 +58,22 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
 		return createSubtaskTimesJson(jobVertex);
 	}
 
+	/** Archives one subtask-times JSON per job vertex under its vertex-specific REST path. */
+	public static class SubtasksTimesJsonArchivist implements JsonArchivist {
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			final List<ArchivedJson> result = new ArrayList<>();
+			for (AccessExecutionJobVertex vertex : graph.getAllVertices().values()) {
+				final String timesJson = createSubtaskTimesJson(vertex);
+				final String jobId = graph.getJobID().toString();
+				final String vertexId = vertex.getJobVertexId().toString();
+				final String archivePath = SUBTASK_TIMES_REST_PATH.replace(":jobid", jobId).replace(":vertexid", vertexId);
+				result.add(new ArchivedJson(archivePath, timesJson));
+			}
+			return result;
+		}
+	}
+
 	public static String createSubtaskTimesJson(AccessExecutionJobVertex jobVertex) throws IOException {
 		final long now = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index 9976298..947b7c3 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -25,9 +25,13 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -51,6 +55,17 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
 		return createCheckpointConfigJson(graph);
 	}
 
+	/** Archives the checkpoint config JSON under {@code CHECKPOINT_CONFIG_REST_PATH} with ":jobid" resolved. */
+	public static class CheckpointConfigJsonArchivist implements JsonArchivist {
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			final String configJson = createCheckpointConfigJson(graph);
+			final String jobId = graph.getJobID().toString();
+			final String archivePath = CHECKPOINT_CONFIG_REST_PATH.replace(":jobid", jobId);
+			return Collections.singletonList(new ArchivedJson(archivePath, configJson));
+		}
+	}
+
 	private static String createCheckpointConfigJson(AccessExecutionGraph graph) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index 4bbb8f6..16fd9bd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
@@ -28,9 +29,15 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -79,6 +86,27 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
 		return createCheckpointDetailsJson(checkpoint);
 	}
 
+	/** Archives one details JSON per checkpoint in the stats history; yields nothing without a stats snapshot. */
+	public static class CheckpointStatsDetailsJsonArchivist implements JsonArchivist {
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			final CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+			if (snapshot == null) {
+				return Collections.emptyList();
+			}
+			final CheckpointStatsHistory checkpointHistory = snapshot.getHistory();
+			final List<ArchivedJson> result = new ArrayList<>();
+			for (AbstractCheckpointStats checkpointStats : checkpointHistory.getCheckpoints()) {
+				final String detailsJson = createCheckpointDetailsJson(checkpointStats);
+				final String checkpointId = String.valueOf(checkpointStats.getCheckpointId());
+				final String archivePath = CHECKPOINT_STATS_DETAILS_REST_PATH.replace(":jobid", graph.getJobID().toString())
+					.replace(":checkpointid", checkpointId);
+				result.add(new ArchivedJson(archivePath, detailsJson));
+			}
+			return result;
+		}
+	}
+
 	public static String createCheckpointDetailsJson(AbstractCheckpointStats checkpoint) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index b28ecef..bb39b2c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
@@ -31,9 +32,15 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.AbstractJobVertexRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler.writeMinMaxAvg;
@@ -104,6 +111,30 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
 		return createSubtaskCheckpointDetailsJson(checkpoint, taskStats);
 	}
 
+	public static class CheckpointStatsDetailsSubtasksJsonArchivist implements JsonArchivist {
+
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot();
+			if (stats == null) {
+				return Collections.emptyList();
+			}
+			CheckpointStatsHistory history = stats.getHistory();
+			List<ArchivedJson> archive = new ArrayList<>();
+			for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
+				for (TaskStateStats subtaskStats : checkpoint.getAllTaskStateStats()) {
+					String json = createSubtaskCheckpointDetailsJson(checkpoint, subtaskStats);
+					String path = CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH
+						.replace(":jobid", graph.getJobID().toString())
+						.replace(":checkpointid", String.valueOf(checkpoint.getCheckpointId()))
+						.replace(":vertexid", subtaskStats.getJobVertexId().toString());
+					archive.add(new ArchivedJson(path, json));
+				}
+			}
+			return archive;
+		}
+	}
+
 	private static String createSubtaskCheckpointDetailsJson(AbstractCheckpointStats checkpoint, TaskStateStats taskStats) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index 585ab26..f004888 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -32,10 +32,14 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -59,6 +63,17 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 		return createCheckpointStatsJson(graph);
 	}
 
+	public static class CheckpointStatsJsonArchivist implements JsonArchivist {
+
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			String json = createCheckpointStatsJson(graph);
+			String path = CHECKPOINT_STATS_REST_PATH
+				.replace(":jobid", graph.getJobID().toString());
+			return Collections.singletonList(new ArchivedJson(path, json));
+		}
+	}
+
 	private static String createCheckpointStatsJson(AccessExecutionGraph graph) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
index caf6d8e..097961e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
@@ -19,19 +19,47 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.io.StringWriter;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 public class CurrentJobsOverviewHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob);
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/joboverview", archive.getPath());
+
+		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(archive.getJson());
+		ArrayNode running = (ArrayNode) result.get("running");
+		Assert.assertEquals(0, running.size());
+
+		ArrayNode finished = (ArrayNode) result.get("finished");
+		Assert.assertEquals(1, finished.size());
+
+		compareJobOverview(expectedDetails, finished.get(0).toString());
+	}
+
 	@Test
 	public void testGetPaths() {
 		CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, true);
@@ -58,8 +86,10 @@ public class CurrentJobsOverviewHandlerTest {
 		try (JsonGenerator gen = ArchivedJobGenerationUtils.jacksonFactory.createGenerator(writer)) {
 			CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(expectedDetails, gen, 0);
 		}
-		String answer = writer.toString();
+		compareJobOverview(expectedDetails, writer.toString());
+	}
 
+	private static void compareJobOverview(JobDetails expectedDetails, String answer) throws IOException {
 		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(answer);
 
 		Assert.assertEquals(expectedDetails.getJobId().toString(), result.get("jid").asText());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
index 34748b7..f8ea792 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
@@ -20,11 +20,30 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+
 public class JobAccumulatorsHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new JobAccumulatorsHandler.JobAccumulatorsJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/accumulators", archive.getPath());
+		compareAccumulators(originalJob, archive.getJson());
+	}
+
 	@Test
 	public void testGetPaths() {
 		JobAccumulatorsHandler handler = new JobAccumulatorsHandler(null);
@@ -38,6 +57,10 @@ public class JobAccumulatorsHandlerTest {
 		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
 		String json = JobAccumulatorsHandler.createJobAccumulatorsJson(originalJob);
 
+		compareAccumulators(originalJob, json);
+	}
+
+	private static void compareAccumulators(AccessExecutionGraph originalJob, String json) throws IOException {
 		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
 
 		ArrayNode accs = (ArrayNode) result.get("job-accumulators");

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
index f304efe..f47b8ca 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
@@ -20,13 +20,31 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
 import java.util.Map;
 
 public class JobConfigHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new JobConfigHandler.JobConfigJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/config", archive.getPath());
+		compareJobConfig(originalJob, archive.getJson());
+	}
+
 	@Test
 	public void testGetPaths() {
 		JobConfigHandler handler = new JobConfigHandler(null);
@@ -38,7 +56,10 @@ public class JobConfigHandlerTest {
 	public void testJsonGeneration() throws Exception {
 		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
 		String answer = JobConfigHandler.createJobConfigJson(originalJob);
+		compareJobConfig(originalJob, answer);
+	}
 
+	private static void compareJobConfig(AccessExecutionGraph originalJob, String answer) throws IOException {
 		JsonNode job = ArchivedJobGenerationUtils.mapper.readTree(answer);
 
 		Assert.assertEquals(originalJob.getJobID().toString(), job.get("jid").asText());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
index 3f80d12..0c4fb7e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
@@ -26,13 +26,37 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
 public class JobDetailsHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new JobDetailsHandler.JobDetailsJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(2, archives.size());
+
+		Iterator<ArchivedJson> iterator = archives.iterator();
+		ArchivedJson archive1 = iterator.next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID(), archive1.getPath());
+		compareJobDetails(originalJob, archive1.getJson());
+
+		ArchivedJson archive2 = iterator.next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices", archive2.getPath());
+		compareJobDetails(originalJob, archive2.getJson());
+	}
+
 	@Test
 	public void testGetPaths() {
 		JobDetailsHandler handler = new JobDetailsHandler(null, null);
@@ -48,6 +72,10 @@ public class JobDetailsHandlerTest {
 		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
 		String json = JobDetailsHandler.createJobDetailsJson(originalJob, null);
 
+		compareJobDetails(originalJob, json);
+	}
+
+	private static void compareJobDetails(AccessExecutionGraph originalJob, String json) throws IOException {
 		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
 
 		Assert.assertEquals(originalJob.getJobID().toString(), result.get("jid").asText());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
index c86ce6a..c51053a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
@@ -22,12 +22,31 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+
 public class JobExceptionsHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new JobExceptionsHandler.JobExceptionsJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/exceptions", archive.getPath());
+		compareExceptions(originalJob, archive.getJson());
+	}
+
 	@Test
 	public void testGetPaths() {
 		JobExceptionsHandler handler = new JobExceptionsHandler(null);
@@ -41,6 +60,10 @@ public class JobExceptionsHandlerTest {
 		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
 		String json = JobExceptionsHandler.createJobExceptionsJson(originalJob);
 
+		compareExceptions(originalJob, json);
+	}
+
+	private static void compareExceptions(AccessExecutionGraph originalJob, String json) throws IOException {
 		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
 
 		Assert.assertEquals(originalJob.getFailureCauseAsString(), result.get("root-exception").asText());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
index 42808ed..2ef5bb9 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
@@ -17,10 +17,30 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collection;
+
 public class JobPlanHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new JobPlanHandler.JobPlanJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/plan", archive.getPath());
+		Assert.assertEquals(originalJob.getJsonPlan(), archive.getJson());
+	}
+
 	@Test
 	public void testGetPaths() {
 		JobPlanHandler handler = new JobPlanHandler(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
index 03c1896..8c88da8 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
@@ -20,12 +20,33 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+
 public class JobVertexAccumulatorsHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/accumulators", archive.getPath());
+		compareAccumulators(originalTask, archive.getJson());
+	}
+
 	@Test
 	public void testGetPaths() {
 		JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(null);
@@ -39,6 +60,10 @@ public class JobVertexAccumulatorsHandlerTest {
 		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
 		String json = JobVertexAccumulatorsHandler.createVertexAccumulatorsJson(originalTask);
 
+		compareAccumulators(originalTask, json);
+	}
+
+	private static void compareAccumulators(AccessExecutionJobVertex originalTask, String json) throws IOException {
 		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
 
 		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
index e909c8c..0fae8b5 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
@@ -20,14 +20,35 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+
 public class JobVertexDetailsHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new JobVertexDetailsHandler.JobVertexDetailsJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId(), archive.getPath());
+		compareVertexDetails(originalTask, archive.getJson());
+	}
+
 	@Test
 	public void testGetPaths() {
 		JobVertexDetailsHandler handler = new JobVertexDetailsHandler(null, null);
@@ -42,6 +63,10 @@ public class JobVertexDetailsHandlerTest {
 		String json = JobVertexDetailsHandler.createVertexDetailsJson(
 			originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
 
+		compareVertexDetails(originalTask, json);
+	}
+
+	private static void compareVertexDetails(AccessExecutionJobVertex originalTask, String json) throws IOException {
 		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
 
 		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
index 11e35e5..9271712 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
@@ -20,15 +20,37 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+
 public class JobVertexTaskManagersHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		AccessExecutionVertex originalSubtask = ArchivedJobGenerationUtils.getTestSubtask();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/taskmanagers", archive.getPath());
+		compareVertexTaskManagers(originalTask, originalSubtask, archive.getJson());
+	}
+
 	@Test
 	public void testGetPaths() {
 		JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(null, null);
@@ -44,6 +66,10 @@ public class JobVertexTaskManagersHandlerTest {
 		String json = JobVertexTaskManagersHandler.createVertexDetailsByTaskManagerJson(
 			originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
 
+		compareVertexTaskManagers(originalTask, originalSubtask, json);
+	}
+
+	private static void compareVertexTaskManagers(AccessExecutionJobVertex originalTask, AccessExecutionVertex originalSubtask, String json) throws IOException {
 		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
 
 		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index 8d24bd0..5993d5c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -20,11 +20,40 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+
 public class SubtaskExecutionAttemptAccumulatorsHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals(
+			"/jobs/" + originalJob.getJobID() +
+			"/vertices/" + originalTask.getJobVertexId() +
+			"/subtasks/" + originalAttempt.getParallelSubtaskIndex() +
+			"/attempts/" + originalAttempt.getAttemptNumber() + 
+			"/accumulators",
+			archive.getPath());
+		compareAttemptAccumulators(originalAttempt, archive.getJson());
+	}
+
 	@Test
 	public void testGetPaths() {
 		SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(null);
@@ -38,6 +67,10 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest {
 		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
 		String json = SubtaskExecutionAttemptAccumulatorsHandler.createAttemptAccumulatorsJson(originalAttempt);
 
+		compareAttemptAccumulators(originalAttempt, json);
+	}
+
+	private static void compareAttemptAccumulators(AccessExecution originalAttempt, String json) throws IOException {
 		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
 
 		Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
index 54f3f9c..f18858e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -22,11 +22,47 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
 public class SubtaskExecutionAttemptDetailsHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(2, archives.size());
+
+		Iterator<ArchivedJson> iterator = archives.iterator();
+		ArchivedJson archive1 = iterator.next();
+		Assert.assertEquals(
+			"/jobs/" + originalJob.getJobID() +
+				"/vertices/" + originalTask.getJobVertexId() +
+				"/subtasks/" + originalAttempt.getParallelSubtaskIndex(),
+			archive1.getPath());
+		compareAttemptDetails(originalAttempt, archive1.getJson());
+
+		ArchivedJson archive2 = iterator.next();
+		Assert.assertEquals(
+			"/jobs/" + originalJob.getJobID() +
+				"/vertices/" + originalTask.getJobVertexId() +
+				"/subtasks/" + originalAttempt.getParallelSubtaskIndex() +
+				"/attempts/" + originalAttempt.getAttemptNumber(),
+			archive2.getPath());
+		compareAttemptDetails(originalAttempt, archive2.getJson());
+	}
+
 	@Test
 	public void testGetPaths() {
 		SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(null, null);
@@ -43,6 +79,10 @@ public class SubtaskExecutionAttemptDetailsHandlerTest {
 		String json = SubtaskExecutionAttemptDetailsHandler.createAttemptDetailsJson(
 			originalAttempt, originalJob.getJobID().toString(), originalTask.getJobVertexId().toString(), null);
 
+		compareAttemptDetails(originalAttempt, json);
+	}
+	
+	private static void compareAttemptDetails(AccessExecution originalAttempt, String json) throws IOException {
 		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
 
 		Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
index 954ebad..dfbe618 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
@@ -19,13 +19,35 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+
 public class SubtasksAllAccumulatorsHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + 
+			"/subtasks/accumulators", archive.getPath());
+		compareSubtaskAccumulators(originalTask, archive.getJson());
+	}
+
 	@Test
 	public void testGetPaths() {
 		SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(null);
@@ -38,7 +60,10 @@ public class SubtasksAllAccumulatorsHandlerTest {
 	public void testJsonGeneration() throws Exception {
 		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
 		String json = SubtasksAllAccumulatorsHandler.createSubtasksAccumulatorsJson(originalTask);
+		compareSubtaskAccumulators(originalTask, json);
+	}
 
+	private static void compareSubtaskAccumulators(AccessExecutionJobVertex originalTask, String json) throws IOException {
 		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
 
 		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());