You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/19 22:44:14 UTC

[04/16] flink git commit: [FLINK-7531] Move Flink legacy rest handler to flink-runtime

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
new file mode 100644
index 0000000..8f21af0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the JobCancellationWithSavepointHandler.
+ */
+public class JobCancellationWithSavepointHandlersTest extends TestLogger {
+
+	private static final Executor executor = Executors.directExecutor();
+
+	@Test
+	public void testGetPaths() {
+		JobCancellationWithSavepointHandlers handler = new JobCancellationWithSavepointHandlers(mock(ExecutionGraphHolder.class), executor);
+
+		JobCancellationWithSavepointHandlers.TriggerHandler triggerHandler = handler.getTriggerHandler();
+		String[] triggerPaths = triggerHandler.getPaths();
+		Assert.assertEquals(2, triggerPaths.length);
+		List<String> triggerPathsList = Arrays.asList(triggerPaths);
+		Assert.assertTrue(triggerPathsList.contains("/jobs/:jobid/cancel-with-savepoint"));
+		Assert.assertTrue(triggerPathsList.contains("/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory"));
+
+		JobCancellationWithSavepointHandlers.InProgressHandler progressHandler = handler.getInProgressHandler();
+		String[] progressPaths = progressHandler.getPaths();
+		Assert.assertEquals(1, progressPaths.length);
+		Assert.assertEquals("/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId", progressPaths[0]);
+	}
+
+	/**
+	 * Tests that the cancellation ask timeout respects the checkpoint timeout.
+	 * Otherwise, AskTimeoutExceptions are bound to happen for large state.
+	 */
+	@Test
+	public void testAskTimeoutEqualsCheckpointTimeout() throws Exception {
+		long timeout = 128288238L;
+		JobID jobId = new JobID();
+		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
+		ExecutionGraph graph = mock(ExecutionGraph.class);
+		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
+		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
+		when(graph.getCheckpointCoordinator()).thenReturn(coord);
+		when(coord.getCheckpointTimeout()).thenReturn(timeout);
+
+		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
+		JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler();
+
+		Map<String, String> params = new HashMap<>();
+		params.put("jobid", jobId.toString());
+		params.put("targetDirectory", "placeholder");
+
+		JobManagerGateway jobManager = mock(JobManagerGateway.class);
+		when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar"));
+
+		handler.handleRequest(params, Collections.emptyMap(), jobManager);
+
+		verify(jobManager).cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class));
+	}
+
+	/**
+	 * Tests that the savepoint directory configuration is respected.
+	 */
+	@Test
+	public void testSavepointDirectoryConfiguration() throws Exception {
+		long timeout = 128288238L;
+		JobID jobId = new JobID();
+		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
+		ExecutionGraph graph = mock(ExecutionGraph.class);
+		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
+		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
+		when(graph.getCheckpointCoordinator()).thenReturn(coord);
+		when(coord.getCheckpointTimeout()).thenReturn(timeout);
+
+		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor, "the-default-directory");
+		JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler();
+
+		Map<String, String> params = new HashMap<>();
+		params.put("jobid", jobId.toString());
+
+		JobManagerGateway jobManager = mock(JobManagerGateway.class);
+		when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar"));
+
+		// 1. Use targetDirectory path param
+		params.put("targetDirectory", "custom-directory");
+		handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+
+		verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
+
+		// 2. Use default
+		params.remove("targetDirectory");
+
+		handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+
+		verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("the-default-directory"), any(Time.class));
+
+		// 3. Throw Exception
+		handlers = new JobCancellationWithSavepointHandlers(holder, executor, null);
+		handler = handlers.getTriggerHandler();
+
+		try {
+			handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+			fail("Did not throw expected test Exception");
+		} catch (Exception e) {
+			IllegalStateException cause = (IllegalStateException) e.getCause();
+			assertEquals(true, cause.getMessage().contains(CoreOptions.SAVEPOINT_DIRECTORY.key()));
+		}
+	}
+
+	/**
+	 * Tests triggering a new request and monitoring it.
+	 */
+	@Test
+	public void testTriggerNewRequest() throws Exception {
+		JobID jobId = new JobID();
+		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
+		ExecutionGraph graph = mock(ExecutionGraph.class);
+		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
+		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
+		when(graph.getCheckpointCoordinator()).thenReturn(coord);
+
+		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
+		JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler();
+		JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler();
+
+		Map<String, String> params = new HashMap<>();
+		params.put("jobid", jobId.toString());
+		params.put("targetDirectory", "custom-directory");
+
+		JobManagerGateway jobManager = mock(JobManagerGateway.class);
+
+		// Successful
+		CompletableFuture<String> successfulCancelWithSavepoint = new CompletableFuture<>();
+		when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(successfulCancelWithSavepoint);
+
+		// Trigger
+		FullHttpResponse response = trigger.handleRequest(params, Collections.emptyMap(), jobManager).get();
+
+		verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
+
+		String location = String.format("/jobs/%s/cancel-with-savepoint/in-progress/1", jobId);
+
+		assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+		assertEquals(location, response.headers().get(HttpHeaders.Names.LOCATION));
+
+		String json = response.content().toString(Charset.forName("UTF-8"));
+		JsonNode root = new ObjectMapper().readTree(json);
+
+		assertEquals("accepted", root.get("status").asText());
+		assertEquals("1", root.get("request-id").asText());
+		assertEquals(location, root.get("location").asText());
+
+		// Trigger again
+		response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+		assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+		assertEquals(location, response.headers().get(HttpHeaders.Names.LOCATION));
+
+		json = response.content().toString(Charset.forName("UTF-8"));
+		root = new ObjectMapper().readTree(json);
+
+		assertEquals("accepted", root.get("status").asText());
+		assertEquals("1", root.get("request-id").asText());
+		assertEquals(location, root.get("location").asText());
+
+		// Only single actual request
+		verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
+
+		// Query progress
+		params.put("requestId", "1");
+
+		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+		assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+		json = response.content().toString(Charset.forName("UTF-8"));
+		root = new ObjectMapper().readTree(json);
+
+		assertEquals("in-progress", root.get("status").asText());
+		assertEquals("1", root.get("request-id").asText());
+
+		// Complete
+		successfulCancelWithSavepoint.complete("_path-savepoint_");
+
+		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+
+		assertEquals(HttpResponseStatus.CREATED, response.getStatus());
+		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+		json = response.content().toString(Charset.forName("UTF-8"));
+
+		root = new ObjectMapper().readTree(json);
+
+		assertEquals("success", root.get("status").asText());
+		assertEquals("1", root.get("request-id").asText());
+		assertEquals("_path-savepoint_", root.get("savepoint-path").asText());
+
+		// Query again, keep recent history
+
+		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+
+		assertEquals(HttpResponseStatus.CREATED, response.getStatus());
+		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+		json = response.content().toString(Charset.forName("UTF-8"));
+
+		root = new ObjectMapper().readTree(json);
+
+		assertEquals("success", root.get("status").asText());
+		assertEquals("1", root.get("request-id").asText());
+		assertEquals("_path-savepoint_", root.get("savepoint-path").asText());
+
+		// Query for unknown request
+		params.put("requestId", "9929");
+
+		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+		assertEquals(HttpResponseStatus.BAD_REQUEST, response.getStatus());
+		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+		json = response.content().toString(Charset.forName("UTF-8"));
+
+		root = new ObjectMapper().readTree(json);
+
+		assertEquals("failed", root.get("status").asText());
+		assertEquals("9929", root.get("request-id").asText());
+		assertEquals("Unknown job/request ID", root.get("cause").asText());
+	}
+
+	/**
+	 * Tests response when a request fails.
+	 */
+	@Test
+	public void testFailedCancellation() throws Exception {
+		JobID jobId = new JobID();
+		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
+		ExecutionGraph graph = mock(ExecutionGraph.class);
+		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
+		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
+		when(graph.getCheckpointCoordinator()).thenReturn(coord);
+
+		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
+		JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler();
+		JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler();
+
+		Map<String, String> params = new HashMap<>();
+		params.put("jobid", jobId.toString());
+		params.put("targetDirectory", "custom-directory");
+
+		JobManagerGateway jobManager = mock(JobManagerGateway.class);
+
+		// Successful
+		CompletableFuture<String> unsuccessfulCancelWithSavepoint = FutureUtils.completedExceptionally(new Exception("Test Exception"));
+		when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(unsuccessfulCancelWithSavepoint);
+
+		// Trigger
+		trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
+
+		// Query progress
+		params.put("requestId", "1");
+
+		FullHttpResponse response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+		assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.getStatus());
+		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+		String json = response.content().toString(Charset.forName("UTF-8"));
+		JsonNode root = new ObjectMapper().readTree(json);
+
+		assertEquals("failed", root.get("status").asText());
+		assertEquals("1", root.get("request-id").asText());
+		assertEquals("Test Exception", root.get("cause").asText());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
new file mode 100644
index 0000000..567df8c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobConfigHandler.
+ */
+public class JobConfigHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new JobConfigHandler.JobConfigJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/config", archive.getPath());
+		compareJobConfig(originalJob, archive.getJson());
+	}
+
+	@Test
+	public void testGetPaths() {
+		JobConfigHandler handler = new JobConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobs/:jobid/config", paths[0]);
+	}
+
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		String answer = JobConfigHandler.createJobConfigJson(originalJob);
+		compareJobConfig(originalJob, answer);
+	}
+
+	private static void compareJobConfig(AccessExecutionGraph originalJob, String answer) throws IOException {
+		JsonNode job = ArchivedJobGenerationUtils.MAPPER.readTree(answer);
+
+		Assert.assertEquals(originalJob.getJobID().toString(), job.get("jid").asText());
+		Assert.assertEquals(originalJob.getJobName(), job.get("name").asText());
+
+		ArchivedExecutionConfig originalConfig = originalJob.getArchivedExecutionConfig();
+		JsonNode config = job.get("execution-config");
+
+		Assert.assertEquals(originalConfig.getExecutionMode(), config.get("execution-mode").asText());
+		Assert.assertEquals(originalConfig.getRestartStrategyDescription(), config.get("restart-strategy").asText());
+		Assert.assertEquals(originalConfig.getParallelism(), config.get("job-parallelism").asInt());
+		Assert.assertEquals(originalConfig.getObjectReuseEnabled(), config.get("object-reuse-mode").asBoolean());
+
+		Map<String, String> originalUserConfig = originalConfig.getGlobalJobParameters();
+		JsonNode userConfig = config.get("user-config");
+
+		for (Map.Entry<String, String> originalEntry : originalUserConfig.entrySet()) {
+			Assert.assertEquals(originalEntry.getValue(), userConfig.get(originalEntry.getKey()).asText());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
new file mode 100644
index 0000000..afd743e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobDetailsHandler.
+ */
+public class JobDetailsHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new JobDetailsHandler.JobDetailsJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(2, archives.size());
+
+		Iterator<ArchivedJson> iterator = archives.iterator();
+		ArchivedJson archive1 = iterator.next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID(), archive1.getPath());
+		compareJobDetails(originalJob, archive1.getJson());
+
+		ArchivedJson archive2 = iterator.next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices", archive2.getPath());
+		compareJobDetails(originalJob, archive2.getJson());
+	}
+
+	@Test
+	public void testGetPaths() {
+		JobDetailsHandler handler = new JobDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(2, paths.length);
+		List<String> pathsList = Lists.newArrayList(paths);
+		Assert.assertTrue(pathsList.contains("/jobs/:jobid"));
+		Assert.assertTrue(pathsList.contains("/jobs/:jobid/vertices"));
+	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		String json = JobDetailsHandler.createJobDetailsJson(originalJob, null);
+
+		compareJobDetails(originalJob, json);
+	}
+
+	private static void compareJobDetails(AccessExecutionGraph originalJob, String json) throws IOException {
+		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+		Assert.assertEquals(originalJob.getJobID().toString(), result.get("jid").asText());
+		Assert.assertEquals(originalJob.getJobName(), result.get("name").asText());
+		Assert.assertEquals(originalJob.isStoppable(), result.get("isStoppable").asBoolean());
+		Assert.assertEquals(originalJob.getState().name(), result.get("state").asText());
+
+		Assert.assertEquals(originalJob.getStatusTimestamp(JobStatus.CREATED), result.get("start-time").asLong());
+		Assert.assertEquals(originalJob.getStatusTimestamp(originalJob.getState()), result.get("end-time").asLong());
+		Assert.assertEquals(
+			originalJob.getStatusTimestamp(originalJob.getState()) - originalJob.getStatusTimestamp(JobStatus.CREATED),
+			result.get("duration").asLong()
+		);
+
+		JsonNode timestamps = result.get("timestamps");
+		for (JobStatus status : JobStatus.values()) {
+			Assert.assertEquals(originalJob.getStatusTimestamp(status), timestamps.get(status.name()).asLong());
+		}
+
+		ArrayNode tasks = (ArrayNode) result.get("vertices");
+		int x = 0;
+		for (AccessExecutionJobVertex expectedTask : originalJob.getVerticesTopologically()) {
+			JsonNode task = tasks.get(x);
+
+			Assert.assertEquals(expectedTask.getJobVertexId().toString(), task.get("id").asText());
+			Assert.assertEquals(expectedTask.getName(), task.get("name").asText());
+			Assert.assertEquals(expectedTask.getParallelism(), task.get("parallelism").asInt());
+			Assert.assertEquals(expectedTask.getAggregateState().name(), task.get("status").asText());
+
+			Assert.assertEquals(3, task.get("start-time").asLong());
+			Assert.assertEquals(5, task.get("end-time").asLong());
+			Assert.assertEquals(2, task.get("duration").asLong());
+
+			JsonNode subtasksPerState = task.get("tasks");
+			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CREATED.name()).asInt());
+			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.SCHEDULED.name()).asInt());
+			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.DEPLOYING.name()).asInt());
+			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.RUNNING.name()).asInt());
+			Assert.assertEquals(1, subtasksPerState.get(ExecutionState.FINISHED.name()).asInt());
+			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CANCELING.name()).asInt());
+			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CANCELED.name()).asInt());
+			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.FAILED.name()).asInt());
+
+			long expectedNumBytesIn = 0;
+			long expectedNumBytesOut = 0;
+			long expectedNumRecordsIn = 0;
+			long expectedNumRecordsOut = 0;
+
+			for (AccessExecutionVertex vertex : expectedTask.getTaskVertices()) {
+				IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
+
+				expectedNumBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
+				expectedNumBytesOut += ioMetrics.getNumBytesOut();
+				expectedNumRecordsIn += ioMetrics.getNumRecordsIn();
+				expectedNumRecordsOut += ioMetrics.getNumRecordsOut();
+			}
+
+			JsonNode metrics = task.get("metrics");
+
+			Assert.assertEquals(expectedNumBytesIn, metrics.get("read-bytes").asLong());
+			Assert.assertEquals(expectedNumBytesOut, metrics.get("write-bytes").asLong());
+			Assert.assertEquals(expectedNumRecordsIn, metrics.get("read-records").asLong());
+			Assert.assertEquals(expectedNumRecordsOut, metrics.get("write-records").asLong());
+
+			x++;
+		}
+		Assert.assertEquals(1, tasks.size());
+
+		JsonNode statusCounts = result.get("status-counts");
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.CREATED.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.SCHEDULED.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.DEPLOYING.name()).asInt());
+		Assert.assertEquals(1, statusCounts.get(ExecutionState.RUNNING.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.FINISHED.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELING.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELED.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.FAILED.name()).asInt());
+
+		Assert.assertEquals(ArchivedJobGenerationUtils.MAPPER.readTree(originalJob.getJsonPlan()), result.get("plan"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
new file mode 100644
index 0000000..6a20696
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.ExceptionUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobExceptionsHandler.
+ */
+public class JobExceptionsHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new JobExceptionsHandler.JobExceptionsJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/exceptions", archive.getPath());
+		compareExceptions(originalJob, archive.getJson());
+	}
+
+	@Test
+	public void testGetPaths() {
+		JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]);
+	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		String json = JobExceptionsHandler.createJobExceptionsJson(originalJob);
+
+		compareExceptions(originalJob, json);
+	}
+
+	private static void compareExceptions(AccessExecutionGraph originalJob, String json) throws IOException {
+		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+		Assert.assertEquals(originalJob.getFailureCause().getExceptionAsString(), result.get("root-exception").asText());
+		Assert.assertEquals(originalJob.getFailureCause().getTimestamp(), result.get("timestamp").asLong());
+
+		ArrayNode exceptions = (ArrayNode) result.get("all-exceptions");
+
+		int x = 0;
+		for (AccessExecutionVertex expectedSubtask : originalJob.getAllExecutionVertices()) {
+			if (!expectedSubtask.getFailureCauseAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
+				JsonNode exception = exceptions.get(x);
+
+				Assert.assertEquals(expectedSubtask.getFailureCauseAsString(), exception.get("exception").asText());
+				Assert.assertEquals(expectedSubtask.getStateTimestamp(ExecutionState.FAILED), exception.get("timestamp").asLong());
+				Assert.assertEquals(expectedSubtask.getTaskNameWithSubtaskIndex(), exception.get("task").asText());
+
+				TaskManagerLocation location = expectedSubtask.getCurrentAssignedResourceLocation();
+				String expectedLocationString = location.getFQDNHostname() + ':' + location.dataPort();
+				Assert.assertEquals(expectedLocationString, exception.get("location").asText());
+			}
+			x++;
+		}
+		Assert.assertEquals(x > JobExceptionsHandler.MAX_NUMBER_EXCEPTION_TO_REPORT, result.get("truncated").asBoolean());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java
new file mode 100644
index 0000000..03ddb73
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the JobManagerConfigHandler.
+ */
+public class JobManagerConfigHandlerTest {
+	@Test
+	public void testGetPaths() {
+		JobManagerConfigHandler handler = new JobManagerConfigHandler(Executors.directExecutor(), null);
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobmanager/config", paths[0]);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
new file mode 100644
index 0000000..6d3b213
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobPlanHandler.
+ */
+public class JobPlanHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new JobPlanHandler.JobPlanJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/plan", archive.getPath());
+		Assert.assertEquals(originalJob.getJsonPlan(), archive.getJson());
+	}
+
+	@Test
+	public void testGetPaths() {
+		JobPlanHandler handler = new JobPlanHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobs/:jobid/plan", paths[0]);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandlerTest.java
new file mode 100644
index 0000000..2c39fcf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandlerTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Tests for the JobStoppingHandler.
+ */
+public class JobStoppingHandlerTest extends TestLogger {
+	@Test
+	public void testGetPaths() {
+		JobStoppingHandler handler = new JobStoppingHandler(Executors.directExecutor(), TestingUtils.TIMEOUT());
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(2, paths.length);
+		List<String> pathsList = Lists.newArrayList(paths);
+		Assert.assertTrue(pathsList.contains("/jobs/:jobid/stop"));
+		Assert.assertTrue(pathsList.contains("/jobs/:jobid/yarn-stop"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
new file mode 100644
index 0000000..feffe60
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobVertexAccumulatorsHandler.
+ */
+public class JobVertexAccumulatorsHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/accumulators", archive.getPath());
+		compareAccumulators(originalTask, archive.getJson());
+	}
+
+	@Test
+	public void testGetPaths() {
+		JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]);
+	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		String json = JobVertexAccumulatorsHandler.createVertexAccumulatorsJson(originalTask);
+
+		compareAccumulators(originalTask, json);
+	}
+
+	private static void compareAccumulators(AccessExecutionJobVertex originalTask, String json) throws IOException {
+		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
+
+		ArrayNode accs = (ArrayNode) result.get("user-accumulators");
+		StringifiedAccumulatorResult[] expectedAccs = originalTask.getAggregatedUserAccumulatorsStringified();
+
+		ArchivedJobGenerationUtils.compareStringifiedAccumulators(expectedAccs, accs);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java
new file mode 100644
index 0000000..bd6817f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for back pressure handler responses.
+ */
+public class JobVertexBackPressureHandlerTest {
+	@Test
+	public void testGetPaths() {
+		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), mock(BackPressureStatsTracker.class), 0);
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/backpressure", paths[0]);
+	}
+
+	/** Tests the response when no stats are available. */
+	@Test
+	public void testResponseNoStatsAvailable() throws Exception {
+		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+		BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
+
+		when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
+				.thenReturn(Optional.empty());
+
+		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
+				mock(ExecutionGraphHolder.class),
+				Executors.directExecutor(),
+				statsTracker,
+				9999);
+
+		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(response);
+
+		// Single element
+		assertEquals(1, rootNode.size());
+
+		// Status
+		JsonNode status = rootNode.get("status");
+		assertNotNull(status);
+		assertEquals("deprecated", status.textValue());
+
+		verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class));
+	}
+
+	/** Tests the response when stats are available. */
+	@Test
+	public void testResponseStatsAvailable() throws Exception {
+		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+		BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
+
+		OperatorBackPressureStats stats = new OperatorBackPressureStats(
+				0, System.currentTimeMillis(), new double[] { 0.31, 0.48, 1.0, 0.0 });
+
+		when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
+				.thenReturn(Optional.of(stats));
+
+		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
+				mock(ExecutionGraphHolder.class),
+				Executors.directExecutor(),
+				statsTracker,
+				9999);
+
+		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(response);
+
+		// Single element
+		assertEquals(4, rootNode.size());
+
+		// Status
+		JsonNode status = rootNode.get("status");
+		assertNotNull(status);
+		assertEquals("ok", status.textValue());
+
+		// Back pressure level
+		JsonNode backPressureLevel = rootNode.get("backpressure-level");
+		assertNotNull(backPressureLevel);
+		assertEquals("high", backPressureLevel.textValue());
+
+		// End time stamp
+		JsonNode endTimeStamp = rootNode.get("end-timestamp");
+		assertNotNull(endTimeStamp);
+		assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue());
+
+		// Subtasks
+		JsonNode subTasks = rootNode.get("subtasks");
+		assertEquals(stats.getNumberOfSubTasks(), subTasks.size());
+		for (int i = 0; i < subTasks.size(); i++) {
+			JsonNode subTask = subTasks.get(i);
+
+			JsonNode index = subTask.get("subtask");
+			assertEquals(i, index.intValue());
+
+			JsonNode level = subTask.get("backpressure-level");
+			assertEquals(JobVertexBackPressureHandler
+					.getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue());
+
+			JsonNode ratio = subTask.get("ratio");
+			assertEquals(stats.getBackPressureRatio(i), ratio.doubleValue(), 0.0);
+		}
+
+		// Verify not triggered
+		verify(statsTracker, never()).triggerStackTraceSample(any(ExecutionJobVertex.class));
+	}
+
+	/** Tests that after the refresh interval another sample is triggered. */
+	@Test
+	public void testResponsePassedRefreshInterval() throws Exception {
+		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+		BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
+
+		OperatorBackPressureStats stats = new OperatorBackPressureStats(
+				0, System.currentTimeMillis(), new double[] { 0.31, 0.48, 1.0, 0.0 });
+
+		when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
+				.thenReturn(Optional.of(stats));
+
+		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
+				mock(ExecutionGraphHolder.class),
+				Executors.directExecutor(),
+				statsTracker,
+				0); // <----- refresh interval should fire immediately
+
+		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(response);
+
+		// Single element
+		assertEquals(4, rootNode.size());
+
+		// Status
+		JsonNode status = rootNode.get("status");
+		assertNotNull(status);
+		// Interval passed, hence deprecated
+		assertEquals("deprecated", status.textValue());
+
+		// Back pressure level
+		JsonNode backPressureLevel = rootNode.get("backpressure-level");
+		assertNotNull(backPressureLevel);
+		assertEquals("high", backPressureLevel.textValue());
+
+		// End time stamp
+		JsonNode endTimeStamp = rootNode.get("end-timestamp");
+		assertNotNull(endTimeStamp);
+		assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue());
+
+		// Subtasks
+		JsonNode subTasks = rootNode.get("subtasks");
+		assertEquals(stats.getNumberOfSubTasks(), subTasks.size());
+		for (int i = 0; i < subTasks.size(); i++) {
+			JsonNode subTask = subTasks.get(i);
+
+			JsonNode index = subTask.get("subtask");
+			assertEquals(i, index.intValue());
+
+			JsonNode level = subTask.get("backpressure-level");
+			assertEquals(JobVertexBackPressureHandler
+					.getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue());
+
+			JsonNode ratio = subTask.get("ratio");
+			assertEquals(stats.getBackPressureRatio(i), ratio.doubleValue(), 0.0);
+		}
+
+		// Verify triggered
+		verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
new file mode 100644
index 0000000..5af1d53
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobVertexDetailsHandler.
+ */
+public class JobVertexDetailsHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new JobVertexDetailsHandler.JobVertexDetailsJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId(), archive.getPath());
+		compareVertexDetails(originalTask, archive.getJson());
+	}
+
+	@Test
+	public void testGetPaths() {
+		JobVertexDetailsHandler handler = new JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", paths[0]);
+	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		String json = JobVertexDetailsHandler.createVertexDetailsJson(
+			originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
+
+		compareVertexDetails(originalTask, json);
+	}
+
+	private static void compareVertexDetails(AccessExecutionJobVertex originalTask, String json) throws IOException {
+		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
+		Assert.assertEquals(originalTask.getName(), result.get("name").asText());
+		Assert.assertEquals(originalTask.getParallelism(), result.get("parallelism").asInt());
+		Assert.assertTrue(result.get("now").asLong() > 0);
+
+		ArrayNode subtasks = (ArrayNode) result.get("subtasks");
+
+		Assert.assertEquals(originalTask.getTaskVertices().length, subtasks.size());
+		for (int x = 0; x < originalTask.getTaskVertices().length; x++) {
+			AccessExecutionVertex expectedSubtask = originalTask.getTaskVertices()[x];
+			JsonNode subtask = subtasks.get(x);
+
+			Assert.assertEquals(x, subtask.get("subtask").asInt());
+			Assert.assertEquals(expectedSubtask.getExecutionState().name(), subtask.get("status").asText());
+			Assert.assertEquals(expectedSubtask.getCurrentExecutionAttempt().getAttemptNumber(), subtask.get("attempt").asInt());
+
+			TaskManagerLocation location = expectedSubtask.getCurrentAssignedResourceLocation();
+			String expectedLocationString = location.getHostname() + ":" + location.dataPort();
+			Assert.assertEquals(expectedLocationString, subtask.get("host").asText());
+			long start = expectedSubtask.getStateTimestamp(ExecutionState.DEPLOYING);
+			Assert.assertEquals(start, subtask.get("start-time").asLong());
+			long end = expectedSubtask.getStateTimestamp(ExecutionState.FINISHED);
+			Assert.assertEquals(end, subtask.get("end-time").asLong());
+			Assert.assertEquals(end - start, subtask.get("duration").asLong());
+
+			ArchivedJobGenerationUtils.compareIoMetrics(expectedSubtask.getCurrentExecutionAttempt().getIOMetrics(), subtask.get("metrics"));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java
new file mode 100644
index 0000000..2a027fd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobVertexTaskManagersHandler.
+ */
+public class JobVertexTaskManagersHandlerTest extends TestLogger {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		AccessExecutionVertex originalSubtask = ArchivedJobGenerationUtils.getTestSubtask();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/taskmanagers", archive.getPath());
+		compareVertexTaskManagers(originalTask, originalSubtask, archive.getJson());
+	}
+
+	@Test
+	public void testGetPaths() {
+		JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]);
+	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		AccessExecutionVertex originalSubtask = ArchivedJobGenerationUtils.getTestSubtask();
+		String json = JobVertexTaskManagersHandler.createVertexDetailsByTaskManagerJson(
+			originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
+
+		compareVertexTaskManagers(originalTask, originalSubtask, json);
+	}
+
+	private static void compareVertexTaskManagers(AccessExecutionJobVertex originalTask, AccessExecutionVertex originalSubtask, String json) throws IOException {
+		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
+		Assert.assertEquals(originalTask.getName(), result.get("name").asText());
+		Assert.assertTrue(result.get("now").asLong() > 0);
+
+		ArrayNode taskmanagers = (ArrayNode) result.get("taskmanagers");
+
+		JsonNode taskManager = taskmanagers.get(0);
+
+		TaskManagerLocation location = originalSubtask.getCurrentAssignedResourceLocation();
+		String expectedLocationString = location.getHostname() + ':' + location.dataPort();
+		Assert.assertEquals(expectedLocationString, taskManager.get("host").asText());
+		Assert.assertEquals(ExecutionState.FINISHED.name(), taskManager.get("status").asText());
+
+		Assert.assertEquals(3, taskManager.get("start-time").asLong());
+		Assert.assertEquals(5, taskManager.get("end-time").asLong());
+		Assert.assertEquals(2, taskManager.get("duration").asLong());
+
+		JsonNode statusCounts = taskManager.get("status-counts");
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.CREATED.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.SCHEDULED.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.DEPLOYING.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.RUNNING.name()).asInt());
+		Assert.assertEquals(1, statusCounts.get(ExecutionState.FINISHED.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELING.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELED.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.FAILED.name()).asInt());
+
+		long expectedNumBytesIn = 0;
+		long expectedNumBytesOut = 0;
+		long expectedNumRecordsIn = 0;
+		long expectedNumRecordsOut = 0;
+
+		for (AccessExecutionVertex vertex : originalTask.getTaskVertices()) {
+			IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
+
+			expectedNumBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
+			expectedNumBytesOut += ioMetrics.getNumBytesOut();
+			expectedNumRecordsIn += ioMetrics.getNumRecordsIn();
+			expectedNumRecordsOut += ioMetrics.getNumRecordsOut();
+		}
+
+		JsonNode metrics = taskManager.get("metrics");
+
+		Assert.assertEquals(expectedNumBytesIn, metrics.get("read-bytes").asLong());
+		Assert.assertEquals(expectedNumBytesOut, metrics.get("write-bytes").asLong());
+		Assert.assertEquals(expectedNumRecordsIn, metrics.get("read-records").asLong());
+		Assert.assertEquals(expectedNumRecordsOut, metrics.get("write-records").asLong());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java
new file mode 100644
index 0000000..9e0d549
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the SubtaskCurrentAttemptDetailsHandler.
+ */
+public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
+	@Test
+	public void testGetPaths() {
+		SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", paths[0]);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
new file mode 100644
index 0000000..49e54c0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the SubtaskExecutionAttemptAccumulatorsHandler.
+ */
+public class SubtaskExecutionAttemptAccumulatorsHandlerTest extends TestLogger {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals(
+			"/jobs/" + originalJob.getJobID() +
+			"/vertices/" + originalTask.getJobVertexId() +
+			"/subtasks/" + originalAttempt.getParallelSubtaskIndex() +
+			"/attempts/" + originalAttempt.getAttemptNumber() +
+			"/accumulators",
+			archive.getPath());
+		compareAttemptAccumulators(originalAttempt, archive.getJson());
+	}
+
+	@Test
+	public void testGetPaths() {
+		SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", paths[0]);
+	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+		String json = SubtaskExecutionAttemptAccumulatorsHandler.createAttemptAccumulatorsJson(originalAttempt);
+
+		compareAttemptAccumulators(originalAttempt, json);
+	}
+
+	private static void compareAttemptAccumulators(AccessExecution originalAttempt, String json) throws IOException {
+		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+		Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt());
+		Assert.assertEquals(originalAttempt.getAttemptNumber(), result.get("attempt").asInt());
+		Assert.assertEquals(originalAttempt.getAttemptId().toString(), result.get("id").asText());
+
+		ArchivedJobGenerationUtils.compareStringifiedAccumulators(originalAttempt.getUserAccumulatorsStringified(), (ArrayNode) result.get("user-accumulators"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java
new file mode 100644
index 0000000..e1fe8b5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the SubtaskExecutionAttemptDetailsHandler.
+ */
+public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(2, archives.size());
+
+		Iterator<ArchivedJson> iterator = archives.iterator();
+		ArchivedJson archive1 = iterator.next();
+		Assert.assertEquals(
+			"/jobs/" + originalJob.getJobID() +
+				"/vertices/" + originalTask.getJobVertexId() +
+				"/subtasks/" + originalAttempt.getParallelSubtaskIndex(),
+			archive1.getPath());
+		compareAttemptDetails(originalAttempt, archive1.getJson());
+
+		ArchivedJson archive2 = iterator.next();
+		Assert.assertEquals(
+			"/jobs/" + originalJob.getJobID() +
+				"/vertices/" + originalTask.getJobVertexId() +
+				"/subtasks/" + originalAttempt.getParallelSubtaskIndex() +
+				"/attempts/" + originalAttempt.getAttemptNumber(),
+			archive2.getPath());
+		compareAttemptDetails(originalAttempt, archive2.getJson());
+	}
+
+	@Test
+	public void testGetPaths() {
+		SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(),  null);
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", paths[0]);
+	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+		String json = SubtaskExecutionAttemptDetailsHandler.createAttemptDetailsJson(
+			originalAttempt, originalJob.getJobID().toString(), originalTask.getJobVertexId().toString(), null);
+
+		compareAttemptDetails(originalAttempt, json);
+	}
+
+	private static void compareAttemptDetails(AccessExecution originalAttempt, String json) throws IOException {
+		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+		Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt());
+		Assert.assertEquals(originalAttempt.getState().name(), result.get("status").asText());
+		Assert.assertEquals(originalAttempt.getAttemptNumber(), result.get("attempt").asInt());
+		Assert.assertEquals(originalAttempt.getAssignedResourceLocation().getHostname(), result.get("host").asText());
+		long start = originalAttempt.getStateTimestamp(ExecutionState.DEPLOYING);
+		Assert.assertEquals(start, result.get("start-time").asLong());
+		long end = originalAttempt.getStateTimestamp(ExecutionState.FINISHED);
+		Assert.assertEquals(end, result.get("end-time").asLong());
+		Assert.assertEquals(end - start, result.get("duration").asLong());
+
+		ArchivedJobGenerationUtils.compareIoMetrics(originalAttempt.getIOMetrics(), result.get("metrics"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java
new file mode 100644
index 0000000..1478f00
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the SubtasksAllAccumulatorsHandler.
+ */
+public class SubtasksAllAccumulatorsHandlerTest extends TestLogger {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() +
+			"/subtasks/accumulators", archive.getPath());
+		compareSubtaskAccumulators(originalTask, archive.getJson());
+	}
+
+	@Test
+	public void testGetPaths() {
+		SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", paths[0]);
+	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		String json = SubtasksAllAccumulatorsHandler.createSubtasksAccumulatorsJson(originalTask);
+		compareSubtaskAccumulators(originalTask, json);
+	}
+
+	private static void compareSubtaskAccumulators(AccessExecutionJobVertex originalTask, String json) throws IOException {
+		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
+		Assert.assertEquals(originalTask.getParallelism(), result.get("parallelism").asInt());
+
+		ArrayNode subtasks = (ArrayNode) result.get("subtasks");
+
+		Assert.assertEquals(originalTask.getTaskVertices().length, subtasks.size());
+		for (int x = 0; x < originalTask.getTaskVertices().length; x++) {
+			JsonNode subtask = subtasks.get(x);
+			AccessExecutionVertex expectedSubtask = originalTask.getTaskVertices()[x];
+
+			Assert.assertEquals(x, subtask.get("subtask").asInt());
+			Assert.assertEquals(expectedSubtask.getCurrentExecutionAttempt().getAttemptNumber(), subtask.get("attempt").asInt());
+			Assert.assertEquals(expectedSubtask.getCurrentAssignedResourceLocation().getHostname(), subtask.get("host").asText());
+
+			ArchivedJobGenerationUtils.compareStringifiedAccumulators(
+				expectedSubtask.getCurrentExecutionAttempt().getUserAccumulatorsStringified(),
+				(ArrayNode) subtask.get("user-accumulators"));
+		}
+	}
+}