You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/19 22:44:12 UTC

[02/16] flink git commit: [FLINK-7531] Move Flink legacy rest handler to flink-runtime

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java
new file mode 100644
index 0000000..4be7840
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
+import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the CheckpointStatsHandler.
+ */
+public class CheckpointStatsHandlerTest {
+
+	/**
+	 * Archives the mocked execution graph with the checkpoint details archivist and expects
+	 * three archives: the in-progress checkpoint, the completed savepoint and the failed
+	 * checkpoint, in that order.
+	 */
+	@Test
+	public void testArchiver() throws IOException {
+		TestCheckpointStats stats = createTestCheckpointStats();
+		when(stats.graph.getJobID()).thenReturn(new JobID());
+
+		JsonArchivist archivist = new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist();
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(stats.graph);
+		assertEquals(3, archives.size());
+
+		// Every archive path shares this prefix; only the trailing checkpoint ID differs.
+		String pathPrefix = "/jobs/" + stats.graph.getJobID() + "/checkpoints/details/";
+		ObjectMapper jsonMapper = new ObjectMapper();
+		Iterator<ArchivedJson> archiveIterator = archives.iterator();
+
+		ArchivedJson inProgressArchive = archiveIterator.next();
+		assertEquals(pathPrefix + stats.inProgress.getCheckpointId(), inProgressArchive.getPath());
+		compareInProgressCheckpoint(stats.inProgress, jsonMapper.readTree(inProgressArchive.getJson()));
+
+		ArchivedJson savepointArchive = archiveIterator.next();
+		assertEquals(pathPrefix + stats.completedSavepoint.getCheckpointId(), savepointArchive.getPath());
+		compareCompletedSavepoint(stats.completedSavepoint, jsonMapper.readTree(savepointArchive.getJson()));
+
+		ArchivedJson failedArchive = archiveIterator.next();
+		assertEquals(pathPrefix + stats.failed.getCheckpointId(), failedArchive.getPath());
+		compareFailedCheckpoint(stats.failed, jsonMapper.readTree(failedArchive.getJson()));
+	}
+
+	/**
+	 * Verifies that the handler reports exactly one path and that it is the checkpoints path.
+	 */
+	@Test
+	public void testGetPaths() {
+		ExecutionGraphHolder graphHolder = mock(ExecutionGraphHolder.class);
+		String[] handlerPaths = new CheckpointStatsHandler(graphHolder, Executors.directExecutor()).getPaths();
+
+		assertEquals(1, handlerPaths.length);
+		assertEquals("/jobs/:jobid/checkpoints", handlerPaths[0]);
+	}
+
+	/**
+	 * Requests the checkpoint stats for a fully populated (mocked) snapshot and compares every
+	 * section of the returned JSON with the mocked values.
+	 */
+	@Test
+	public void testCheckpointStatsRequest() throws Exception {
+		TestCheckpointStats expectedStats = createTestCheckpointStats();
+		ExecutionGraphHolder graphHolder = mock(ExecutionGraphHolder.class);
+		CheckpointStatsHandler handler = new CheckpointStatsHandler(graphHolder, Executors.directExecutor());
+
+		// get() unwraps the JSON string from the result returned by the handler.
+		String responseJson = handler
+			.handleRequest(expectedStats.graph, Collections.<String, String>emptyMap())
+			.get();
+
+		JsonNode responseRoot = new ObjectMapper().readTree(responseJson);
+		compareCheckpointStats(expectedStats, responseRoot);
+	}
+
+	/**
+	 * Builds a mocked {@link AccessExecutionGraph} whose checkpoint stats snapshot is fully
+	 * populated: counts, summary statistics, the latest completed/savepoint/failed/restored
+	 * checkpoints and a history with one in-progress, one completed and one failed checkpoint.
+	 *
+	 * @return holder giving access to the graph and to every mock wired into it
+	 */
+	private static TestCheckpointStats createTestCheckpointStats() {
+		// Counts
+		CheckpointStatsCounts counts = mock(CheckpointStatsCounts.class);
+		when(counts.getNumberOfRestoredCheckpoints()).thenReturn(123123123L);
+		when(counts.getTotalNumberOfCheckpoints()).thenReturn(12981231203L);
+		when(counts.getNumberOfInProgressCheckpoints()).thenReturn(191919);
+		when(counts.getNumberOfCompletedCheckpoints()).thenReturn(882828200L);
+		when(counts.getNumberOfFailedCheckpoints()).thenReturn(99171510L);
+
+		// Summary
+		CompletedCheckpointStatsSummary summary = mock(CompletedCheckpointStatsSummary.class);
+
+		MinMaxAvgStats stateSizeSummary = mock(MinMaxAvgStats.class);
+		when(stateSizeSummary.getMinimum()).thenReturn(81238123L);
+		when(stateSizeSummary.getMaximum()).thenReturn(19919191999L);
+		when(stateSizeSummary.getAverage()).thenReturn(1133L);
+
+		MinMaxAvgStats durationSummary = mock(MinMaxAvgStats.class);
+		when(durationSummary.getMinimum()).thenReturn(1182L);
+		when(durationSummary.getMaximum()).thenReturn(88654L);
+		when(durationSummary.getAverage()).thenReturn(171L);
+
+		MinMaxAvgStats alignmentBufferedSummary = mock(MinMaxAvgStats.class);
+		when(alignmentBufferedSummary.getMinimum()).thenReturn(81818181899L);
+		when(alignmentBufferedSummary.getMaximum()).thenReturn(89999911118654L);
+		when(alignmentBufferedSummary.getAverage()).thenReturn(11203131L);
+
+		when(summary.getStateSizeStats()).thenReturn(stateSizeSummary);
+		when(summary.getEndToEndDurationStats()).thenReturn(durationSummary);
+		when(summary.getAlignmentBufferedStats()).thenReturn(alignmentBufferedSummary);
+
+		// Latest
+		CompletedCheckpointStats latestCompleted = mock(CompletedCheckpointStats.class);
+		when(latestCompleted.getCheckpointId()).thenReturn(1992139L);
+		when(latestCompleted.getTriggerTimestamp()).thenReturn(1919191900L);
+		when(latestCompleted.getLatestAckTimestamp()).thenReturn(1977791901L);
+		when(latestCompleted.getStateSize()).thenReturn(111939272822L);
+		when(latestCompleted.getEndToEndDuration()).thenReturn(121191L);
+		when(latestCompleted.getAlignmentBuffered()).thenReturn(1L);
+		when(latestCompleted.getExternalPath()).thenReturn("latest-completed-external-path");
+
+		CompletedCheckpointStats latestSavepoint = mock(CompletedCheckpointStats.class);
+		when(latestSavepoint.getCheckpointId()).thenReturn(1992140L);
+		when(latestSavepoint.getTriggerTimestamp()).thenReturn(1919191900L);
+		when(latestSavepoint.getLatestAckTimestamp()).thenReturn(1977791901L);
+		when(latestSavepoint.getStateSize()).thenReturn(111939272822L);
+		when(latestSavepoint.getEndToEndDuration()).thenReturn(121191L);
+		// Stub the savepoint mock itself so that its alignment value is a non-default one;
+		// stubbing latestCompleted here would silently overwrite the value configured above.
+		when(latestSavepoint.getAlignmentBuffered()).thenReturn(182813L);
+		when(latestSavepoint.getExternalPath()).thenReturn("savepoint-external-path");
+
+		FailedCheckpointStats latestFailed = mock(FailedCheckpointStats.class);
+		when(latestFailed.getCheckpointId()).thenReturn(1112L);
+		when(latestFailed.getTriggerTimestamp()).thenReturn(12828L);
+		when(latestFailed.getLatestAckTimestamp()).thenReturn(1901L);
+		when(latestFailed.getFailureTimestamp()).thenReturn(11999976L);
+		when(latestFailed.getStateSize()).thenReturn(111L);
+		when(latestFailed.getEndToEndDuration()).thenReturn(12L);
+		when(latestFailed.getAlignmentBuffered()).thenReturn(2L);
+		when(latestFailed.getFailureMessage()).thenReturn("expected cause");
+
+		RestoredCheckpointStats latestRestored = mock(RestoredCheckpointStats.class);
+		when(latestRestored.getCheckpointId()).thenReturn(1199L);
+		when(latestRestored.getRestoreTimestamp()).thenReturn(434242L);
+		when(latestRestored.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+		when(latestRestored.getExternalPath()).thenReturn("restored savepoint path");
+
+		// History
+		CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
+		List<AbstractCheckpointStats> checkpoints = new ArrayList<>();
+
+		PendingCheckpointStats inProgress = mock(PendingCheckpointStats.class);
+		when(inProgress.getCheckpointId()).thenReturn(1992141L);
+		when(inProgress.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
+		when(inProgress.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+		when(inProgress.getTriggerTimestamp()).thenReturn(1919191900L);
+		when(inProgress.getLatestAckTimestamp()).thenReturn(1977791901L);
+		when(inProgress.getStateSize()).thenReturn(111939272822L);
+		when(inProgress.getEndToEndDuration()).thenReturn(121191L);
+		when(inProgress.getAlignmentBuffered()).thenReturn(1L);
+		when(inProgress.getNumberOfSubtasks()).thenReturn(501);
+		when(inProgress.getNumberOfAcknowledgedSubtasks()).thenReturn(101);
+
+		CompletedCheckpointStats completedSavepoint = mock(CompletedCheckpointStats.class);
+		when(completedSavepoint.getCheckpointId()).thenReturn(1322139L);
+		when(completedSavepoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
+		when(completedSavepoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+		when(completedSavepoint.getTriggerTimestamp()).thenReturn(191900L);
+		when(completedSavepoint.getLatestAckTimestamp()).thenReturn(197791901L);
+		when(completedSavepoint.getStateSize()).thenReturn(1119822L);
+		when(completedSavepoint.getEndToEndDuration()).thenReturn(12191L);
+		when(completedSavepoint.getAlignmentBuffered()).thenReturn(111L);
+		when(completedSavepoint.getNumberOfSubtasks()).thenReturn(33501);
+		when(completedSavepoint.getNumberOfAcknowledgedSubtasks()).thenReturn(211);
+		when(completedSavepoint.isDiscarded()).thenReturn(true);
+		when(completedSavepoint.getExternalPath()).thenReturn("completed-external-path");
+
+		FailedCheckpointStats failed = mock(FailedCheckpointStats.class);
+		when(failed.getCheckpointId()).thenReturn(110719L);
+		when(failed.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
+		when(failed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+		when(failed.getTriggerTimestamp()).thenReturn(191900L);
+		when(failed.getLatestAckTimestamp()).thenReturn(197791901L);
+		when(failed.getStateSize()).thenReturn(1119822L);
+		when(failed.getEndToEndDuration()).thenReturn(12191L);
+		when(failed.getAlignmentBuffered()).thenReturn(111L);
+		when(failed.getNumberOfSubtasks()).thenReturn(33501);
+		when(failed.getNumberOfAcknowledgedSubtasks()).thenReturn(1);
+		when(failed.getFailureTimestamp()).thenReturn(119230L);
+		when(failed.getFailureMessage()).thenReturn("failure message");
+
+		// The order of the history entries matters: the tests iterate them in this order.
+		checkpoints.add(inProgress);
+		checkpoints.add(completedSavepoint);
+		checkpoints.add(failed);
+		when(history.getCheckpoints()).thenReturn(checkpoints);
+		when(history.getLatestCompletedCheckpoint()).thenReturn(latestCompleted);
+		when(history.getLatestSavepoint()).thenReturn(latestSavepoint);
+		when(history.getLatestFailedCheckpoint()).thenReturn(latestFailed);
+
+		CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
+		when(snapshot.getCounts()).thenReturn(counts);
+		when(snapshot.getSummaryStats()).thenReturn(summary);
+		when(snapshot.getHistory()).thenReturn(history);
+		when(snapshot.getLatestRestoredCheckpoint()).thenReturn(latestRestored);
+
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
+
+		return new TestCheckpointStats(
+			graph, counts, stateSizeSummary, durationSummary, alignmentBufferedSummary, summary,
+			latestCompleted, latestSavepoint, latestFailed, latestRestored, inProgress,
+			completedSavepoint, failed, history, snapshot
+		);
+	}
+
+	/**
+	 * Asserts that the JSON produced for a checkpoint stats request matches the mocked stats.
+	 *
+	 * <p>The "counts", "summary", "latest" and "history" sections are checked in that order. The
+	 * history must contain exactly the in-progress checkpoint, the completed savepoint and the
+	 * failed checkpoint, in that order.
+	 *
+	 * @param checkpointStats mocked stats the JSON was generated from
+	 * @param rootNode root of the parsed JSON
+	 */
+	private static void compareCheckpointStats(TestCheckpointStats checkpointStats, JsonNode rootNode) {
+		// Counts
+		CheckpointStatsCounts expectedCounts = checkpointStats.counts;
+		JsonNode countsJson = rootNode.get("counts");
+		assertEquals(expectedCounts.getNumberOfRestoredCheckpoints(), countsJson.get("restored").asLong());
+		assertEquals(expectedCounts.getTotalNumberOfCheckpoints(), countsJson.get("total").asLong());
+		assertEquals(expectedCounts.getNumberOfInProgressCheckpoints(), countsJson.get("in_progress").asLong());
+		assertEquals(expectedCounts.getNumberOfCompletedCheckpoints(), countsJson.get("completed").asLong());
+		assertEquals(expectedCounts.getNumberOfFailedCheckpoints(), countsJson.get("failed").asLong());
+
+		// Summary
+		JsonNode summaryJson = rootNode.get("summary");
+		compareMinMaxAvgStats(checkpointStats.stateSizeSummary, summaryJson.get("state_size"));
+		compareMinMaxAvgStats(checkpointStats.durationSummary, summaryJson.get("end_to_end_duration"));
+		compareMinMaxAvgStats(checkpointStats.alignmentBufferedSummary, summaryJson.get("alignment_buffered"));
+
+		// Latest
+		JsonNode latestJson = rootNode.get("latest");
+		compareLatestCompletedCheckpoint(checkpointStats.latestCompleted, latestJson.get("completed"));
+		compareLatestCompletedCheckpoint(checkpointStats.latestSavepoint, latestJson.get("savepoint"));
+
+		FailedCheckpointStats expectedFailed = checkpointStats.latestFailed;
+		JsonNode failedJson = latestJson.get("failed");
+		assertEquals(expectedFailed.getCheckpointId(), failedJson.get("id").asLong());
+		assertEquals(expectedFailed.getTriggerTimestamp(), failedJson.get("trigger_timestamp").asLong());
+		assertEquals(expectedFailed.getLatestAckTimestamp(), failedJson.get("latest_ack_timestamp").asLong());
+		assertEquals(expectedFailed.getStateSize(), failedJson.get("state_size").asLong());
+		assertEquals(expectedFailed.getEndToEndDuration(), failedJson.get("end_to_end_duration").asLong());
+		assertEquals(expectedFailed.getAlignmentBuffered(), failedJson.get("alignment_buffered").asLong());
+		assertEquals(expectedFailed.getFailureTimestamp(), failedJson.get("failure_timestamp").asLong());
+		assertEquals(expectedFailed.getFailureMessage(), failedJson.get("failure_message").asText());
+
+		RestoredCheckpointStats expectedRestored = checkpointStats.latestRestored;
+		JsonNode restoredJson = latestJson.get("restored");
+		assertEquals(expectedRestored.getCheckpointId(), restoredJson.get("id").asLong());
+		assertEquals(expectedRestored.getRestoreTimestamp(), restoredJson.get("restore_timestamp").asLong());
+		assertEquals(expectedRestored.getProperties().isSavepoint(), restoredJson.get("is_savepoint").asBoolean());
+		assertEquals(expectedRestored.getExternalPath(), restoredJson.get("external_path").asText());
+
+		// History
+		Iterator<JsonNode> historyIterator = rootNode.get("history").iterator();
+
+		assertTrue(historyIterator.hasNext());
+		compareInProgressCheckpoint(checkpointStats.inProgress, historyIterator.next());
+
+		assertTrue(historyIterator.hasNext());
+		compareCompletedSavepoint(checkpointStats.completedSavepoint, historyIterator.next());
+
+		assertTrue(historyIterator.hasNext());
+		compareFailedCheckpoint(checkpointStats.failed, historyIterator.next());
+
+		assertFalse(historyIterator.hasNext());
+	}
+
+	/** Asserts that the "min", "max" and "avg" fields of a summary node match the expected stats. */
+	private static void compareMinMaxAvgStats(MinMaxAvgStats expected, JsonNode statsJson) {
+		assertEquals(expected.getMinimum(), statsJson.get("min").asLong());
+		assertEquals(expected.getMaximum(), statsJson.get("max").asLong());
+		assertEquals(expected.getAverage(), statsJson.get("avg").asLong());
+	}
+
+	/** Asserts that an entry of the "latest" section matches the expected completed checkpoint stats. */
+	private static void compareLatestCompletedCheckpoint(CompletedCheckpointStats expected, JsonNode checkpointJson) {
+		assertEquals(expected.getCheckpointId(), checkpointJson.get("id").asLong());
+		assertEquals(expected.getTriggerTimestamp(), checkpointJson.get("trigger_timestamp").asLong());
+		assertEquals(expected.getLatestAckTimestamp(), checkpointJson.get("latest_ack_timestamp").asLong());
+		assertEquals(expected.getStateSize(), checkpointJson.get("state_size").asLong());
+		assertEquals(expected.getEndToEndDuration(), checkpointJson.get("end_to_end_duration").asLong());
+		assertEquals(expected.getAlignmentBuffered(), checkpointJson.get("alignment_buffered").asLong());
+		assertEquals(expected.getExternalPath(), checkpointJson.get("external_path").asText());
+	}
+
+	private static void compareInProgressCheckpoint(PendingCheckpointStats inProgress, JsonNode inProgressNode) {
+		assertEquals(inProgress.getCheckpointId(), inProgressNode.get("id").asLong());
+		assertEquals(inProgress.getStatus().toString(), inProgressNode.get("status").asText());
+		assertEquals(inProgress.getProperties().isSavepoint(), inProgressNode.get("is_savepoint").asBoolean());
+		assertEquals(inProgress.getTriggerTimestamp(), inProgressNode.get("trigger_timestamp").asLong());
+		assertEquals(inProgress.getLatestAckTimestamp(), inProgressNode.get("latest_ack_timestamp").asLong());
+		assertEquals(inProgress.getStateSize(), inProgressNode.get("state_size").asLong());
+		assertEquals(inProgress.getEndToEndDuration(), inProgressNode.get("end_to_end_duration").asLong());
+		assertEquals(inProgress.getAlignmentBuffered(), inProgressNode.get("alignment_buffered").asLong());
+		assertEquals(inProgress.getNumberOfSubtasks(), inProgressNode.get("num_subtasks").asInt());
+		assertEquals(inProgress.getNumberOfAcknowledgedSubtasks(), inProgressNode.get("num_acknowledged_subtasks").asInt());
+	}
+
+	private static void compareCompletedSavepoint(CompletedCheckpointStats completedSavepoint, JsonNode completedSavepointNode) {
+		assertEquals(completedSavepoint.getCheckpointId(), completedSavepointNode.get("id").asLong());
+		assertEquals(completedSavepoint.getStatus().toString(), completedSavepointNode.get("status").asText());
+		assertEquals(completedSavepoint.getProperties().isSavepoint(), completedSavepointNode.get("is_savepoint").asBoolean());
+		assertEquals(completedSavepoint.getTriggerTimestamp(), completedSavepointNode.get("trigger_timestamp").asLong());
+		assertEquals(completedSavepoint.getLatestAckTimestamp(), completedSavepointNode.get("latest_ack_timestamp").asLong());
+		assertEquals(completedSavepoint.getStateSize(), completedSavepointNode.get("state_size").asLong());
+		assertEquals(completedSavepoint.getEndToEndDuration(), completedSavepointNode.get("end_to_end_duration").asLong());
+		assertEquals(completedSavepoint.getAlignmentBuffered(), completedSavepointNode.get("alignment_buffered").asLong());
+		assertEquals(completedSavepoint.getNumberOfSubtasks(), completedSavepointNode.get("num_subtasks").asInt());
+		assertEquals(completedSavepoint.getNumberOfAcknowledgedSubtasks(), completedSavepointNode.get("num_acknowledged_subtasks").asInt());
+
+		assertEquals(completedSavepoint.getExternalPath(), completedSavepointNode.get("external_path").asText());
+		assertEquals(completedSavepoint.isDiscarded(), completedSavepointNode.get("discarded").asBoolean());
+	}
+
+	private static void compareFailedCheckpoint(FailedCheckpointStats failed, JsonNode failedNode) {
+		assertEquals(failed.getCheckpointId(), failedNode.get("id").asLong());
+		assertEquals(failed.getStatus().toString(), failedNode.get("status").asText());
+		assertEquals(failed.getProperties().isSavepoint(), failedNode.get("is_savepoint").asBoolean());
+		assertEquals(failed.getTriggerTimestamp(), failedNode.get("trigger_timestamp").asLong());
+		assertEquals(failed.getLatestAckTimestamp(), failedNode.get("latest_ack_timestamp").asLong());
+		assertEquals(failed.getStateSize(), failedNode.get("state_size").asLong());
+		assertEquals(failed.getEndToEndDuration(), failedNode.get("end_to_end_duration").asLong());
+		assertEquals(failed.getAlignmentBuffered(), failedNode.get("alignment_buffered").asLong());
+		assertEquals(failed.getNumberOfSubtasks(), failedNode.get("num_subtasks").asInt());
+		assertEquals(failed.getNumberOfAcknowledgedSubtasks(), failedNode.get("num_acknowledged_subtasks").asInt());
+
+		assertEquals(failed.getFailureTimestamp(), failedNode.get("failure_timestamp").asLong());
+		assertEquals(failed.getFailureMessage(), failedNode.get("failure_message").asText());
+	}
+
+	/**
+	 * Immutable holder for the objects created by {@code createTestCheckpointStats()}, so that
+	 * tests can pass the graph to the code under test and compare the output against the
+	 * individual stats objects.
+	 */
+	private static class TestCheckpointStats {
+		// Execution graph that returns the snapshot below.
+		public final AccessExecutionGraph graph;
+		// Checkpoint counts returned by the snapshot.
+		public final CheckpointStatsCounts counts;
+		// Min/max/avg statistics returned by the summary.
+		public final MinMaxAvgStats stateSizeSummary;
+		public final MinMaxAvgStats durationSummary;
+		public final MinMaxAvgStats alignmentBufferedSummary;
+		// Summary returned by the snapshot.
+		public final CompletedCheckpointStatsSummary summary;
+		// "Latest" checkpoints returned by the history (completed, savepoint, failed)
+		// and by the snapshot (restored).
+		public final CompletedCheckpointStats latestCompleted;
+		public final CompletedCheckpointStats latestSavepoint;
+		public final FailedCheckpointStats latestFailed;
+		public final RestoredCheckpointStats latestRestored;
+		// Entries of the checkpoint history, in the order they were added.
+		public final PendingCheckpointStats inProgress;
+		public final CompletedCheckpointStats completedSavepoint;
+		public final FailedCheckpointStats failed;
+		// History and the snapshot that returns it.
+		public final CheckpointStatsHistory history;
+		public final CheckpointStatsSnapshot snapshot;
+
+		/**
+		 * Stores the given objects as-is; no copies are made.
+		 */
+		public TestCheckpointStats(
+				AccessExecutionGraph graph,
+				CheckpointStatsCounts counts,
+				MinMaxAvgStats stateSizeSummary,
+				MinMaxAvgStats durationSummary,
+				MinMaxAvgStats alignmentBufferedSummary,
+				CompletedCheckpointStatsSummary summary,
+				CompletedCheckpointStats latestCompleted,
+				CompletedCheckpointStats latestSavepoint,
+				FailedCheckpointStats latestFailed,
+				RestoredCheckpointStats latestRestored,
+				PendingCheckpointStats inProgress,
+				CompletedCheckpointStats completedSavepoint,
+				FailedCheckpointStats failed,
+				CheckpointStatsHistory history,
+				CheckpointStatsSnapshot snapshot) {
+			this.graph = graph;
+			this.counts = counts;
+			this.stateSizeSummary = stateSizeSummary;
+			this.durationSummary = durationSummary;
+			this.alignmentBufferedSummary = alignmentBufferedSummary;
+			this.summary = summary;
+			this.latestCompleted = latestCompleted;
+			this.latestSavepoint = latestSavepoint;
+			this.latestFailed = latestFailed;
+			this.latestRestored = latestRestored;
+			this.inProgress = inProgress;
+			this.completedSavepoint = completedSavepoint;
+			this.failed = failed;
+			this.history = history;
+			this.snapshot = snapshot;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
new file mode 100644
index 0000000..4d9b394
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
+import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the CheckpointStatsSubtaskDetailsHandler.
+ */
+public class CheckpointStatsSubtaskDetailsHandlerTest {
+
+	/**
+	 * Archives a graph whose checkpoint history holds a single in-progress checkpoint with one
+	 * task and checks the path and the JSON of the one archive that is produced.
+	 */
+	@Test
+	public void testArchiver() throws Exception {
+		PendingCheckpointStats pending = mock(PendingCheckpointStats.class);
+		when(pending.getCheckpointId()).thenReturn(1992139L);
+		when(pending.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
+		when(pending.getTriggerTimestamp()).thenReturn(0L); // ack timestamp = duration
+
+		TaskStateStats taskStats = createTaskStateStats(1237);
+		when(pending.getAllTaskStateStats()).thenReturn(Collections.singletonList(taskStats));
+
+		CheckpointStatsHistory statsHistory = mock(CheckpointStatsHistory.class);
+		when(statsHistory.getCheckpoints()).thenReturn(Collections.<AbstractCheckpointStats>singletonList(pending));
+		CheckpointStatsSnapshot statsSnapshot = mock(CheckpointStatsSnapshot.class);
+		when(statsSnapshot.getHistory()).thenReturn(statsHistory);
+
+		AccessExecutionGraph executionGraph = mock(AccessExecutionGraph.class);
+		when(executionGraph.getCheckpointStatsSnapshot()).thenReturn(statsSnapshot);
+		when(executionGraph.getJobID()).thenReturn(new JobID());
+
+		JsonArchivist jsonArchivist = new CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist();
+		Collection<ArchivedJson> archivedJsons = jsonArchivist.archiveJsonWithPath(executionGraph);
+		assertEquals(1, archivedJsons.size());
+
+		ArchivedJson onlyArchive = archivedJsons.iterator().next();
+		String expectedPath = "/jobs/" + executionGraph.getJobID()
+			+ "/checkpoints/details/" + pending.getCheckpointId()
+			+ "/subtasks/" + taskStats.getJobVertexId();
+		assertEquals(expectedPath, onlyArchive.getPath());
+
+		JsonNode json = new ObjectMapper().readTree(onlyArchive.getJson());
+		assertEquals(pending.getCheckpointId(), json.get("id").asLong());
+		assertEquals(pending.getStatus().toString(), json.get("status").asText());
+
+		verifyTaskNode(json, taskStats, pending.getTriggerTimestamp());
+	}
+
+	/**
+	 * Checks that the handler exposes exactly one REST path.
+	 */
+	@Test
+	public void testGetPaths() {
+		CheckpointStatsDetailsSubtasksHandler subtasksHandler = new CheckpointStatsDetailsSubtasksHandler(
+			mock(ExecutionGraphHolder.class),
+			Executors.directExecutor(),
+			new CheckpointStatsCache(0));
+
+		// a one-element expected array checks both the length and the single entry
+		Assert.assertArrayEquals(
+			new String[] {"/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid"},
+			subtasksHandler.getPaths());
+	}
+
+	/**
+	 * Requests the subtask details of an in-progress checkpoint whose task reports acknowledged
+	 * subtasks and compares the returned JSON with the stubbed stats.
+	 */
+	@Test
+	public void testSubtaskRequest() throws Exception {
+		TaskStateStats taskStats = createTaskStateStats(1237);
+
+		PendingCheckpointStats pending = mock(PendingCheckpointStats.class);
+		when(pending.getCheckpointId()).thenReturn(1992139L);
+		when(pending.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
+		when(pending.getTriggerTimestamp()).thenReturn(0L); // ack timestamp = duration
+		when(pending.getTaskStateStats(any(JobVertexID.class))).thenReturn(taskStats);
+
+		JsonNode json = triggerRequest(pending);
+
+		assertEquals(pending.getCheckpointId(), json.get("id").asLong());
+		assertEquals(pending.getStatus().toString(), json.get("status").asText());
+		verifyTaskNode(json, taskStats, pending.getTriggerTimestamp());
+	}
+
+	/**
+	 * Requests the subtask details of a task that reports zero acknowledged subtasks and checks
+	 * that the response contains no "summary" field.
+	 */
+	@Test
+	public void testSubtaskRequestNoSummary() throws Exception {
+		TaskStateStats unacknowledgedTask = createTaskStateStats(0); // no acknowledged
+
+		PendingCheckpointStats pending = mock(PendingCheckpointStats.class);
+		when(pending.getCheckpointId()).thenReturn(1992139L);
+		when(pending.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
+		when(pending.getTriggerTimestamp()).thenReturn(0L); // ack timestamp = duration
+		when(pending.getTaskStateStats(any(JobVertexID.class))).thenReturn(unacknowledgedTask);
+
+		JsonNode json = triggerRequest(pending);
+		JsonNode summaryNode = json.get("summary");
+
+		assertNull(summaryNode);
+	}
+
+	/**
+	 * A "checkpointid" parameter that is not a number must produce an empty JSON object.
+	 */
+	@Test
+	public void testIllegalCheckpointId() throws Exception {
+		CheckpointStatsDetailsSubtasksHandler subtasksHandler = new CheckpointStatsDetailsSubtasksHandler(
+			mock(ExecutionGraphHolder.class),
+			Executors.directExecutor(),
+			new CheckpointStatsCache(0));
+		AccessExecutionGraph executionGraph = mock(AccessExecutionGraph.class);
+
+		Map<String, String> requestParams = new HashMap<>();
+		requestParams.put("checkpointid", "illegal checkpoint");
+
+		String response = subtasksHandler.handleRequest(executionGraph, requestParams).get();
+		assertEquals("{}", response);
+	}
+
+	/**
+	 * A request without any parameters, and thus without "checkpointid", must produce an empty
+	 * JSON object.
+	 */
+	@Test
+	public void testNoCheckpointIdParam() throws Exception {
+		CheckpointStatsDetailsSubtasksHandler subtasksHandler = new CheckpointStatsDetailsSubtasksHandler(
+			mock(ExecutionGraphHolder.class),
+			Executors.directExecutor(),
+			new CheckpointStatsCache(0));
+		AccessExecutionGraph executionGraph = mock(AccessExecutionGraph.class);
+		Map<String, String> noParams = Collections.<String, String>emptyMap();
+
+		String response = subtasksHandler.handleRequest(executionGraph, noParams).get();
+		assertEquals("{}", response);
+	}
+
+	/**
+	 * When the history has no checkpoint for the requested ID, the handler must answer with an
+	 * empty JSON object after exactly one lookup.
+	 */
+	@Test
+	public void testCheckpointNotFound() throws Exception {
+		CheckpointStatsHistory emptyHistory = mock(CheckpointStatsHistory.class);
+		when(emptyHistory.getCheckpointById(anyLong())).thenReturn(null); // not found
+
+		CheckpointStatsSnapshot statsSnapshot = mock(CheckpointStatsSnapshot.class);
+		when(statsSnapshot.getHistory()).thenReturn(emptyHistory);
+
+		AccessExecutionGraph executionGraph = mock(AccessExecutionGraph.class);
+		when(executionGraph.getCheckpointStatsSnapshot()).thenReturn(statsSnapshot);
+
+		Map<String, String> requestParams = new HashMap<>();
+		requestParams.put("checkpointid", "123");
+		requestParams.put("vertexid", new JobVertexID().toString());
+
+		CheckpointStatsDetailsSubtasksHandler subtasksHandler = new CheckpointStatsDetailsSubtasksHandler(
+			mock(ExecutionGraphHolder.class),
+			Executors.directExecutor(),
+			new CheckpointStatsCache(0));
+		String response = subtasksHandler.handleRequest(executionGraph, requestParams).get();
+
+		assertEquals("{}", response);
+		verify(emptyHistory, times(1)).getCheckpointById(anyLong());
+	}
+
+	/**
+	 * A "vertexid" parameter that is not a valid job vertex ID must produce an empty JSON object.
+	 */
+	@Test
+	public void testIllegalJobVertexIdParam() throws Exception {
+		CheckpointStatsDetailsSubtasksHandler subtasksHandler = new CheckpointStatsDetailsSubtasksHandler(
+			mock(ExecutionGraphHolder.class),
+			Executors.directExecutor(),
+			new CheckpointStatsCache(0));
+		AccessExecutionGraph executionGraph = mock(AccessExecutionGraph.class);
+
+		Map<String, String> requestParams = new HashMap<>();
+		requestParams.put("checkpointid", "1");
+		requestParams.put("vertexid", "illegal vertex id");
+
+		String response = subtasksHandler.handleRequest(executionGraph, requestParams).get();
+		assertEquals("{}", response);
+	}
+
+	/**
+	 * A request that carries "checkpointid" but no "vertexid" must produce an empty JSON object.
+	 */
+	@Test
+	public void testNoJobVertexIdParam() throws Exception {
+		CheckpointStatsDetailsSubtasksHandler subtasksHandler = new CheckpointStatsDetailsSubtasksHandler(
+			mock(ExecutionGraphHolder.class),
+			Executors.directExecutor(),
+			new CheckpointStatsCache(0));
+		AccessExecutionGraph executionGraph = mock(AccessExecutionGraph.class);
+
+		Map<String, String> requestParams = new HashMap<>();
+		requestParams.put("checkpointid", "1");
+
+		String response = subtasksHandler.handleRequest(executionGraph, requestParams).get();
+		assertEquals("{}", response);
+	}
+
+	/**
+	 * When the checkpoint has no stats for the requested job vertex, the handler must answer
+	 * with an empty JSON object after exactly one lookup on the checkpoint.
+	 */
+	@Test
+	public void testJobVertexNotFound() throws Exception {
+		PendingCheckpointStats pending = mock(PendingCheckpointStats.class);
+		when(pending.getTaskStateStats(any(JobVertexID.class))).thenReturn(null); // not found
+
+		CheckpointStatsHistory statsHistory = mock(CheckpointStatsHistory.class);
+		when(statsHistory.getCheckpointById(anyLong())).thenReturn(pending);
+
+		CheckpointStatsSnapshot statsSnapshot = mock(CheckpointStatsSnapshot.class);
+		when(statsSnapshot.getHistory()).thenReturn(statsHistory);
+
+		AccessExecutionGraph executionGraph = mock(AccessExecutionGraph.class);
+		when(executionGraph.getCheckpointStatsSnapshot()).thenReturn(statsSnapshot);
+
+		Map<String, String> requestParams = new HashMap<>();
+		requestParams.put("checkpointid", "123");
+		requestParams.put("vertexid", new JobVertexID().toString());
+
+		CheckpointStatsDetailsSubtasksHandler subtasksHandler = new CheckpointStatsDetailsSubtasksHandler(
+			mock(ExecutionGraphHolder.class),
+			Executors.directExecutor(),
+			new CheckpointStatsCache(0));
+		String response = subtasksHandler.handleRequest(executionGraph, requestParams).get();
+
+		assertEquals("{}", response);
+		verify(pending, times(1)).getTaskStateStats(any(JobVertexID.class));
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Sends a subtask details request to a fresh handler and parses the JSON response.
+	 *
+	 * <p>The mocked history returns the given checkpoint for every checkpoint ID; the request
+	 * uses checkpoint ID "123" and a newly generated job vertex ID.
+	 *
+	 * @param checkpoint checkpoint stats the mocked history hands to the handler
+	 * @return root node of the parsed response
+	 */
+	private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception {
+		CheckpointStatsHistory statsHistory = mock(CheckpointStatsHistory.class);
+		when(statsHistory.getCheckpointById(anyLong())).thenReturn(checkpoint);
+
+		CheckpointStatsSnapshot statsSnapshot = mock(CheckpointStatsSnapshot.class);
+		when(statsSnapshot.getHistory()).thenReturn(statsHistory);
+
+		AccessExecutionGraph executionGraph = mock(AccessExecutionGraph.class);
+		when(executionGraph.getCheckpointStatsSnapshot()).thenReturn(statsSnapshot);
+
+		Map<String, String> requestParams = new HashMap<>();
+		requestParams.put("checkpointid", "123");
+		requestParams.put("vertexid", new JobVertexID().toString());
+
+		CheckpointStatsDetailsSubtasksHandler subtasksHandler = new CheckpointStatsDetailsSubtasksHandler(
+			mock(ExecutionGraphHolder.class),
+			Executors.directExecutor(),
+			new CheckpointStatsCache(0));
+		String response = subtasksHandler.handleRequest(executionGraph, requestParams).get();
+
+		return new ObjectMapper().readTree(response);
+	}
+
+	/**
+	 * Builds a mocked {@link TaskStateStats} with random top-level values in [1, 1024], a mocked
+	 * summary and three subtask slots of which the last one is {@code null}.
+	 *
+	 * @param numAcknowledged value the mock reports as number of acknowledged subtasks
+	 * @return the stubbed task stats mock
+	 */
+	private static TaskStateStats createTaskStateStats(int numAcknowledged) {
+		ThreadLocalRandom random = ThreadLocalRandom.current();
+
+		TaskStateStats taskStats = mock(TaskStateStats.class);
+		when(taskStats.getJobVertexId()).thenReturn(new JobVertexID());
+		when(taskStats.getLatestAckTimestamp()).thenReturn(random.nextLong(1024) + 1);
+		when(taskStats.getStateSize()).thenReturn(random.nextLong(1024) + 1);
+		when(taskStats.getEndToEndDuration(anyLong())).thenReturn(random.nextLong(1024) + 1);
+		when(taskStats.getAlignmentBuffered()).thenReturn(random.nextLong(1024) + 1);
+		when(taskStats.getNumberOfSubtasks()).thenReturn(random.nextInt(1024) + 1);
+		when(taskStats.getNumberOfAcknowledgedSubtasks()).thenReturn(numAcknowledged);
+
+		// The nested stats mocks are created as doReturn(...) arguments, i.e. before the stubbing
+		// of the summary starts, so no stubbing is ever nested inside another one.
+		TaskStateStats.TaskStateStatsSummary summaryStats = mock(TaskStateStats.TaskStateStatsSummary.class);
+		doReturn(createMinMaxAvgStats(random)).when(summaryStats).getStateSizeStats();
+		doReturn(createMinMaxAvgStats(random)).when(summaryStats).getAckTimestampStats();
+		doReturn(createMinMaxAvgStats(random)).when(summaryStats).getAlignmentBufferedStats();
+		doReturn(createMinMaxAvgStats(random)).when(summaryStats).getAlignmentDurationStats();
+		doReturn(createMinMaxAvgStats(random)).when(summaryStats).getSyncCheckpointDurationStats();
+		doReturn(createMinMaxAvgStats(random)).when(summaryStats).getAsyncCheckpointDurationStats();
+		when(taskStats.getSummaryStats()).thenReturn(summaryStats);
+
+		// the last slot stays empty: a subtask without stats
+		SubtaskStateStats[] subtaskStats = {
+			createSubtaskStats(0, random),
+			createSubtaskStats(1, random),
+			null
+		};
+		when(taskStats.getSubtaskStats()).thenReturn(subtaskStats);
+
+		return taskStats;
+	}
+
+	/**
+	 * Checks the JSON written for a task against the stubbed {@link TaskStateStats}.
+	 *
+	 * @param taskNode JSON node holding the task fields, the "summary" object and the "subtasks" array
+	 * @param task stats mock the JSON is expected to mirror
+	 * @param triggerTimestamp trigger timestamp of the checkpoint; it is subtracted from the ack
+	 *                         timestamp stats to obtain the expected end-to-end duration summary
+	 */
+	private static void verifyTaskNode(JsonNode taskNode, TaskStateStats task, long triggerTimestamp) {
+		assertEquals(task.getLatestAckTimestamp(), taskNode.get("latest_ack_timestamp").asLong());
+		assertEquals(task.getStateSize(), taskNode.get("state_size").asLong());
+		// The task mocks built by createTaskStateStats answer getEndToEndDuration for any
+		// argument, so the trigger timestamp is passed instead of a random value.
+		assertEquals(task.getEndToEndDuration(triggerTimestamp), taskNode.get("end_to_end_duration").asLong());
+		assertEquals(task.getAlignmentBuffered(), taskNode.get("alignment_buffered").asLong());
+		assertEquals(task.getNumberOfSubtasks(), taskNode.get("num_subtasks").asInt());
+		assertEquals(task.getNumberOfAcknowledgedSubtasks(), taskNode.get("num_acknowledged_subtasks").asInt());
+
+		TaskStateStats.TaskStateStatsSummary summary = task.getSummaryStats();
+		verifyMinMaxAvgStats(summary.getStateSizeStats(), taskNode.get("summary").get("state_size"));
+		verifyMinMaxAvgStats(summary.getSyncCheckpointDurationStats(), taskNode.get("summary").get("checkpoint_duration").get("sync"));
+		verifyMinMaxAvgStats(summary.getAsyncCheckpointDurationStats(), taskNode.get("summary").get("checkpoint_duration").get("async"));
+		verifyMinMaxAvgStats(summary.getAlignmentBufferedStats(), taskNode.get("summary").get("alignment").get("buffered"));
+		verifyMinMaxAvgStats(summary.getAlignmentDurationStats(), taskNode.get("summary").get("alignment").get("duration"));
+
+		// end-to-end durations are expected as ack timestamp minus trigger timestamp
+		JsonNode endToEndDurationNode = taskNode.get("summary").get("end_to_end_duration");
+		assertEquals(summary.getAckTimestampStats().getMinimum() - triggerTimestamp, endToEndDurationNode.get("min").asLong());
+		assertEquals(summary.getAckTimestampStats().getMaximum() - triggerTimestamp, endToEndDurationNode.get("max").asLong());
+		assertEquals((long) summary.getAckTimestampStats().getAverage() - triggerTimestamp, endToEndDurationNode.get("avg").asLong());
+
+		// the JSON must hold one entry per subtask slot, in index order, and nothing more
+		SubtaskStateStats[] subtasks = task.getSubtaskStats();
+		Iterator<JsonNode> it = taskNode.get("subtasks").iterator();
+		for (int index = 0; index < subtasks.length; index++) {
+			assertTrue(it.hasNext());
+			verifySubtaskStats(it.next(), index, subtasks[index]);
+		}
+
+		assertFalse(it.hasNext());
+	}
+
+	/**
+	 * Builds a mocked {@link SubtaskStateStats} that reports the given index and a random value
+	 * in [0, 1024) for every other getter stubbed here.
+	 *
+	 * @param index subtask index the mock reports
+	 * @param rand source of the random values
+	 * @return the stubbed subtask stats mock
+	 */
+	private static SubtaskStateStats createSubtaskStats(int index, ThreadLocalRandom rand) {
+		SubtaskStateStats subtask = mock(SubtaskStateStats.class);
+		when(subtask.getSubtaskIndex()).thenReturn(index);
+		// each getter is stubbed exactly once; a second stub would only replace the first
+		when(subtask.getAckTimestamp()).thenReturn(rand.nextLong(1024));
+		when(subtask.getAlignmentBuffered()).thenReturn(rand.nextLong(1024));
+		when(subtask.getAlignmentDuration()).thenReturn(rand.nextLong(1024));
+		when(subtask.getSyncCheckpointDuration()).thenReturn(rand.nextLong(1024));
+		when(subtask.getAsyncCheckpointDuration()).thenReturn(rand.nextLong(1024));
+		when(subtask.getStateSize()).thenReturn(rand.nextLong(1024));
+		when(subtask.getEndToEndDuration(anyLong())).thenReturn(rand.nextLong(1024));
+		return subtask;
+	}
+
+	/**
+	 * Checks a single entry of the "subtasks" JSON array.
+	 *
+	 * @param subtaskNode JSON entry to check
+	 * @param index index expected in the entry when {@code subtask} is {@code null}
+	 * @param subtask stubbed stats of the subtask, or {@code null} for a subtask without stats
+	 */
+	private static void verifySubtaskStats(JsonNode subtaskNode, int index, SubtaskStateStats subtask) {
+		if (subtask == null) {
+			// without stats only the index and the status are compared
+			assertEquals(index, subtaskNode.get("index").asInt());
+			assertEquals("pending_or_failed", subtaskNode.get("status").asText());
+			return;
+		}
+
+		assertEquals(subtask.getSubtaskIndex(), subtaskNode.get("index").asInt());
+		assertEquals("completed", subtaskNode.get("status").asText());
+		assertEquals(subtask.getAckTimestamp(), subtaskNode.get("ack_timestamp").asLong());
+		assertEquals(subtask.getEndToEndDuration(0), subtaskNode.get("end_to_end_duration").asLong());
+		assertEquals(subtask.getStateSize(), subtaskNode.get("state_size").asLong());
+
+		JsonNode checkpointNode = subtaskNode.get("checkpoint");
+		assertEquals(subtask.getSyncCheckpointDuration(), checkpointNode.get("sync").asLong());
+		assertEquals(subtask.getAsyncCheckpointDuration(), subtaskNode.get("checkpoint").get("async").asLong());
+
+		JsonNode alignmentNode = subtaskNode.get("alignment");
+		assertEquals(subtask.getAlignmentBuffered(), alignmentNode.get("buffered").asLong());
+		assertEquals(subtask.getAlignmentDuration(), subtaskNode.get("alignment").get("duration").asLong());
+	}
+
+	/**
+	 * Builds a mocked {@link MinMaxAvgStats} whose minimum, maximum and average are independent
+	 * random values in [0, 1024).
+	 *
+	 * @param rand source of the random values
+	 * @return the stubbed stats mock
+	 */
+	private static MinMaxAvgStats createMinMaxAvgStats(ThreadLocalRandom rand) {
+		long minimum = rand.nextLong(1024);
+		long maximum = rand.nextLong(1024);
+		long average = rand.nextLong(1024);
+
+		MinMaxAvgStats stats = mock(MinMaxAvgStats.class);
+		when(stats.getMinimum()).thenReturn(minimum);
+		when(stats.getMaximum()).thenReturn(maximum);
+		when(stats.getAverage()).thenReturn(average);
+		return stats;
+	}
+
+	/**
+	 * Compares the "min", "max" and "avg" fields of a JSON node with the expected stats.
+	 *
+	 * @param expected stats holding the expected values
+	 * @param node JSON node to check
+	 */
+	private static void verifyMinMaxAvgStats(MinMaxAvgStats expected, JsonNode node) {
+		JsonNode minNode = node.get("min");
+		assertEquals(expected.getMinimum(), minNode.asLong());
+
+		JsonNode maxNode = node.get("max");
+		assertEquals(expected.getMaximum(), maxNode.asLong());
+
+		JsonNode avgNode = node.get("avg");
+		assertEquals(expected.getAverage(), avgNode.asLong());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/files/MimeTypesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/files/MimeTypesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/files/MimeTypesTest.java
new file mode 100644
index 0000000..8e5ea17
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/files/MimeTypesTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.files;
+
+import org.apache.flink.runtime.rest.handler.util.MimeTypes;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the MIME types map.
+ */
+public class MimeTypesTest {
+
+	/**
+	 * Checks that a MIME type is registered for every file extension listed in the test.
+	 */
+	@Test
+	public void testCompleteness() {
+		String[] extensions = {
+			"txt", "htm", "html", "css", "js", "json", "png", "jpg",
+			"jpeg", "gif", "woff", "woff2", "otf", "ttf", "eot"
+		};
+
+		try {
+			for (String extension : extensions) {
+				// name the extension so that a failure points at the missing mapping
+				assertNotNull(
+					"No MIME type for extension '" + extension + "'",
+					MimeTypes.getMimeTypeForExtension(extension));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Checks the lookup by file name: names of the form "name.txt" must resolve to a MIME type,
+	 * while names with nothing before or after the dot, or without a dot, must resolve to
+	 * {@code null}.
+	 */
+	@Test
+	public void testFileNameExtraction() {
+		String[] resolvableNames = {"test.txt", "t.txt", "first.second.third.txt"};
+		String[] unresolvableNames = {".txt", "txt", "test."};
+
+		try {
+			for (String fileName : resolvableNames) {
+				assertNotNull(
+					"No MIME type for file name '" + fileName + "'",
+					MimeTypes.getMimeTypeForFileName(fileName));
+			}
+
+			for (String fileName : unresolvableNames) {
+				assertNull(
+					"Unexpected MIME type for file name '" + fileName + "'",
+					MimeTypes.getMimeTypeForFileName(fileName));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandlerTest.java
new file mode 100644
index 0000000..7cbbdc1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandlerTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Tests for the AbstractMetricsHandler.
+ */
+public class AbstractMetricsHandlerTest extends TestLogger {
+	/**
+	 * Runs the three supported request shapes against a job vertex handler: listing the
+	 * available metrics, fetching one metric and fetching several metrics at once.
+	 */
+	@Test
+	public void testHandleRequest() throws Exception {
+		MetricFetcher metricFetcher = new MetricFetcher(
+			mock(GatewayRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
+		MetricStoreTest.setupStore(metricFetcher.getMetricStore());
+
+		JobVertexMetricsHandler vertexHandler = new JobVertexMetricsHandler(Executors.directExecutor(), metricFetcher);
+
+		Map<String, String> path = new HashMap<>();
+		path.put("jobid", "jobid");
+		path.put("vertexid", "taskid");
+		Map<String, String> query = new HashMap<>();
+
+		// no "get" parameter: the handler lists the available metrics
+		String expectedList = "[" +
+			"{\"id\":\"8.opname.abc.metric5\"}," +
+			"{\"id\":\"8.abc.metric4\"}" +
+			"]";
+		String actualList = vertexHandler.handleJsonRequest(path, query, null).get();
+		assertEquals(expectedList, actualList);
+
+		// "get" names a single metric
+		query.put("get", "8.opname.abc.metric5");
+		String expectedSingle = "[" +
+			"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}" +
+			"]";
+		String actualSingle = vertexHandler.handleJsonRequest(path, query, null).get();
+		assertEquals(expectedSingle, actualSingle);
+
+		// "get" names two metrics, separated by a comma
+		query.put("get", "8.opname.abc.metric5,8.abc.metric4");
+		String expectedMultiple = "[" +
+			"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}," +
+			"{\"id\":\"8.abc.metric4\",\"value\":\"3\"}" +
+			"]";
+		String actualMultiple = vertexHandler.handleJsonRequest(path, query, null).get();
+		assertEquals(expectedMultiple, actualMultiple);
+	}
+
+	/**
+	 * Verifies that a malformed request for available metrics does not throw an exception but
+	 * yields an empty response.
+	 */
+	@Test
+	public void testInvalidListDoesNotFail() {
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(GatewayRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
+		MetricStoreTest.setupStore(fetcher.getMetricStore());
+
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
+
+		Map<String, String> pathParams = new HashMap<>();
+		Map<String, String> queryParams = new HashMap<>();
+
+		//-----invalid variable: "nonexistent" is used as a job id the request cannot resolve
+		pathParams.put("jobid", "nonexistent");
+		pathParams.put("vertexid", "taskid");
+
+		try {
+			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
+		} catch (Exception e) {
+			// report what was thrown instead of failing without any message
+			fail("Listing metrics for an invalid job id must not throw, but got: " + e);
+		}
+	}
+
+	/**
+	 * Verifies that a malformed request for a metric value does not throw an exception.
+	 */
+	@Test
+	public void testInvalidGetDoesNotFail() {
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(GatewayRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
+		MetricStoreTest.setupStore(fetcher.getMetricStore());
+
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
+
+		Map<String, String> pathParams = new HashMap<>();
+		Map<String, String> queryParams = new HashMap<>();
+
+		pathParams.put("jobid", "jobid");
+		pathParams.put("vertexid", "taskid");
+
+		//-----empty string
+		queryParams.put("get", "");
+
+		try {
+			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
+		} catch (Exception e) {
+			fail(e.getMessage());
+		}
+
+		//-----invalid variable
+		pathParams.put("jobid", "nonexistent");
+		queryParams.put("get", "subindex.opname.abc.metric5");
+
+		try {
+			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
+		} catch (Exception e) {
+			fail(e.getMessage());
+		}
+
+		//-----invalid metric
+		pathParams.put("jobid", "nonexistant");
+		queryParams.put("get", "subindex.opname.abc.nonexistant");
+
+		try {
+			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
+		} catch (Exception e) {
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandlerTest.java
new file mode 100644
index 0000000..95ac271
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandlerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Tests for the JobManagerMetricsHandler.
+ */
+public class JobManagerMetricsHandlerTest extends TestLogger {
+	/**
+	 * Checks that the handler exposes exactly one REST path.
+	 */
+	@Test
+	public void testGetPaths() {
+		JobManagerMetricsHandler metricsHandler = new JobManagerMetricsHandler(
+			Executors.directExecutor(),
+			mock(MetricFetcher.class));
+
+		// a one-element expected array checks both the length and the single entry
+		Assert.assertArrayEquals(new String[] {"/jobmanager/metrics"}, metricsHandler.getPaths());
+	}
+
+	/**
+	 * Looks up the metric map on a store filled by {@code MetricStoreTest.setupStore} and checks
+	 * the value of "abc.metric1".
+	 */
+	@Test
+	public void getMapFor() {
+		MetricFetcher metricFetcher = new MetricFetcher(
+			mock(GatewayRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
+		MetricStore populatedStore = MetricStoreTest.setupStore(metricFetcher.getMetricStore());
+		JobManagerMetricsHandler metricsHandler = new JobManagerMetricsHandler(Executors.directExecutor(), metricFetcher);
+
+		// no path parameters are put into the map for this lookup
+		Map<String, String> noPathParams = new HashMap<>();
+		Map<String, String> jobManagerMetrics = metricsHandler.getMapFor(noPathParams, populatedStore);
+		String metric1 = jobManagerMetrics.get("abc.metric1");
+
+		assertEquals("0", metric1);
+	}
+
+	/**
+	 * Looks up the metric map on a store that this test never fills and checks that the result
+	 * is not {@code null}.
+	 */
+	@Test
+	public void getMapForNull() {
+		MetricFetcher metricFetcher = new MetricFetcher(
+			mock(GatewayRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
+		MetricStore untouchedStore = metricFetcher.getMetricStore();
+		JobManagerMetricsHandler metricsHandler = new JobManagerMetricsHandler(Executors.directExecutor(), metricFetcher);
+
+		Map<String, String> noPathParams = new HashMap<>();
+		Map<String, String> jobManagerMetrics = metricsHandler.getMapFor(noPathParams, untouchedStore);
+
+		assertNotNull(jobManagerMetrics);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandlerTest.java
new file mode 100644
index 0000000..4b28e65
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandlerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Tests for {@link JobMetricsHandler}: the REST path it registers and the job-scoped metric map it selects from a {@link MetricStore}.
+ */
+public class JobMetricsHandlerTest extends TestLogger {
+	@Test // the handler must register exactly one path
+	public void testGetPaths() {
+		JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobs/:jobid/metrics", paths[0]);
+	}
+
+	@Test // a job id that is present in the store must resolve to that job's metrics
+	public void getMapFor() throws Exception {
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(GatewayRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
+		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore()); // adds five fixture counters to the fetcher's own store
+
+		JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), fetcher);
+
+		Map<String, String> pathParams = new HashMap<>();
+		pathParams.put(PARAMETER_JOB_ID, "jobid"); // same job id that MetricStoreTest.setupStore uses
+
+		Map<String, String> metrics = handler.getMapFor(pathParams, store);
+
+		assertEquals("2", metrics.get("abc.metric3")); // setupStore adds "metric3" = 2 for job "jobid" under scope "abc"
+	}
+
+	@Test // no job id in the path parameters and a store that received no metrics: null is expected
+	public void getMapForNull() {
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(GatewayRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
+		MetricStore store = fetcher.getMetricStore(); // used as created by the fetcher; nothing is added to it
+
+		JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), fetcher);
+
+		Map<String, String> pathParams = new HashMap<>(); // deliberately contains no PARAMETER_JOB_ID entry
+
+		Map<String, String> metrics = handler.getMapFor(pathParams, store);
+
+		assertNull(metrics);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandlerTest.java
new file mode 100644
index 0000000..c1304c4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandlerTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
+import static org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Tests for {@link JobVertexMetricsHandler}: the REST path it registers and the vertex-scoped metric map it selects from a {@link MetricStore}.
+ */
+public class JobVertexMetricsHandlerTest extends TestLogger {
+	@Test // the handler must register exactly one path
+	public void testGetPaths() {
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/metrics", paths[0]);
+	}
+
+	@Test // a job id / vertex id pair that is present in the store must resolve to the task and operator metrics
+	public void getMapFor() throws Exception {
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(GatewayRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
+		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore()); // adds five fixture counters to the fetcher's own store
+
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
+
+		Map<String, String> pathParams = new HashMap<>();
+		pathParams.put(PARAMETER_JOB_ID, "jobid"); // same job id that MetricStoreTest.setupStore uses
+		pathParams.put(PARAMETER_VERTEX_ID, "taskid"); // same vertex (task) id that MetricStoreTest.setupStore uses
+
+		Map<String, String> metrics = handler.getMapFor(pathParams, store);
+
+		assertEquals("3", metrics.get("8.abc.metric4")); // task-scoped counter: setupStore adds "metric4" = 3 with index 8 and scope "abc"
+
+		assertEquals("4", metrics.get("8.opname.abc.metric5")); // operator-scoped counter: setupStore adds "metric5" = 4 for operator "opname"
+	}
+
+	@Test // no ids in the path parameters and a store that received no metrics: null is expected
+	public void getMapForNull() {
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(GatewayRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
+		MetricStore store = fetcher.getMetricStore(); // used as created by the fetcher; nothing is added to it
+
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
+
+		Map<String, String> pathParams = new HashMap<>(); // deliberately contains neither a job id nor a vertex id
+
+		Map<String, String> metrics = handler.getMapFor(pathParams, store);
+
+		assertNull(metrics);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
new file mode 100644
index 0000000..b278979
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for {@link MetricFetcher}: one {@code update()} call against mocked gateways and query services must fill the {@link MetricStore}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(MetricFetcher.class)
+public class MetricFetcherTest extends TestLogger {
+	@Test // wires one mocked TaskManager and one mocked JobManager, runs update(), then inspects the store
+	public void testUpdate() throws Exception {
+		final Time timeout = Time.seconds(10L);
+
+		// ========= setup TaskManager =================================================================================
+		JobID jobID = new JobID();
+		InstanceID tmID = new InstanceID();
+		ResourceID tmRID = new ResourceID(tmID.toString()); // resource id reuses the instance id's string form
+		TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class);
+		when(taskManagerGateway.getAddress()).thenReturn("/tm/address"); // NOTE(review): the "/tm/" prefix reappears in the query service path stubbed below; presumably MetricFetcher derives that path from this address (confirm)
+
+		Instance taskManager = mock(Instance.class);
+		when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
+		when(taskManager.getId()).thenReturn(tmID);
+		when(taskManager.getTaskManagerID()).thenReturn(tmRID);
+
+		// ========= setup JobManager ==================================================================================
+		JobDetails details = mock(JobDetails.class); // NOTE(review): this mock is never handed to any stub below; the job details reply is built from empty arrays
+		when(details.getJobId()).thenReturn(jobID);
+
+		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
+
+		when(jobManagerGateway.requestJobDetails(anyBoolean(), anyBoolean(), any(Time.class)))
+			.thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(new JobDetails[0], new JobDetails[0])));
+		when(jobManagerGateway.requestTaskManagerInstances(any(Time.class)))
+			.thenReturn(CompletableFuture.completedFuture(Collections.singleton(taskManager)));
+		when(jobManagerGateway.getAddress()).thenReturn("/jm/address"); // NOTE(review): the "/jm/" prefix reappears in the query service path stubbed below (confirm how it is derived)
+
+		GatewayRetriever<JobManagerGateway> retriever = mock(AkkaJobManagerRetriever.class);
+		when(retriever.getNow())
+			.thenReturn(Optional.of(jobManagerGateway));
+
+		// ========= setup QueryServices ================================================================================
+		MetricQueryServiceGateway jmQueryService = mock(MetricQueryServiceGateway.class);
+		MetricQueryServiceGateway tmQueryService = mock(MetricQueryServiceGateway.class);
+
+		MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = createRequestDumpAnswer(tmID, jobID);
+
+		when(jmQueryService.queryMetrics(any(Time.class)))
+			.thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0))); // JobManager service answers with a zero-length payload
+		when(tmQueryService.queryMetrics(any(Time.class)))
+			.thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer)); // TaskManager service answers with the dump built by createRequestDumpAnswer
+
+		MetricQueryServiceRetriever queryServiceRetriever = mock(MetricQueryServiceRetriever.class);
+		when(queryServiceRetriever.retrieveService(eq("/jm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME))).thenReturn(CompletableFuture.completedFuture(jmQueryService));
+		when(queryServiceRetriever.retrieveService(eq("/tm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString()))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
+
+		// ========= start MetricFetcher testing =======================================================================
+		MetricFetcher fetcher = new MetricFetcher(
+			retriever,
+			queryServiceRetriever,
+			Executors.directExecutor(),
+			timeout);
+
+		// verify that update fetches metrics and updates the store
+		fetcher.update();
+		MetricStore store = fetcher.getMetricStore();
+		synchronized (store) { // NOTE(review): presumably MetricStore access is guarded by the store's own monitor (confirm)
+			assertEquals("7", store.jobManager.metrics.get("abc.hist_min")); // "abc.hist_*": the JobManager-scoped TestingHistogram that is part of the TaskManager's dump; expected values presumably mirror TestingHistogram's fixed statistics (confirm)
+			assertEquals("6", store.jobManager.metrics.get("abc.hist_max"));
+			assertEquals("4.0", store.jobManager.metrics.get("abc.hist_mean"));
+			assertEquals("0.5", store.jobManager.metrics.get("abc.hist_median"));
+			assertEquals("5.0", store.jobManager.metrics.get("abc.hist_stddev"));
+			assertEquals("0.75", store.jobManager.metrics.get("abc.hist_p75"));
+			assertEquals("0.9", store.jobManager.metrics.get("abc.hist_p90"));
+			assertEquals("0.95", store.jobManager.metrics.get("abc.hist_p95"));
+			assertEquals("0.98", store.jobManager.metrics.get("abc.hist_p98"));
+			assertEquals("0.99", store.jobManager.metrics.get("abc.hist_p99"));
+			assertEquals("0.999", store.jobManager.metrics.get("abc.hist_p999"));
+
+			assertEquals("x", store.getTaskManagerMetricStore(tmID.toString()).metrics.get("abc.gauge")); // the gauge in the dump returns "x"
+			assertEquals("5.0", store.getJobMetricStore(jobID.toString()).metrics.get("abc.jc")); // the meter in the dump reports a rate of 5
+			assertEquals("2", store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.abc.tc")); // task counter c2 was incremented by 2
+			assertEquals("1", store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.opname.abc.oc")); // operator counter c1 was incremented by 1
+		}
+	}
+
+	private static MetricDumpSerialization.MetricSerializationResult createRequestDumpAnswer(InstanceID tmID, JobID jobID) { // serializes one operator counter, one task counter, one job meter, one TaskManager gauge and one JobManager histogram
+		Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
+		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
+		Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
+		Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
+
+		SimpleCounter c1 = new SimpleCounter();
+		SimpleCounter c2 = new SimpleCounter();
+
+		c1.inc(1);
+		c2.inc(2);
+
+		counters.put(c1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo(jobID.toString(), "taskid", 2, "opname", "abc"), "oc")); // operator-scoped counter "oc"
+		counters.put(c2, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo(jobID.toString(), "taskid", 2, "abc"), "tc")); // task-scoped counter "tc"
+		meters.put(new Meter() { // job-scoped meter "jc" with fixed rate 5 and fixed count 10; markEvent is a no-op
+			@Override
+			public void markEvent() {
+			}
+
+			@Override
+			public void markEvent(long n) {
+			}
+
+			@Override
+			public double getRate() {
+				return 5;
+			}
+
+			@Override
+			public long getCount() {
+				return 10;
+			}
+		}, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobQueryScopeInfo(jobID.toString(), "abc"), "jc"));
+		gauges.put(new Gauge<String>() { // TaskManager-scoped gauge "gauge" that always returns "x"
+			@Override
+			public String getValue() {
+				return "x";
+			}
+		}, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID.toString(), "abc"), "gauge"));
+		histograms.put(new TestingHistogram(), new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist")); // JobManager-scoped histogram "hist"
+
+		MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
+		MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
+		serializer.close(); // the serializer is closed once the result has been produced
+
+		return dump;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
new file mode 100644
index 0000000..2e83e08
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link MetricStore}; also provides {@link #setupStore(MetricStore)}, the shared fixture used by the metrics handler tests.
+ */
+public class MetricStoreTest extends TestLogger {
+	@Test // every counter added by setupStore must be readable from the sub-store of its scope
+	public void testAdd() throws IOException {
+		MetricStore store = setupStore(new MetricStore());
+
+		assertEquals("0", store.getJobManagerMetricStore().getMetric("abc.metric1", "-1")); // "-1" is the second getMetric argument, presumably the fallback for a missing metric (confirm)
+		assertEquals("1", store.getTaskManagerMetricStore("tmid").getMetric("abc.metric2", "-1"));
+		assertEquals("2", store.getJobMetricStore("jobid").getMetric("abc.metric3", "-1"));
+		assertEquals("3", store.getTaskMetricStore("jobid", "taskid").getMetric("8.abc.metric4", "-1"));
+		assertEquals("4", store.getTaskMetricStore("jobid", "taskid").getMetric("8.opname.abc.metric5", "-1"));
+	}
+
+	@Test // a null dump and a dump with empty scope and name must neither throw nor change the store
+	public void testMalformedNameHandling() {
+		MetricStore store = new MetricStore();
+		//-----verify that no exceptions are thrown
+
+		// null
+		store.add(null);
+		// empty name
+		QueryScopeInfo.JobManagerQueryScopeInfo info = new QueryScopeInfo.JobManagerQueryScopeInfo("");
+		MetricDump.CounterDump cd = new MetricDump.CounterDump(info, "", 0); // both the scope and the metric name are empty strings
+		store.add(cd);
+
+		//-----verify that no side effects occur
+		assertEquals(0, store.jobManager.metrics.size());
+		assertEquals(0, store.taskManagers.size());
+		assertEquals(0, store.jobs.size());
+	}
+
+	public static MetricStore setupStore(MetricStore store) { // adds one counter per scope type (values 0 to 4) and returns the same store instance
+		QueryScopeInfo.JobManagerQueryScopeInfo jm = new QueryScopeInfo.JobManagerQueryScopeInfo("abc");
+		MetricDump.CounterDump cd1 = new MetricDump.CounterDump(jm, "metric1", 0); // JobManager scope
+
+		QueryScopeInfo.TaskManagerQueryScopeInfo tm = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "abc");
+		MetricDump.CounterDump cd2 = new MetricDump.CounterDump(tm, "metric2", 1); // TaskManager "tmid"
+
+		QueryScopeInfo.JobQueryScopeInfo job = new QueryScopeInfo.JobQueryScopeInfo("jobid", "abc");
+		MetricDump.CounterDump cd3 = new MetricDump.CounterDump(job, "metric3", 2); // job "jobid"
+
+		QueryScopeInfo.TaskQueryScopeInfo task = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, "abc");
+		MetricDump.CounterDump cd4 = new MetricDump.CounterDump(task, "metric4", 3); // task "taskid" of job "jobid"; the 8 shows up as the key prefix asserted in testAdd
+
+		QueryScopeInfo.OperatorQueryScopeInfo operator = new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, "opname", "abc");
+		MetricDump.CounterDump cd5 = new MetricDump.CounterDump(operator, "metric5", 4); // operator "opname" inside the same task
+
+		store.add(cd1);
+		store.add(cd2);
+		store.add(cd3);
+		store.add(cd4);
+		store.add(cd5);
+
+		return store;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandlerTest.java
new file mode 100644
index 0000000..c6e8f07
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandlerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler.TASK_MANAGER_ID_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Tests for {@link TaskManagerMetricsHandler}: the REST path it registers and the TaskManager-scoped metric map it selects from a {@link MetricStore}.
+ */
+public class TaskManagerMetricsHandlerTest extends TestLogger {
+	@Test // the handler must register exactly one path
+	public void testGetPaths() {
+		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/taskmanagers/:taskmanagerid/metrics", paths[0]);
+	}
+
+	@Test // a TaskManager id that is present in the store must resolve to that TaskManager's metrics
+	public void getMapFor() throws Exception {
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(GatewayRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
+		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore()); // adds five fixture counters to the fetcher's own store
+
+		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), fetcher);
+
+		Map<String, String> pathParams = new HashMap<>();
+		pathParams.put(TASK_MANAGER_ID_KEY, "tmid"); // same TaskManager id that MetricStoreTest.setupStore uses
+
+		Map<String, String> metrics = handler.getMapFor(pathParams, store);
+
+		assertEquals("1", metrics.get("abc.metric2")); // setupStore adds "metric2" = 1 for TaskManager "tmid" under scope "abc"
+	}
+
+	@Test // no TaskManager id in the path parameters and a store that received no metrics: null is expected
+	public void getMapForNull() {
+		MetricFetcher fetcher = new MetricFetcher(
+			mock(GatewayRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
+		MetricStore store = fetcher.getMetricStore(); // used as created by the fetcher; nothing is added to it
+
+		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), fetcher);
+
+		Map<String, String> pathParams = new HashMap<>(); // deliberately contains no TASK_MANAGER_ID_KEY entry
+
+		Map<String, String> metrics = handler.getMapFor(pathParams, store);
+
+		assertNull(metrics);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java
new file mode 100644
index 0000000..ad5cd6b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.utils;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Test utility that builds an {@link ArchivedExecution}; object fields left unset are replaced by defaults in {@link #build()}.
+ */
+public class ArchivedExecutionBuilder {
+	private ExecutionAttemptID attemptId;
+	private long[] stateTimestamps;
+	private int attemptNumber; // primitive: passed to build() as-is, so it is 0 unless set
+	private ExecutionState state;
+	private String failureCause;
+	private TaskManagerLocation assignedResourceLocation;
+	private StringifiedAccumulatorResult[] userAccumulators;
+	private IOMetrics ioMetrics;
+	private int parallelSubtaskIndex; // primitive: passed to build() as-is, so it is 0 unless set
+
+	public ArchivedExecutionBuilder setAttemptId(ExecutionAttemptID attemptId) { // all setters return this builder so calls can be chained
+		this.attemptId = attemptId;
+		return this;
+	}
+
+	public ArchivedExecutionBuilder setStateTimestamps(long[] stateTimestamps) {
+		Preconditions.checkArgument(stateTimestamps.length == ExecutionState.values().length); // exactly one timestamp per ExecutionState value is required
+		this.stateTimestamps = stateTimestamps; // the array is stored by reference, not copied
+		return this;
+	}
+
+	public ArchivedExecutionBuilder setAttemptNumber(int attemptNumber) {
+		this.attemptNumber = attemptNumber;
+		return this;
+	}
+
+	public ArchivedExecutionBuilder setState(ExecutionState state) {
+		this.state = state;
+		return this;
+	}
+
+	public ArchivedExecutionBuilder setFailureCause(String failureCause) {
+		this.failureCause = failureCause;
+		return this;
+	}
+
+	public ArchivedExecutionBuilder setAssignedResourceLocation(TaskManagerLocation assignedResourceLocation) {
+		this.assignedResourceLocation = assignedResourceLocation;
+		return this;
+	}
+
+	public ArchivedExecutionBuilder setUserAccumulators(StringifiedAccumulatorResult[] userAccumulators) {
+		this.userAccumulators = userAccumulators;
+		return this;
+	}
+
+	public ArchivedExecutionBuilder setParallelSubtaskIndex(int parallelSubtaskIndex) {
+		this.parallelSubtaskIndex = parallelSubtaskIndex;
+		return this;
+	}
+
+	public ArchivedExecutionBuilder setIOMetrics(IOMetrics ioMetrics) {
+		this.ioMetrics = ioMetrics;
+		return this;
+	}
+
+	public ArchivedExecution build() throws UnknownHostException { // InetAddress.getLocalHost() in the default location can throw UnknownHostException
+		return new ArchivedExecution(
+			userAccumulators != null ? userAccumulators : new StringifiedAccumulatorResult[0], // default: no accumulators
+			ioMetrics != null ? ioMetrics : new TestIOMetrics(), // default: fixed fixture metrics
+			attemptId != null ? attemptId : new ExecutionAttemptID(), // default: freshly created id
+			attemptNumber,
+			state != null ? state : ExecutionState.FINISHED,
+			failureCause != null ? failureCause : "(null)", // default is the literal string "(null)", not a null reference
+			assignedResourceLocation != null ? assignedResourceLocation : new TaskManagerLocation(new ResourceID("tm"), InetAddress.getLocalHost(), 1234), // default: resource "tm" on the local host with 1234 as third argument
+			parallelSubtaskIndex,
+			stateTimestamps != null ? stateTimestamps : new long[]{1, 2, 3, 4, 5, 5, 5, 5} // NOTE(review): eight entries, while setStateTimestamps enforces ExecutionState.values().length; confirm both agree
+		);
+	}
+
+	private static class TestIOMetrics extends IOMetrics { // IOMetrics fixture built from five MeterViews whose counters report 1 through 5
+		private static final long serialVersionUID = -5920076211680012555L;
+
+		public TestIOMetrics() {
+			super(
+				new MeterView(new TestCounter(1), 0),
+				new MeterView(new TestCounter(2), 0),
+				new MeterView(new TestCounter(3), 0),
+				new MeterView(new TestCounter(4), 0),
+				new MeterView(new TestCounter(5), 0));
+		}
+	}
+
+	private static class TestCounter implements Counter { // Counter stub: inc/dec do nothing, getCount() always returns the constructor value
+		private final long count;
+
+		private TestCounter(long count) {
+			this.count = count;
+		}
+
+		@Override
+		public void inc() {
+		}
+
+		@Override
+		public void inc(long n) {
+		}
+
+		@Override
+		public void dec() {
+		}
+
+		@Override
+		public void dec(long n) {
+		}
+
+		@Override
+		public long getCount() {
+			return count;
+		}
+	}
+}