You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/19 22:44:19 UTC

[09/16] flink git commit: [FLINK-7531] Move Flink legacy rest handler to flink-runtime

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
deleted file mode 100644
index 9c5e168..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
-import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
-import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
-import org.apache.flink.runtime.checkpoint.TaskStateStats;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the CheckpointStatsDetailsSubtasksHandler.
- */
-public class CheckpointStatsSubtaskDetailsHandlerTest {
-
-	@Test
-	public void testArchiver() throws Exception {
-		JsonArchivist archivist = new CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist();
-		ObjectMapper mapper = new ObjectMapper();
-
-		PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class);
-		when(checkpoint.getCheckpointId()).thenReturn(1992139L);
-		when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
-		when(checkpoint.getTriggerTimestamp()).thenReturn(0L); // ack timestamp = duration
-
-		TaskStateStats task = createTaskStateStats(1237);
-		when(checkpoint.getAllTaskStateStats()).thenReturn(Collections.singletonList(task));
-
-		CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
-		when(history.getCheckpoints()).thenReturn(Collections.<AbstractCheckpointStats>singletonList(checkpoint));
-		CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
-		when(snapshot.getHistory()).thenReturn(history);
-
-		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-		when(graph.getJobID()).thenReturn(new JobID());
-
-		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph);
-		Assert.assertEquals(1, archives.size());
-
-		ArchivedJson archive = archives.iterator().next();
-		Assert.assertEquals(
-			"/jobs/" + graph.getJobID() + "/checkpoints/details/" + checkpoint.getCheckpointId() + "/subtasks/" + task.getJobVertexId(),
-			archive.getPath());
-		JsonNode rootNode = mapper.readTree(archive.getJson());
-		assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
-		assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
-
-		verifyTaskNode(rootNode, task, checkpoint.getTriggerTimestamp());
-	}
-
-	@Test
-	public void testGetPaths() {
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(1, paths.length);
-		Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid", paths[0]);
-	}
-
-	/**
-	 * Tests a subtask details request.
-	 */
-	@Test
-	public void testSubtaskRequest() throws Exception {
-		PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class);
-		when(checkpoint.getCheckpointId()).thenReturn(1992139L);
-		when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
-		when(checkpoint.getTriggerTimestamp()).thenReturn(0L); // ack timestamp = duration
-
-		TaskStateStats task = createTaskStateStats(1237);
-		when(checkpoint.getTaskStateStats(any(JobVertexID.class))).thenReturn(task);
-
-		JsonNode rootNode = triggerRequest(checkpoint);
-		assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
-		assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
-
-		verifyTaskNode(rootNode, task, checkpoint.getTriggerTimestamp());
-	}
-
-	/**
-	 * Tests a subtask details request for a task without acknowledged subtasks, whose response must not contain a summary.
-	 */
-	@Test
-	public void testSubtaskRequestNoSummary() throws Exception {
-		PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class);
-		when(checkpoint.getCheckpointId()).thenReturn(1992139L);
-		when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
-		when(checkpoint.getTriggerTimestamp()).thenReturn(0L); // ack timestamp = duration
-
-		TaskStateStats task = createTaskStateStats(0); // no acknowledged
-		when(checkpoint.getTaskStateStats(any(JobVertexID.class))).thenReturn(task);
-
-		JsonNode rootNode = triggerRequest(checkpoint);
-		assertNull(rootNode.get("summary"));
-	}
-
-	/**
-	 * Tests request with illegal checkpoint ID param.
-	 */
-	@Test
-	public void testIllegalCheckpointId() throws Exception {
-		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
-		Map<String, String> params = new HashMap<>();
-		params.put("checkpointid", "illegal checkpoint");
-		String json = handler.handleRequest(graph, params).get();
-
-		assertEquals("{}", json);
-	}
-
-	/**
-	 * Tests request with missing checkpoint ID param.
-	 */
-	@Test
-	public void testNoCheckpointIdParam() throws Exception {
-		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
-		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
-
-		assertEquals("{}", json);
-	}
-
-	/**
-	 * Tests the lookup of a non-existent checkpoint in the history.
-	 */
-	@Test
-	public void testCheckpointNotFound() throws Exception {
-		CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
-		when(history.getCheckpointById(anyLong())).thenReturn(null); // not found
-
-		CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
-		when(snapshot.getHistory()).thenReturn(history);
-
-		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
-		Map<String, String> params = new HashMap<>();
-		params.put("checkpointid", "123");
-		params.put("vertexid", new JobVertexID().toString());
-		String json = handler.handleRequest(graph, params).get();
-
-		assertEquals("{}", json);
-		verify(history, times(1)).getCheckpointById(anyLong());
-	}
-
-	/**
-	 * Tests request with illegal job vertex ID param.
-	 */
-	@Test
-	public void testIllegalJobVertexIdParam() throws Exception {
-		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
-		Map<String, String> params = new HashMap<>();
-		params.put("checkpointid", "1");
-		params.put("vertexid", "illegal vertex id");
-		String json = handler.handleRequest(graph, params).get();
-
-		assertEquals("{}", json);
-	}
-
-	/**
-	 * Tests request with missing job vertex ID param.
-	 */
-	@Test
-	public void testNoJobVertexIdParam() throws Exception {
-		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
-		Map<String, String> params = new HashMap<>();
-		params.put("checkpointid", "1");
-		String json = handler.handleRequest(graph, params).get();
-
-		assertEquals("{}", json);
-	}
-
-	/**
-	 * Tests the lookup of a non-existent job vertex ID in the checkpoint.
-	 */
-	@Test
-	public void testJobVertexNotFound() throws Exception {
-		PendingCheckpointStats inProgress = mock(PendingCheckpointStats.class);
-		when(inProgress.getTaskStateStats(any(JobVertexID.class))).thenReturn(null); // not found
-		CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
-		when(history.getCheckpointById(anyLong())).thenReturn(inProgress);
-
-		CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
-		when(snapshot.getHistory()).thenReturn(history);
-
-		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
-		Map<String, String> params = new HashMap<>();
-		params.put("checkpointid", "123");
-		params.put("vertexid", new JobVertexID().toString());
-		String json = handler.handleRequest(graph, params).get();
-
-		assertEquals("{}", json);
-		verify(inProgress, times(1)).getTaskStateStats(any(JobVertexID.class));
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception {
-		CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
-		when(history.getCheckpointById(anyLong())).thenReturn(checkpoint);
-		CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
-		when(snapshot.getHistory()).thenReturn(history);
-
-		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
-		Map<String, String> params = new HashMap<>();
-		params.put("checkpointid", "123");
-		params.put("vertexid", new JobVertexID().toString());
-		String json = handler.handleRequest(graph, params).get();
-
-		ObjectMapper mapper = new ObjectMapper();
-		return mapper.readTree(json);
-	}
-
-	private static TaskStateStats createTaskStateStats(int numAcknowledged) {
-		ThreadLocalRandom rand = ThreadLocalRandom.current();
-
-		TaskStateStats task = mock(TaskStateStats.class);
-		when(task.getJobVertexId()).thenReturn(new JobVertexID());
-		when(task.getLatestAckTimestamp()).thenReturn(rand.nextLong(1024) + 1);
-		when(task.getStateSize()).thenReturn(rand.nextLong(1024) + 1);
-		when(task.getEndToEndDuration(anyLong())).thenReturn(rand.nextLong(1024) + 1);
-		when(task.getAlignmentBuffered()).thenReturn(rand.nextLong(1024) + 1);
-		when(task.getNumberOfSubtasks()).thenReturn(rand.nextInt(1024) + 1);
-		when(task.getNumberOfAcknowledgedSubtasks()).thenReturn(numAcknowledged);
-
-		TaskStateStats.TaskStateStatsSummary summary = mock(TaskStateStats.TaskStateStatsSummary.class);
-
-		doReturn(createMinMaxAvgStats(rand)).when(summary).getStateSizeStats();
-		doReturn(createMinMaxAvgStats(rand)).when(summary).getAckTimestampStats();
-		doReturn(createMinMaxAvgStats(rand)).when(summary).getAlignmentBufferedStats();
-		doReturn(createMinMaxAvgStats(rand)).when(summary).getAlignmentDurationStats();
-		doReturn(createMinMaxAvgStats(rand)).when(summary).getSyncCheckpointDurationStats();
-		doReturn(createMinMaxAvgStats(rand)).when(summary).getAsyncCheckpointDurationStats();
-
-		when(task.getSummaryStats()).thenReturn(summary);
-
-		SubtaskStateStats[] subtasks = new SubtaskStateStats[3];
-		subtasks[0] = createSubtaskStats(0, rand);
-		subtasks[1] = createSubtaskStats(1, rand);
-		subtasks[2] = null;
-
-		when(task.getSubtaskStats()).thenReturn(subtasks);
-
-		return task;
-	}
-
-	private static void verifyTaskNode(JsonNode taskNode, TaskStateStats task, long triggerTimestamp) {
-		long duration = ThreadLocalRandom.current().nextInt(128);
-
-		assertEquals(task.getLatestAckTimestamp(), taskNode.get("latest_ack_timestamp").asLong());
-		assertEquals(task.getStateSize(), taskNode.get("state_size").asLong());
-		assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), taskNode.get("end_to_end_duration").asLong());
-		assertEquals(task.getAlignmentBuffered(), taskNode.get("alignment_buffered").asLong());
-		assertEquals(task.getNumberOfSubtasks(), taskNode.get("num_subtasks").asInt());
-		assertEquals(task.getNumberOfAcknowledgedSubtasks(), taskNode.get("num_acknowledged_subtasks").asInt());
-
-		TaskStateStats.TaskStateStatsSummary summary = task.getSummaryStats();
-		verifyMinMaxAvgStats(summary.getStateSizeStats(), taskNode.get("summary").get("state_size"));
-		verifyMinMaxAvgStats(summary.getSyncCheckpointDurationStats(), taskNode.get("summary").get("checkpoint_duration").get("sync"));
-		verifyMinMaxAvgStats(summary.getAsyncCheckpointDurationStats(), taskNode.get("summary").get("checkpoint_duration").get("async"));
-		verifyMinMaxAvgStats(summary.getAlignmentBufferedStats(), taskNode.get("summary").get("alignment").get("buffered"));
-		verifyMinMaxAvgStats(summary.getAlignmentDurationStats(), taskNode.get("summary").get("alignment").get("duration"));
-
-		JsonNode endToEndDurationNode = taskNode.get("summary").get("end_to_end_duration");
-		assertEquals(summary.getAckTimestampStats().getMinimum() - triggerTimestamp, endToEndDurationNode.get("min").asLong());
-		assertEquals(summary.getAckTimestampStats().getMaximum() - triggerTimestamp, endToEndDurationNode.get("max").asLong());
-		assertEquals((long) summary.getAckTimestampStats().getAverage() - triggerTimestamp, endToEndDurationNode.get("avg").asLong());
-
-		SubtaskStateStats[] subtasks = task.getSubtaskStats();
-		Iterator<JsonNode> it = taskNode.get("subtasks").iterator();
-
-		assertTrue(it.hasNext());
-		verifySubtaskStats(it.next(), 0, subtasks[0]);
-
-		assertTrue(it.hasNext());
-		verifySubtaskStats(it.next(), 1, subtasks[1]);
-
-		assertTrue(it.hasNext());
-		verifySubtaskStats(it.next(), 2, subtasks[2]);
-
-		assertFalse(it.hasNext());
-	}
-
-	private static SubtaskStateStats createSubtaskStats(int index, ThreadLocalRandom rand) {
-		SubtaskStateStats subtask = mock(SubtaskStateStats.class);
-		when(subtask.getSubtaskIndex()).thenReturn(index);
-		when(subtask.getAckTimestamp()).thenReturn(rand.nextLong(1024));
-		when(subtask.getAlignmentBuffered()).thenReturn(rand.nextLong(1024));
-		when(subtask.getAlignmentDuration()).thenReturn(rand.nextLong(1024));
-		when(subtask.getSyncCheckpointDuration()).thenReturn(rand.nextLong(1024));
-		when(subtask.getAsyncCheckpointDuration()).thenReturn(rand.nextLong(1024));
-		when(subtask.getAckTimestamp()).thenReturn(rand.nextLong(1024));
-		when(subtask.getStateSize()).thenReturn(rand.nextLong(1024));
-		when(subtask.getEndToEndDuration(anyLong())).thenReturn(rand.nextLong(1024));
-		return subtask;
-	}
-
-	private static void verifySubtaskStats(JsonNode subtaskNode, int index, SubtaskStateStats subtask) {
-		if (subtask == null) {
-			assertEquals(index, subtaskNode.get("index").asInt());
-			assertEquals("pending_or_failed", subtaskNode.get("status").asText());
-		} else {
-			assertEquals(subtask.getSubtaskIndex(), subtaskNode.get("index").asInt());
-			assertEquals("completed", subtaskNode.get("status").asText());
-			assertEquals(subtask.getAckTimestamp(), subtaskNode.get("ack_timestamp").asLong());
-			assertEquals(subtask.getEndToEndDuration(0), subtaskNode.get("end_to_end_duration").asLong());
-			assertEquals(subtask.getStateSize(), subtaskNode.get("state_size").asLong());
-			assertEquals(subtask.getSyncCheckpointDuration(), subtaskNode.get("checkpoint").get("sync").asLong());
-			assertEquals(subtask.getAsyncCheckpointDuration(), subtaskNode.get("checkpoint").get("async").asLong());
-			assertEquals(subtask.getAlignmentBuffered(), subtaskNode.get("alignment").get("buffered").asLong());
-			assertEquals(subtask.getAlignmentDuration(), subtaskNode.get("alignment").get("duration").asLong());
-		}
-	}
-
-	private static MinMaxAvgStats createMinMaxAvgStats(ThreadLocalRandom rand) {
-		MinMaxAvgStats mma = mock(MinMaxAvgStats.class);
-		when(mma.getMinimum()).thenReturn(rand.nextLong(1024));
-		when(mma.getMaximum()).thenReturn(rand.nextLong(1024));
-		when(mma.getAverage()).thenReturn(rand.nextLong(1024));
-
-		return mma;
-	}
-
-	private static void verifyMinMaxAvgStats(MinMaxAvgStats expected, JsonNode node) {
-		assertEquals(expected.getMinimum(), node.get("min").asLong());
-		assertEquals(expected.getMaximum(), node.get("max").asLong());
-		assertEquals(expected.getAverage(), node.get("avg").asLong());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
index 2e52f2e..03666a8 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.webmonitor.history;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 
 import org.junit.Assert;
 import org.junit.Rule;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 33d9c79..3c93be3 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.ArchiveMessages;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
deleted file mode 100644
index 0755888..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
-/**
- * Tests for the AbstractMetricsHandler.
- */
-public class AbstractMetricsHandlerTest extends TestLogger {
-	/**
-	 * Verifies that the handlers correctly handle expected REST calls.
-	 */
-	@Test
-	public void testHandleRequest() throws Exception {
-		MetricFetcher fetcher = new MetricFetcher(
-			mock(GatewayRetriever.class),
-			mock(MetricQueryServiceRetriever.class),
-			Executors.directExecutor(),
-			TestingUtils.TIMEOUT());
-		MetricStoreTest.setupStore(fetcher.getMetricStore());
-
-		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
-
-		Map<String, String> pathParams = new HashMap<>();
-		Map<String, String> queryParams = new HashMap<>();
-
-		pathParams.put("jobid", "jobid");
-		pathParams.put("vertexid", "taskid");
-
-		// get list of available metrics
-		String availableList = handler.handleJsonRequest(pathParams, queryParams, null).get();
-
-		assertEquals("[" +
-				"{\"id\":\"8.opname.abc.metric5\"}," +
-				"{\"id\":\"8.abc.metric4\"}" +
-				"]",
-			availableList);
-
-		// get value for a single metric
-		queryParams.put("get", "8.opname.abc.metric5");
-
-		String metricValue = handler.handleJsonRequest(pathParams, queryParams, null).get();
-
-		assertEquals("[" +
-				"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}" +
-				"]"
-			, metricValue
-		);
-
-		// get values for multiple metrics
-		queryParams.put("get", "8.opname.abc.metric5,8.abc.metric4");
-
-		String metricValues = handler.handleJsonRequest(pathParams, queryParams, null).get();
-
-		assertEquals("[" +
-				"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}," +
-				"{\"id\":\"8.abc.metric4\",\"value\":\"3\"}" +
-				"]",
-			metricValues
-		);
-	}
-
-	/**
-	 * Verifies that a malformed request for available metrics does not throw an exception.
-	 */
-	@Test
-	public void testInvalidListDoesNotFail() {
-		MetricFetcher fetcher = new MetricFetcher(
-			mock(GatewayRetriever.class),
-			mock(MetricQueryServiceRetriever.class),
-			Executors.directExecutor(),
-			TestingUtils.TIMEOUT());
-		MetricStoreTest.setupStore(fetcher.getMetricStore());
-
-		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
-
-		Map<String, String> pathParams = new HashMap<>();
-		Map<String, String> queryParams = new HashMap<>();
-
-		pathParams.put("jobid", "jobid");
-		pathParams.put("vertexid", "taskid");
-
-		//-----invalid variable
-		pathParams.put("jobid", "nonexistent");
-
-		try {
-			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
-		} catch (Exception e) {
-			fail();
-		}
-	}
-
-	/**
-	 * Verifies that a malformed request for a metric value does not throw an exception.
-	 */
-	@Test
-	public void testInvalidGetDoesNotFail() {
-		MetricFetcher fetcher = new MetricFetcher(
-			mock(GatewayRetriever.class),
-			mock(MetricQueryServiceRetriever.class),
-			Executors.directExecutor(),
-			TestingUtils.TIMEOUT());
-		MetricStoreTest.setupStore(fetcher.getMetricStore());
-
-		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
-
-		Map<String, String> pathParams = new HashMap<>();
-		Map<String, String> queryParams = new HashMap<>();
-
-		pathParams.put("jobid", "jobid");
-		pathParams.put("vertexid", "taskid");
-
-		//-----empty string
-		queryParams.put("get", "");
-
-		try {
-			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
-		} catch (Exception e) {
-			fail(e.getMessage());
-		}
-
-		//-----invalid variable
-		pathParams.put("jobid", "nonexistent");
-		queryParams.put("get", "subindex.opname.abc.metric5");
-
-		try {
-			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
-		} catch (Exception e) {
-			fail(e.getMessage());
-		}
-
-		//-----invalid metric
-		pathParams.put("jobid", "nonexistant");
-		queryParams.put("get", "subindex.opname.abc.nonexistant");
-
-		try {
-			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
-		} catch (Exception e) {
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
deleted file mode 100644
index 6d17b40..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
-/**
- * Tests for the JobManagerMetricsHandler.
- */
-public class JobManagerMetricsHandlerTest extends TestLogger {
-	@Test
-	public void testGetPaths() {
-		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(1, paths.length);
-		Assert.assertEquals("/jobmanager/metrics", paths[0]);
-	}
-
-	@Test
-	public void getMapFor() {
-		MetricFetcher fetcher = new MetricFetcher(
-			mock(GatewayRetriever.class),
-			mock(MetricQueryServiceRetriever.class),
-			Executors.directExecutor(),
-			TestingUtils.TIMEOUT());
-		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
-
-		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), fetcher);
-
-		Map<String, String> pathParams = new HashMap<>();
-
-		Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
-		assertEquals("0", metrics.get("abc.metric1"));
-	}
-
-	@Test
-	public void getMapForNull() {
-		MetricFetcher fetcher = new MetricFetcher(
-			mock(GatewayRetriever.class),
-			mock(MetricQueryServiceRetriever.class),
-			Executors.directExecutor(),
-			TestingUtils.TIMEOUT());
-		MetricStore store = fetcher.getMetricStore();
-
-		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), fetcher);
-
-		Map<String, String> pathParams = new HashMap<>();
-
-		Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
-		assertNotNull(metrics);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
deleted file mode 100644
index b26ceab..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
-/**
- * Tests for the JobMetricsHandler.
- */
-public class JobMetricsHandlerTest extends TestLogger {
-	@Test
-	public void testGetPaths() {
-		JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(1, paths.length);
-		Assert.assertEquals("/jobs/:jobid/metrics", paths[0]);
-	}
-
-	@Test
-	public void getMapFor() throws Exception {
-		MetricFetcher fetcher = new MetricFetcher(
-			mock(GatewayRetriever.class),
-			mock(MetricQueryServiceRetriever.class),
-			Executors.directExecutor(),
-			TestingUtils.TIMEOUT());
-		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
-
-		JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), fetcher);
-
-		Map<String, String> pathParams = new HashMap<>();
-		pathParams.put(PARAMETER_JOB_ID, "jobid");
-
-		Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
-		assertEquals("2", metrics.get("abc.metric3"));
-	}
-
-	@Test
-	public void getMapForNull() {
-		MetricFetcher fetcher = new MetricFetcher(
-			mock(GatewayRetriever.class),
-			mock(MetricQueryServiceRetriever.class),
-			Executors.directExecutor(),
-			TestingUtils.TIMEOUT());
-		MetricStore store = fetcher.getMetricStore();
-
-		JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), fetcher);
-
-		Map<String, String> pathParams = new HashMap<>();
-
-		Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
-		assertNull(metrics);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
deleted file mode 100644
index d637a4a..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
-import static org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
-/**
- * Tests for the JobVertexMetricsHandler.
- */
-public class JobVertexMetricsHandlerTest extends TestLogger {
-	@Test
-	public void testGetPaths() {
-		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(1, paths.length);
-		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/metrics", paths[0]);
-	}
-
-	@Test
-	public void getMapFor() throws Exception {
-		MetricFetcher fetcher = new MetricFetcher(
-			mock(GatewayRetriever.class),
-			mock(MetricQueryServiceRetriever.class),
-			Executors.directExecutor(),
-			TestingUtils.TIMEOUT());
-		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
-
-		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
-
-		Map<String, String> pathParams = new HashMap<>();
-		pathParams.put(PARAMETER_JOB_ID, "jobid");
-		pathParams.put(PARAMETER_VERTEX_ID, "taskid");
-
-		Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
-		assertEquals("3", metrics.get("8.abc.metric4"));
-
-		assertEquals("4", metrics.get("8.opname.abc.metric5"));
-	}
-
-	@Test
-	public void getMapForNull() {
-		MetricFetcher fetcher = new MetricFetcher(
-			mock(GatewayRetriever.class),
-			mock(MetricQueryServiceRetriever.class),
-			Executors.directExecutor(),
-			TestingUtils.TIMEOUT());
-		MetricStore store = fetcher.getMetricStore();
-
-		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
-
-		Map<String, String> pathParams = new HashMap<>();
-
-		Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
-		assertNull(metrics);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
deleted file mode 100644
index 09a0829..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
-import org.apache.flink.runtime.metrics.dump.MetricQueryService;
-import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
-import org.apache.flink.runtime.metrics.util.TestingHistogram;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.eq;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
-
-/**
- * Tests for the MetricFetcher.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(MetricFetcher.class)
-public class MetricFetcherTest extends TestLogger {
-	@Test
-	public void testUpdate() throws Exception {
-		final Time timeout = Time.seconds(10L);
-
-		// ========= setup TaskManager =================================================================================
-		JobID jobID = new JobID();
-		InstanceID tmID = new InstanceID();
-		ResourceID tmRID = new ResourceID(tmID.toString());
-		TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class);
-		when(taskManagerGateway.getAddress()).thenReturn("/tm/address");
-
-		Instance taskManager = mock(Instance.class);
-		when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
-		when(taskManager.getId()).thenReturn(tmID);
-		when(taskManager.getTaskManagerID()).thenReturn(tmRID);
-
-		// ========= setup JobManager ==================================================================================
-		JobDetails details = mock(JobDetails.class);
-		when(details.getJobId()).thenReturn(jobID);
-
-		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
-
-		when(jobManagerGateway.requestJobDetails(anyBoolean(), anyBoolean(), any(Time.class)))
-			.thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(new JobDetails[0], new JobDetails[0])));
-		when(jobManagerGateway.requestTaskManagerInstances(any(Time.class)))
-			.thenReturn(CompletableFuture.completedFuture(Collections.singleton(taskManager)));
-		when(jobManagerGateway.getAddress()).thenReturn("/jm/address");
-
-		GatewayRetriever<JobManagerGateway> retriever = mock(AkkaJobManagerRetriever.class);
-		when(retriever.getNow())
-			.thenReturn(Optional.of(jobManagerGateway));
-
-		// ========= setup QueryServices ================================================================================
-		MetricQueryServiceGateway jmQueryService = mock(MetricQueryServiceGateway.class);
-		MetricQueryServiceGateway tmQueryService = mock(MetricQueryServiceGateway.class);
-
-		MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = createRequestDumpAnswer(tmID, jobID);
-
-		when(jmQueryService.queryMetrics(any(Time.class)))
-			.thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));
-		when(tmQueryService.queryMetrics(any(Time.class)))
-			.thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer));
-
-		MetricQueryServiceRetriever queryServiceRetriever = mock(MetricQueryServiceRetriever.class);
-		when(queryServiceRetriever.retrieveService(eq("/jm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME))).thenReturn(CompletableFuture.completedFuture(jmQueryService));
-		when(queryServiceRetriever.retrieveService(eq("/tm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString()))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
-
-		// ========= start MetricFetcher testing =======================================================================
-		MetricFetcher fetcher = new MetricFetcher(
-			retriever,
-			queryServiceRetriever,
-			Executors.directExecutor(),
-			timeout);
-
-		// verify that update fetches metrics and updates the store
-		fetcher.update();
-		MetricStore store = fetcher.getMetricStore();
-		synchronized (store) {
-			assertEquals("7", store.jobManager.metrics.get("abc.hist_min"));
-			assertEquals("6", store.jobManager.metrics.get("abc.hist_max"));
-			assertEquals("4.0", store.jobManager.metrics.get("abc.hist_mean"));
-			assertEquals("0.5", store.jobManager.metrics.get("abc.hist_median"));
-			assertEquals("5.0", store.jobManager.metrics.get("abc.hist_stddev"));
-			assertEquals("0.75", store.jobManager.metrics.get("abc.hist_p75"));
-			assertEquals("0.9", store.jobManager.metrics.get("abc.hist_p90"));
-			assertEquals("0.95", store.jobManager.metrics.get("abc.hist_p95"));
-			assertEquals("0.98", store.jobManager.metrics.get("abc.hist_p98"));
-			assertEquals("0.99", store.jobManager.metrics.get("abc.hist_p99"));
-			assertEquals("0.999", store.jobManager.metrics.get("abc.hist_p999"));
-
-			assertEquals("x", store.getTaskManagerMetricStore(tmID.toString()).metrics.get("abc.gauge"));
-			assertEquals("5.0", store.getJobMetricStore(jobID.toString()).metrics.get("abc.jc"));
-			assertEquals("2", store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.abc.tc"));
-			assertEquals("1", store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.opname.abc.oc"));
-		}
-	}
-
-	private static MetricDumpSerialization.MetricSerializationResult createRequestDumpAnswer(InstanceID tmID, JobID jobID) {
-		Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
-		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
-		Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
-		Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
-
-		SimpleCounter c1 = new SimpleCounter();
-		SimpleCounter c2 = new SimpleCounter();
-
-		c1.inc(1);
-		c2.inc(2);
-
-		counters.put(c1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo(jobID.toString(), "taskid", 2, "opname", "abc"), "oc"));
-		counters.put(c2, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo(jobID.toString(), "taskid", 2, "abc"), "tc"));
-		meters.put(new Meter() {
-			@Override
-			public void markEvent() {
-			}
-
-			@Override
-			public void markEvent(long n) {
-			}
-
-			@Override
-			public double getRate() {
-				return 5;
-			}
-
-			@Override
-			public long getCount() {
-				return 10;
-			}
-		}, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobQueryScopeInfo(jobID.toString(), "abc"), "jc"));
-		gauges.put(new Gauge<String>() {
-			@Override
-			public String getValue() {
-				return "x";
-			}
-		}, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID.toString(), "abc"), "gauge"));
-		histograms.put(new TestingHistogram(), new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist"));
-
-		MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
-		MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
-		serializer.close();
-
-		return dump;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
deleted file mode 100644
index d19e8c6..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.metrics.dump.MetricDump;
-import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for the MetricStore.
- */
-public class MetricStoreTest extends TestLogger {
-	@Test
-	public void testAdd() throws IOException {
-		MetricStore store = setupStore(new MetricStore());
-
-		assertEquals("0", store.getJobManagerMetricStore().getMetric("abc.metric1", "-1"));
-		assertEquals("1", store.getTaskManagerMetricStore("tmid").getMetric("abc.metric2", "-1"));
-		assertEquals("2", store.getJobMetricStore("jobid").getMetric("abc.metric3", "-1"));
-		assertEquals("3", store.getTaskMetricStore("jobid", "taskid").getMetric("8.abc.metric4", "-1"));
-		assertEquals("4", store.getTaskMetricStore("jobid", "taskid").getMetric("8.opname.abc.metric5", "-1"));
-	}
-
-	@Test
-	public void testMalformedNameHandling() {
-		MetricStore store = new MetricStore();
-		//-----verify that no exceptions are thrown
-
-		// null
-		store.add(null);
-		// empty name
-		QueryScopeInfo.JobManagerQueryScopeInfo info = new QueryScopeInfo.JobManagerQueryScopeInfo("");
-		MetricDump.CounterDump cd = new MetricDump.CounterDump(info, "", 0);
-		store.add(cd);
-
-		//-----verify that no side effects occur
-		assertEquals(0, store.jobManager.metrics.size());
-		assertEquals(0, store.taskManagers.size());
-		assertEquals(0, store.jobs.size());
-	}
-
-	public static MetricStore setupStore(MetricStore store) {
-		QueryScopeInfo.JobManagerQueryScopeInfo jm = new QueryScopeInfo.JobManagerQueryScopeInfo("abc");
-		MetricDump.CounterDump cd1 = new MetricDump.CounterDump(jm, "metric1", 0);
-
-		QueryScopeInfo.TaskManagerQueryScopeInfo tm = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "abc");
-		MetricDump.CounterDump cd2 = new MetricDump.CounterDump(tm, "metric2", 1);
-
-		QueryScopeInfo.JobQueryScopeInfo job = new QueryScopeInfo.JobQueryScopeInfo("jobid", "abc");
-		MetricDump.CounterDump cd3 = new MetricDump.CounterDump(job, "metric3", 2);
-
-		QueryScopeInfo.TaskQueryScopeInfo task = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, "abc");
-		MetricDump.CounterDump cd4 = new MetricDump.CounterDump(task, "metric4", 3);
-
-		QueryScopeInfo.OperatorQueryScopeInfo operator = new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, "opname", "abc");
-		MetricDump.CounterDump cd5 = new MetricDump.CounterDump(operator, "metric5", 4);
-
-		store.add(cd1);
-		store.add(cd2);
-		store.add(cd3);
-		store.add(cd4);
-		store.add(cd5);
-
-		return store;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
deleted file mode 100644
index 9c5549e..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler.TASK_MANAGER_ID_KEY;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
-/**
- * Tests for the TaskManagerMetricsHandler.
- */
-public class TaskManagerMetricsHandlerTest extends TestLogger {
-	@Test
-	public void testGetPaths() {
-		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(1, paths.length);
-		Assert.assertEquals("/taskmanagers/:taskmanagerid/metrics", paths[0]);
-	}
-
-	@Test
-	public void getMapFor() throws Exception {
-		MetricFetcher fetcher = new MetricFetcher(
-			mock(GatewayRetriever.class),
-			mock(MetricQueryServiceRetriever.class),
-			Executors.directExecutor(),
-			TestingUtils.TIMEOUT());
-		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
-
-		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), fetcher);
-
-		Map<String, String> pathParams = new HashMap<>();
-		pathParams.put(TASK_MANAGER_ID_KEY, "tmid");
-
-		Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
-		assertEquals("1", metrics.get("abc.metric2"));
-	}
-
-	@Test
-	public void getMapForNull() {
-		MetricFetcher fetcher = new MetricFetcher(
-			mock(GatewayRetriever.class),
-			mock(MetricQueryServiceRetriever.class),
-			Executors.directExecutor(),
-			TestingUtils.TIMEOUT());
-		MetricStore store = fetcher.getMetricStore();
-
-		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), fetcher);
-
-		Map<String, String> pathParams = new HashMap<>();
-
-		Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
-		assertNull(metrics);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java
deleted file mode 100644
index 979d943..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.Preconditions;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-/**
- * Utility class for constructing an ArchivedExecution.
- */
-public class ArchivedExecutionBuilder {
-	private ExecutionAttemptID attemptId;
-	private long[] stateTimestamps;
-	private int attemptNumber;
-	private ExecutionState state;
-	private String failureCause;
-	private TaskManagerLocation assignedResourceLocation;
-	private StringifiedAccumulatorResult[] userAccumulators;
-	private IOMetrics ioMetrics;
-	private int parallelSubtaskIndex;
-
-	public ArchivedExecutionBuilder setAttemptId(ExecutionAttemptID attemptId) {
-		this.attemptId = attemptId;
-		return this;
-	}
-
-	public ArchivedExecutionBuilder setStateTimestamps(long[] stateTimestamps) {
-		Preconditions.checkArgument(stateTimestamps.length == ExecutionState.values().length);
-		this.stateTimestamps = stateTimestamps;
-		return this;
-	}
-
-	public ArchivedExecutionBuilder setAttemptNumber(int attemptNumber) {
-		this.attemptNumber = attemptNumber;
-		return this;
-	}
-
-	public ArchivedExecutionBuilder setState(ExecutionState state) {
-		this.state = state;
-		return this;
-	}
-
-	public ArchivedExecutionBuilder setFailureCause(String failureCause) {
-		this.failureCause = failureCause;
-		return this;
-	}
-
-	public ArchivedExecutionBuilder setAssignedResourceLocation(TaskManagerLocation assignedResourceLocation) {
-		this.assignedResourceLocation = assignedResourceLocation;
-		return this;
-	}
-
-	public ArchivedExecutionBuilder setUserAccumulators(StringifiedAccumulatorResult[] userAccumulators) {
-		this.userAccumulators = userAccumulators;
-		return this;
-	}
-
-	public ArchivedExecutionBuilder setParallelSubtaskIndex(int parallelSubtaskIndex) {
-		this.parallelSubtaskIndex = parallelSubtaskIndex;
-		return this;
-	}
-
-	public ArchivedExecutionBuilder setIOMetrics(IOMetrics ioMetrics) {
-		this.ioMetrics = ioMetrics;
-		return this;
-	}
-
-	public ArchivedExecution build() throws UnknownHostException {
-		return new ArchivedExecution(
-			userAccumulators != null ? userAccumulators : new StringifiedAccumulatorResult[0],
-			ioMetrics != null ? ioMetrics : new TestIOMetrics(),
-			attemptId != null ? attemptId : new ExecutionAttemptID(),
-			attemptNumber,
-			state != null ? state : ExecutionState.FINISHED,
-			failureCause != null ? failureCause : "(null)",
-			assignedResourceLocation != null ? assignedResourceLocation : new TaskManagerLocation(new ResourceID("tm"), InetAddress.getLocalHost(), 1234),
-			parallelSubtaskIndex,
-			stateTimestamps != null ? stateTimestamps : new long[]{1, 2, 3, 4, 5, 5, 5, 5}
-		);
-	}
-
-	private static class TestIOMetrics extends IOMetrics {
-		private static final long serialVersionUID = -5920076211680012555L;
-
-		public TestIOMetrics() {
-			super(
-				new MeterView(new TestCounter(1), 0),
-				new MeterView(new TestCounter(2), 0),
-				new MeterView(new TestCounter(3), 0),
-				new MeterView(new TestCounter(4), 0),
-				new MeterView(new TestCounter(5), 0));
-		}
-	}
-
-	private static class TestCounter implements Counter {
-		private final long count;
-
-		private TestCounter(long count) {
-			this.count = count;
-		}
-
-		@Override
-		public void inc() {
-		}
-
-		@Override
-		public void inc(long n) {
-		}
-
-		@Override
-		public void dec() {
-		}
-
-		@Override
-		public void dec(long n) {
-		}
-
-		@Override
-		public long getCount() {
-			return count;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java
deleted file mode 100644
index 053f718..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.api.common.ArchivedExecutionConfig;
-import org.apache.flink.api.common.ExecutionMode;
-
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Utility class for constructing an ArchivedExecutionConfig.
- */
-public class ArchivedExecutionConfigBuilder {
-	private String executionMode;
-	private String restartStrategyDescription;
-	private int parallelism;
-	private boolean objectReuseEnabled;
-	private Map<String, String> globalJobParameters;
-
-	public ArchivedExecutionConfigBuilder setExecutionMode(String executionMode) {
-		this.executionMode = executionMode;
-		return this;
-	}
-
-	public ArchivedExecutionConfigBuilder setRestartStrategyDescription(String restartStrategyDescription) {
-		this.restartStrategyDescription = restartStrategyDescription;
-		return this;
-	}
-
-	public ArchivedExecutionConfigBuilder setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-		return this;
-	}
-
-	public ArchivedExecutionConfigBuilder setObjectReuseEnabled(boolean objectReuseEnabled) {
-		this.objectReuseEnabled = objectReuseEnabled;
-		return this;
-	}
-
-	public ArchivedExecutionConfigBuilder setGlobalJobParameters(Map<String, String> globalJobParameters) {
-		this.globalJobParameters = globalJobParameters;
-		return this;
-	}
-
-	public ArchivedExecutionConfig build() {
-		return new ArchivedExecutionConfig(
-			executionMode != null ? executionMode : ExecutionMode.PIPELINED.name(),
-			restartStrategyDescription != null ? restartStrategyDescription : "default",
-			parallelism,
-			objectReuseEnabled,
-			globalJobParameters != null ? globalJobParameters : Collections.<String, String>emptyMap()
-		);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
deleted file mode 100644
index 57b300a..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.api.common.ArchivedExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ErrorInfo;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedValue;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-/**
- * Utility class for constructing an ArchivedExecutionGraph.
- */
-public class ArchivedExecutionGraphBuilder {
-
-	private static final Random RANDOM = new Random();
-
-	private JobID jobID;
-	private String jobName;
-	private Map<JobVertexID, ArchivedExecutionJobVertex> tasks;
-	private List<ArchivedExecutionJobVertex> verticesInCreationOrder;
-	private long[] stateTimestamps;
-	private JobStatus state;
-	private ErrorInfo failureCause;
-	private String jsonPlan;
-	private StringifiedAccumulatorResult[] archivedUserAccumulators;
-	private ArchivedExecutionConfig archivedExecutionConfig;
-	private boolean isStoppable;
-	private Map<String, SerializedValue<Object>> serializedUserAccumulators;
-
-	public ArchivedExecutionGraphBuilder setJobID(JobID jobID) {
-		this.jobID = jobID;
-		return this;
-	}
-
-	public ArchivedExecutionGraphBuilder setJobName(String jobName) {
-		this.jobName = jobName;
-		return this;
-	}
-
-	public ArchivedExecutionGraphBuilder setTasks(Map<JobVertexID, ArchivedExecutionJobVertex> tasks) {
-		this.tasks = tasks;
-		return this;
-	}
-
-	public ArchivedExecutionGraphBuilder setVerticesInCreationOrder(List<ArchivedExecutionJobVertex> verticesInCreationOrder) {
-		this.verticesInCreationOrder = verticesInCreationOrder;
-		return this;
-	}
-
-	public ArchivedExecutionGraphBuilder setStateTimestamps(long[] stateTimestamps) {
-		Preconditions.checkArgument(stateTimestamps.length == JobStatus.values().length);
-		this.stateTimestamps = stateTimestamps;
-		return this;
-	}
-
-	public ArchivedExecutionGraphBuilder setState(JobStatus state) {
-		this.state = state;
-		return this;
-	}
-
-	public ArchivedExecutionGraphBuilder setFailureCause(ErrorInfo failureCause) {
-		this.failureCause = failureCause;
-		return this;
-	}
-
-	public ArchivedExecutionGraphBuilder setJsonPlan(String jsonPlan) {
-		this.jsonPlan = jsonPlan;
-		return this;
-	}
-
-	public ArchivedExecutionGraphBuilder setArchivedUserAccumulators(StringifiedAccumulatorResult[] archivedUserAccumulators) {
-		this.archivedUserAccumulators = archivedUserAccumulators;
-		return this;
-	}
-
-	public ArchivedExecutionGraphBuilder setArchivedExecutionConfig(ArchivedExecutionConfig archivedExecutionConfig) {
-		this.archivedExecutionConfig = archivedExecutionConfig;
-		return this;
-	}
-
-	public ArchivedExecutionGraphBuilder setStoppable(boolean stoppable) {
-		isStoppable = stoppable;
-		return this;
-	}
-
-	public ArchivedExecutionGraphBuilder setSerializedUserAccumulators(Map<String, SerializedValue<Object>> serializedUserAccumulators) {
-		this.serializedUserAccumulators = serializedUserAccumulators;
-		return this;
-	}
-
-	public ArchivedExecutionGraph build() {
-		Preconditions.checkNotNull(tasks, "Tasks must not be null.");
-		JobID jobID = this.jobID != null ? this.jobID : new JobID();
-		String jobName = this.jobName != null ? this.jobName : "job_" + RANDOM.nextInt();
-		return new ArchivedExecutionGraph(
-			jobID,
-			jobName,
-			tasks,
-			verticesInCreationOrder != null ? verticesInCreationOrder : new ArrayList<>(tasks.values()),
-			stateTimestamps != null ? stateTimestamps : new long[JobStatus.values().length],
-			state != null ? state : JobStatus.FINISHED,
-			failureCause,
-			jsonPlan != null ? jsonPlan : "{\"jobid\":\"" + jobID + "\", \"name\":\"" + jobName + "\", \"nodes\":[]}",
-			archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0],
-			serializedUserAccumulators != null ? serializedUserAccumulators : Collections.<String, SerializedValue<Object>>emptyMap(),
-			archivedExecutionConfig != null ? archivedExecutionConfig : new ArchivedExecutionConfigBuilder().build(),
-			isStoppable,
-			null,
-			null
-		);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java
deleted file mode 100644
index 3ef4106..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Random;
-
-/**
- * Utility class for constructing an ArchivedExecutionJobVertex.
- */
-public class ArchivedExecutionJobVertexBuilder {
-
-	private static final Random RANDOM = new Random();
-
-	private ArchivedExecutionVertex[] taskVertices;
-	private JobVertexID id;
-	private String name;
-	private int parallelism;
-	private int maxParallelism;
-	private StringifiedAccumulatorResult[] archivedUserAccumulators;
-
-	public ArchivedExecutionJobVertexBuilder setTaskVertices(ArchivedExecutionVertex[] taskVertices) {
-		this.taskVertices = taskVertices;
-		return this;
-	}
-
-	public ArchivedExecutionJobVertexBuilder setId(JobVertexID id) {
-		this.id = id;
-		return this;
-	}
-
-	public ArchivedExecutionJobVertexBuilder setName(String name) {
-		this.name = name;
-		return this;
-	}
-
-	public ArchivedExecutionJobVertexBuilder setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-		return this;
-	}
-
-	public ArchivedExecutionJobVertexBuilder setMaxParallelism(int maxParallelism) {
-		this.maxParallelism = maxParallelism;
-		return this;
-	}
-
-	public ArchivedExecutionJobVertexBuilder setArchivedUserAccumulators(StringifiedAccumulatorResult[] archivedUserAccumulators) {
-		this.archivedUserAccumulators = archivedUserAccumulators;
-		return this;
-	}
-
-	public ArchivedExecutionJobVertex build() {
-		Preconditions.checkNotNull(taskVertices);
-		return new ArchivedExecutionJobVertex(
-			taskVertices,
-			id != null ? id : new JobVertexID(),
-			name != null ? name : "task_" + RANDOM.nextInt(),
-			parallelism,
-			maxParallelism,
-			archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0]
-		);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java
deleted file mode 100644
index 67e9e11..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.runtime.executiongraph.ArchivedExecution;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
-import org.apache.flink.runtime.util.EvictingBoundedList;
-import org.apache.flink.util.Preconditions;
-
-import java.util.List;
-import java.util.Random;
-
-/**
- * Utility class for constructing an ArchivedExecutionVertex.
- */
-public class ArchivedExecutionVertexBuilder {
-
-	private static final Random RANDOM = new Random();
-
-	private int subtaskIndex;
-	private EvictingBoundedList<ArchivedExecution> priorExecutions;
-	private String taskNameWithSubtask;
-	private ArchivedExecution currentExecution;
-
-	public ArchivedExecutionVertexBuilder setSubtaskIndex(int subtaskIndex) {
-		this.subtaskIndex = subtaskIndex;
-		return this;
-	}
-
-	public ArchivedExecutionVertexBuilder setPriorExecutions(List<ArchivedExecution> priorExecutions) {
-		this.priorExecutions = new EvictingBoundedList<>(priorExecutions.size());
-		for (ArchivedExecution execution : priorExecutions) {
-			this.priorExecutions.add(execution);
-		}
-		return this;
-	}
-
-	public ArchivedExecutionVertexBuilder setTaskNameWithSubtask(String taskNameWithSubtask) {
-		this.taskNameWithSubtask = taskNameWithSubtask;
-		return this;
-	}
-
-	public ArchivedExecutionVertexBuilder setCurrentExecution(ArchivedExecution currentExecution) {
-		this.currentExecution = currentExecution;
-		return this;
-	}
-
-	public ArchivedExecutionVertex build() {
-		Preconditions.checkNotNull(currentExecution);
-		return new ArchivedExecutionVertex(
-			subtaskIndex,
-			taskNameWithSubtask != null ? taskNameWithSubtask : "task_" + RANDOM.nextInt() + "_" + subtaskIndex,
-			currentExecution,
-			priorExecutions != null ? priorExecutions : new EvictingBoundedList<ArchivedExecution>(0)
-		);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
deleted file mode 100644
index 3e4fc01..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ArchivedExecution;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ErrorInfo;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-
-import java.net.InetAddress;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Common entry-point for accessing generated ArchivedExecution* components.
- */
-public class ArchivedJobGenerationUtils {
-	public static final ObjectMapper MAPPER = new ObjectMapper();
-	public static final JsonFactory JACKSON_FACTORY = new JsonFactory()
-		.enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
-		.disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT);
-
-	private static ArchivedExecutionGraph originalJob;
-	private static ArchivedExecutionJobVertex originalTask;
-	private static ArchivedExecutionVertex originalSubtask;
-	private static ArchivedExecution originalAttempt;
-
-	private static final Object lock = new Object();
-
-	private ArchivedJobGenerationUtils() {
-	}
-
-	public static AccessExecutionGraph getTestJob() throws Exception {
-		synchronized (lock) {
-			if (originalJob == null) {
-				generateArchivedJob();
-			}
-		}
-		return originalJob;
-	}
-
-	public static AccessExecutionJobVertex getTestTask() throws Exception {
-		synchronized (lock) {
-			if (originalJob == null) {
-				generateArchivedJob();
-			}
-		}
-		return originalTask;
-	}
-
-	public static AccessExecutionVertex getTestSubtask() throws Exception {
-		synchronized (lock) {
-			if (originalJob == null) {
-				generateArchivedJob();
-			}
-		}
-		return originalSubtask;
-	}
-
-	public static AccessExecution getTestAttempt() throws Exception {
-		synchronized (lock) {
-			if (originalJob == null) {
-				generateArchivedJob();
-			}
-		}
-		return originalAttempt;
-	}
-
-	private static void generateArchivedJob() throws Exception {
-		// Attempt
-		StringifiedAccumulatorResult acc1 = new StringifiedAccumulatorResult("name1", "type1", "value1");
-		StringifiedAccumulatorResult acc2 = new StringifiedAccumulatorResult("name2", "type2", "value2");
-		TaskManagerLocation location = new TaskManagerLocation(new ResourceID("hello"), InetAddress.getLocalHost(), 1234);
-		originalAttempt = new ArchivedExecutionBuilder()
-			.setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
-			.setParallelSubtaskIndex(1)
-			.setAttemptNumber(0)
-			.setAssignedResourceLocation(location)
-			.setUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2})
-			.setState(ExecutionState.FINISHED)
-			.setFailureCause("attemptException")
-			.build();
-		// Subtask
-		originalSubtask = new ArchivedExecutionVertexBuilder()
-			.setSubtaskIndex(originalAttempt.getParallelSubtaskIndex())
-			.setTaskNameWithSubtask("hello(1/1)")
-			.setCurrentExecution(originalAttempt)
-			.build();
-		// Task
-		originalTask = new ArchivedExecutionJobVertexBuilder()
-			.setTaskVertices(new ArchivedExecutionVertex[]{originalSubtask})
-			.build();
-		// Job
-		Map<JobVertexID, ArchivedExecutionJobVertex> tasks = new HashMap<>();
-		tasks.put(originalTask.getJobVertexId(), originalTask);
-		originalJob = new ArchivedExecutionGraphBuilder()
-			.setJobID(new JobID())
-			.setTasks(tasks)
-			.setFailureCause(new ErrorInfo(new Exception("jobException"), originalAttempt.getStateTimestamp(ExecutionState.FAILED)))
-			.setState(JobStatus.FINISHED)
-			.setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
-			.setArchivedUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2})
-			.build();
-	}
-
-	// ========================================================================
-	// utility methods
-	// ========================================================================
-
-	public static void compareStringifiedAccumulators(StringifiedAccumulatorResult[] expectedAccs, ArrayNode writtenAccs) {
-		assertEquals(expectedAccs.length, writtenAccs.size());
-		for (int x = 0; x < expectedAccs.length; x++) {
-			JsonNode acc = writtenAccs.get(x);
-
-			assertEquals(expectedAccs[x].getName(), acc.get("name").asText());
-			assertEquals(expectedAccs[x].getType(), acc.get("type").asText());
-			assertEquals(expectedAccs[x].getValue(), acc.get("value").asText());
-		}
-	}
-
-	public static void compareIoMetrics(IOMetrics expectedMetrics, JsonNode writtenMetrics) {
-		assertEquals(expectedMetrics.getNumBytesInTotal(), writtenMetrics.get("read-bytes").asLong());
-		assertEquals(expectedMetrics.getNumBytesOut(), writtenMetrics.get("write-bytes").asLong());
-		assertEquals(expectedMetrics.getNumRecordsIn(), writtenMetrics.get("read-records").asLong());
-		assertEquals(expectedMetrics.getNumRecordsOut(), writtenMetrics.get("write-records").asLong());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
new file mode 100644
index 0000000..4ad1759
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+/**
+ * A special exception that indicates that an element was not found and that the
+ * request should be answered with a {@code 404} return code.
+ */
+public class NotFoundException extends Exception {
+
+	private static final long serialVersionUID = -4036006746423754639L;
+
+	public NotFoundException(String message) {
+		super(message);
+	}
+}