You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/19 22:44:14 UTC
[04/16] flink git commit: [FLINK-7531] Move Flink legacy rest handler
to flink-runtime
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
new file mode 100644
index 0000000..8f21af0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link JobCancellationWithSavepointHandlers}.
+ */
+public class JobCancellationWithSavepointHandlersTest extends TestLogger {
+
+ private static final Executor executor = Executors.directExecutor();
+
+ /** Tests that the trigger and in-progress handlers expose the expected REST paths. */
+ @Test
+ public void testGetPaths() {
+ JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(mock(ExecutionGraphHolder.class), executor);
+
+ // Trigger handler: one path without and one path with an explicit target directory
+ List<String> triggerPaths = Arrays.asList(handlers.getTriggerHandler().getPaths());
+ assertEquals(2, triggerPaths.size());
+ Assert.assertTrue(triggerPaths.contains("/jobs/:jobid/cancel-with-savepoint"));
+ Assert.assertTrue(triggerPaths.contains("/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory"));
+
+ // In-progress handler: a single path addressing the request by its ID
+ String[] progressPaths = handlers.getInProgressHandler().getPaths();
+ assertEquals(1, progressPaths.length);
+ assertEquals("/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId", progressPaths[0]);
+ }
+
+ /**
+ * Intended to guard that the cancellation ask timeout follows the checkpoint timeout,
+ * because AskTimeoutExceptions are otherwise bound to happen for large state.
+ */
+ @Test
+ public void testAskTimeoutEqualsCheckpointTimeout() throws Exception {
+ final long checkpointTimeout = 128288238L;
+ final JobID jobId = new JobID();
+ // Execution graph whose checkpoint coordinator reports the timeout above
+ CheckpointCoordinator coordinator = mock(CheckpointCoordinator.class);
+ when(coordinator.getCheckpointTimeout()).thenReturn(checkpointTimeout);
+ ExecutionGraph executionGraph = mock(ExecutionGraph.class);
+ when(executionGraph.getCheckpointCoordinator()).thenReturn(coordinator);
+ ExecutionGraphHolder graphHolder = mock(ExecutionGraphHolder.class);
+ when(graphHolder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(executionGraph)));
+
+ JobManagerGateway jobManager = mock(JobManagerGateway.class);
+ when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar"));
+
+ Map<String, String> pathParams = new HashMap<>();
+ pathParams.put("jobid", jobId.toString());
+ pathParams.put("targetDirectory", "placeholder");
+
+ JobCancellationWithSavepointHandlers.TriggerHandler trigger = new JobCancellationWithSavepointHandlers(graphHolder, executor).getTriggerHandler();
+ trigger.handleRequest(pathParams, Collections.emptyMap(), jobManager);
+
+ // NOTE(review): any(Time.class) accepts every timeout, so the value itself is not pinned - confirm whether eq(...) on the expected Time is feasible
+ verify(jobManager).cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class));
+ }
+
+ /**
+ * Tests how the trigger handler chooses the savepoint directory: an explicit path parameter
+ * is used first, then the configured default, and with neither of them the request fails.
+ */
+ @Test
+ public void testSavepointDirectoryConfiguration() throws Exception {
+ final JobID jobId = new JobID();
+ final Map<String, String> noQueryParams = Collections.<String, String>emptyMap();
+
+ CheckpointCoordinator coordinator = mock(CheckpointCoordinator.class);
+ when(coordinator.getCheckpointTimeout()).thenReturn(128288238L);
+ ExecutionGraph executionGraph = mock(ExecutionGraph.class);
+ when(executionGraph.getCheckpointCoordinator()).thenReturn(coordinator);
+ ExecutionGraphHolder graphHolder = mock(ExecutionGraphHolder.class);
+ when(graphHolder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(executionGraph)));
+
+ JobManagerGateway jobManager = mock(JobManagerGateway.class);
+ when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar"));
+
+ JobCancellationWithSavepointHandlers.TriggerHandler trigger = new JobCancellationWithSavepointHandlers(graphHolder, executor, "the-default-directory").getTriggerHandler();
+
+ Map<String, String> pathParams = new HashMap<>();
+ pathParams.put("jobid", jobId.toString());
+
+ // 1. An explicit targetDirectory path parameter is forwarded as is
+ pathParams.put("targetDirectory", "custom-directory");
+ trigger.handleRequest(pathParams, noQueryParams, jobManager);
+
+ verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
+
+ // 2. Without the path parameter the configured default directory is used
+ pathParams.remove("targetDirectory");
+ trigger.handleRequest(pathParams, noQueryParams, jobManager);
+
+ verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("the-default-directory"), any(Time.class));
+
+ // 3. Neither a path parameter nor a default directory: the request has to fail
+ trigger = new JobCancellationWithSavepointHandlers(graphHolder, executor, null).getTriggerHandler();
+
+ try {
+ trigger.handleRequest(pathParams, noQueryParams, jobManager).get();
+ fail("Did not throw expected test Exception");
+ } catch (Exception e) {
+ // The failure cause has to be an IllegalStateException that names the savepoint directory option
+ IllegalStateException cause = (IllegalStateException) e.getCause();
+ assertEquals(true, cause.getMessage().contains(CoreOptions.SAVEPOINT_DIRECTORY.key()));
+ }
+ }
+
+ /**
+ * Tests triggering a new request and monitoring it until completion, including a repeated trigger and an unknown request ID.
+ */
+ @Test
+ public void testTriggerNewRequest() throws Exception {
+ JobID jobId = new JobID();
+ ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
+ ExecutionGraph graph = mock(ExecutionGraph.class);
+ CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
+ when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
+ when(graph.getCheckpointCoordinator()).thenReturn(coord);
+
+ JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
+ JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler();
+ JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler();
+
+ Map<String, String> params = new HashMap<>();
+ params.put("jobid", jobId.toString());
+ params.put("targetDirectory", "custom-directory");
+
+ JobManagerGateway jobManager = mock(JobManagerGateway.class);
+
+ // Cancel-with-savepoint future that stays pending until the test completes it further below
+ CompletableFuture<String> successfulCancelWithSavepoint = new CompletableFuture<>();
+ when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(successfulCancelWithSavepoint);
+
+ // Trigger: expect ACCEPTED with request ID 1 and the in-progress location of that request
+ FullHttpResponse response = trigger.handleRequest(params, Collections.emptyMap(), jobManager).get();
+
+ verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
+
+ String location = String.format("/jobs/%s/cancel-with-savepoint/in-progress/1", jobId);
+
+ assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+ assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+ assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+ assertEquals(location, response.headers().get(HttpHeaders.Names.LOCATION));
+
+ String json = response.content().toString(Charset.forName("UTF-8"));
+ JsonNode root = new ObjectMapper().readTree(json);
+
+ assertEquals("accepted", root.get("status").asText());
+ assertEquals("1", root.get("request-id").asText());
+ assertEquals(location, root.get("location").asText());
+
+ // Trigger again while the first request is still pending: expect the same response as before
+ response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+ assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+ assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+ assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+ assertEquals(location, response.headers().get(HttpHeaders.Names.LOCATION));
+
+ json = response.content().toString(Charset.forName("UTF-8"));
+ root = new ObjectMapper().readTree(json);
+
+ assertEquals("accepted", root.get("status").asText());
+ assertEquals("1", root.get("request-id").asText());
+ assertEquals(location, root.get("location").asText());
+
+ // Only a single actual request reached the job manager (verify defaults to exactly one invocation)
+ verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
+
+ // Query progress while the future is still pending
+ params.put("requestId", "1");
+
+ response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+ assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+ assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+ assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+ json = response.content().toString(Charset.forName("UTF-8"));
+ root = new ObjectMapper().readTree(json);
+
+ assertEquals("in-progress", root.get("status").asText());
+ assertEquals("1", root.get("request-id").asText());
+
+ // Complete the savepoint; the next progress query has to report success and the savepoint path
+ successfulCancelWithSavepoint.complete("_path-savepoint_");
+
+ response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+
+ assertEquals(HttpResponseStatus.CREATED, response.getStatus());
+ assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+ assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+ json = response.content().toString(Charset.forName("UTF-8"));
+
+ root = new ObjectMapper().readTree(json);
+
+ assertEquals("success", root.get("status").asText());
+ assertEquals("1", root.get("request-id").asText());
+ assertEquals("_path-savepoint_", root.get("savepoint-path").asText());
+
+ // Query again: the completed result has to be still available (recent history is kept)
+
+ response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+
+ assertEquals(HttpResponseStatus.CREATED, response.getStatus());
+ assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+ assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+ json = response.content().toString(Charset.forName("UTF-8"));
+
+ root = new ObjectMapper().readTree(json);
+
+ assertEquals("success", root.get("status").asText());
+ assertEquals("1", root.get("request-id").asText());
+ assertEquals("_path-savepoint_", root.get("savepoint-path").asText());
+
+ // Query for an unknown request ID: expect BAD_REQUEST with a "failed" status
+ params.put("requestId", "9929");
+
+ response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
+ assertEquals(HttpResponseStatus.BAD_REQUEST, response.getStatus());
+ assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+ assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+ json = response.content().toString(Charset.forName("UTF-8"));
+
+ root = new ObjectMapper().readTree(json);
+
+ assertEquals("failed", root.get("status").asText());
+ assertEquals("9929", root.get("request-id").asText());
+ assertEquals("Unknown job/request ID", root.get("cause").asText());
+ }
+
+ /**
+ * Tests that a failed cancel-with-savepoint request is reported by the in-progress
+ * handler as a failure that carries the message of the causing exception.
+ */
+ @Test
+ public void testFailedCancellation() throws Exception {
+ final JobID jobId = new JobID();
+
+ CheckpointCoordinator coordinator = mock(CheckpointCoordinator.class);
+ ExecutionGraph executionGraph = mock(ExecutionGraph.class);
+ when(executionGraph.getCheckpointCoordinator()).thenReturn(coordinator);
+ ExecutionGraphHolder graphHolder = mock(ExecutionGraphHolder.class);
+ when(graphHolder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(executionGraph)));
+
+ JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(graphHolder, executor);
+ JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler();
+ JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler();
+
+ // The job manager answers the cancel-with-savepoint call with an already failed future
+ JobManagerGateway jobManager = mock(JobManagerGateway.class);
+ CompletableFuture<String> failedCancelWithSavepoint = FutureUtils.completedExceptionally(new Exception("Test Exception"));
+ when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(failedCancelWithSavepoint);
+
+ Map<String, String> pathParams = new HashMap<>();
+ pathParams.put("jobid", jobId.toString());
+ pathParams.put("targetDirectory", "custom-directory");
+
+ // Trigger the cancellation
+ trigger.handleRequest(pathParams, Collections.<String, String>emptyMap(), jobManager);
+ verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
+
+ // Query the progress of request 1, which has to report the failure
+ pathParams.put("requestId", "1");
+ FullHttpResponse response = progress.handleRequest(pathParams, Collections.<String, String>emptyMap(), jobManager).get();
+
+ assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.getStatus());
+ assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+ assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+ JsonNode body = new ObjectMapper().readTree(response.content().toString(Charset.forName("UTF-8")));
+
+ assertEquals("failed", body.get("status").asText());
+ assertEquals("1", body.get("request-id").asText());
+ assertEquals("Test Exception", body.get("cause").asText());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
new file mode 100644
index 0000000..567df8c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobConfigHandler.
+ */
+public class JobConfigHandlerTest {
+
+ /** Tests that the archivist emits exactly one JSON, stored under the config path of the job. */
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist configArchivist = new JobConfigHandler.JobConfigJsonArchivist();
+ AccessExecutionGraph job = ArchivedJobGenerationUtils.getTestJob();
+
+ Collection<ArchivedJson> archivedJsons = configArchivist.archiveJsonWithPath(job);
+ Assert.assertEquals(1, archivedJsons.size());
+ ArchivedJson configArchive = archivedJsons.iterator().next();
+ Assert.assertEquals("/jobs/" + job.getJobID() + "/config", configArchive.getPath());
+ compareJobConfig(job, configArchive.getJson());
+ }
+
+ /** Tests that the handler is registered for exactly the job config path. */
+ @Test
+ public void testGetPaths() {
+ String[] registeredPaths = new JobConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()).getPaths();
+ Assert.assertEquals(1, registeredPaths.length);
+ Assert.assertEquals("/jobs/:jobid/config", registeredPaths[0]);
+ }
+
+ @Test // JUnit 4 runs annotated methods only; without this the JSON check below is silently skipped
+ public void testJsonGeneration() throws Exception {
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ compareJobConfig(originalJob, JobConfigHandler.createJobConfigJson(originalJob));
+ }
+
+ /**
+ * Asserts that the job config JSON in answer describes originalJob: job ID, name, execution config and user config entries.
+ */
+ private static void compareJobConfig(AccessExecutionGraph originalJob, String answer) throws IOException {
+ JsonNode jobJson = ArchivedJobGenerationUtils.MAPPER.readTree(answer);
+ Assert.assertEquals(originalJob.getJobID().toString(), jobJson.get("jid").asText());
+ Assert.assertEquals(originalJob.getJobName(), jobJson.get("name").asText());
+
+ ArchivedExecutionConfig expectedConfig = originalJob.getArchivedExecutionConfig();
+ JsonNode configJson = jobJson.get("execution-config");
+ Assert.assertEquals(expectedConfig.getExecutionMode(), configJson.get("execution-mode").asText());
+ Assert.assertEquals(expectedConfig.getRestartStrategyDescription(), configJson.get("restart-strategy").asText());
+ Assert.assertEquals(expectedConfig.getParallelism(), configJson.get("job-parallelism").asInt());
+ Assert.assertEquals(expectedConfig.getObjectReuseEnabled(), configJson.get("object-reuse-mode").asBoolean());
+
+ // Every user config entry of the job has to show up in the JSON with the same value
+ JsonNode userConfigJson = configJson.get("user-config");
+ for (Map.Entry<String, String> expectedEntry : expectedConfig.getGlobalJobParameters().entrySet()) {
+ Assert.assertEquals(expectedEntry.getValue(), userConfigJson.get(expectedEntry.getKey()).asText());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
new file mode 100644
index 0000000..afd743e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobDetailsHandler.
+ */
+public class JobDetailsHandlerTest {
+
+ /** Tests that the archivist emits the job details twice, first under the job path and then under the vertices path. */
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist detailsArchivist = new JobDetailsHandler.JobDetailsJsonArchivist();
+ AccessExecutionGraph job = ArchivedJobGenerationUtils.getTestJob();
+ Collection<ArchivedJson> archivedJsons = detailsArchivist.archiveJsonWithPath(job);
+ Assert.assertEquals(2, archivedJsons.size());
+
+ Iterator<ArchivedJson> remaining = archivedJsons.iterator();
+ ArchivedJson jobArchive = remaining.next();
+ Assert.assertEquals("/jobs/" + job.getJobID(), jobArchive.getPath());
+ compareJobDetails(job, jobArchive.getJson());
+
+ ArchivedJson verticesArchive = remaining.next();
+ Assert.assertEquals("/jobs/" + job.getJobID() + "/vertices", verticesArchive.getPath());
+ compareJobDetails(job, verticesArchive.getJson());
+ }
+
+ /** Tests that the handler is registered for the job path and the job vertices path. */
+ @Test
+ public void testGetPaths() {
+ String[] registeredPaths = new JobDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null).getPaths();
+ Assert.assertEquals(2, registeredPaths.length);
+ List<String> registered = Lists.newArrayList(registeredPaths);
+ Assert.assertTrue(registered.contains("/jobs/:jobid"));
+ Assert.assertTrue(registered.contains("/jobs/:jobid/vertices"));
+ }
+
+ /** Tests that the job details JSON created for the test job matches that job. */
+ @Test
+ public void testJsonGeneration() throws Exception {
+ AccessExecutionGraph job = ArchivedJobGenerationUtils.getTestJob();
+ String detailsJson = JobDetailsHandler.createJobDetailsJson(job, null);
+ compareJobDetails(job, detailsJson);
+ }
+
+ /**
+ * Asserts that the job details JSON matches originalJob; vertex timings, per-state counts and the vertex count are hard-coded, presumably to fit ArchivedJobGenerationUtils.getTestJob().
+ */
+ private static void compareJobDetails(AccessExecutionGraph originalJob, String json) throws IOException {
+ JsonNode detailsJson = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+ Assert.assertEquals(originalJob.getJobID().toString(), detailsJson.get("jid").asText());
+ Assert.assertEquals(originalJob.getJobName(), detailsJson.get("name").asText());
+ Assert.assertEquals(originalJob.isStoppable(), detailsJson.get("isStoppable").asBoolean());
+ Assert.assertEquals(originalJob.getState().name(), detailsJson.get("state").asText());
+
+ // Start is the CREATED timestamp, end is the timestamp of the job's current state
+ long expectedStart = originalJob.getStatusTimestamp(JobStatus.CREATED);
+ long expectedEnd = originalJob.getStatusTimestamp(originalJob.getState());
+ Assert.assertEquals(expectedStart, detailsJson.get("start-time").asLong());
+ Assert.assertEquals(expectedEnd, detailsJson.get("end-time").asLong());
+ Assert.assertEquals(expectedEnd - expectedStart, detailsJson.get("duration").asLong());
+
+ JsonNode timestampsJson = detailsJson.get("timestamps");
+ for (JobStatus jobStatus : JobStatus.values()) {
+ Assert.assertEquals(originalJob.getStatusTimestamp(jobStatus), timestampsJson.get(jobStatus.name()).asLong());
+ }
+
+ ArrayNode verticesJson = (ArrayNode) detailsJson.get("vertices");
+ int vertexIndex = 0;
+ for (AccessExecutionJobVertex expectedVertex : originalJob.getVerticesTopologically()) {
+ JsonNode vertexJson = verticesJson.get(vertexIndex);
+
+ Assert.assertEquals(expectedVertex.getJobVertexId().toString(), vertexJson.get("id").asText());
+ Assert.assertEquals(expectedVertex.getName(), vertexJson.get("name").asText());
+ Assert.assertEquals(expectedVertex.getParallelism(), vertexJson.get("parallelism").asInt());
+ Assert.assertEquals(expectedVertex.getAggregateState().name(), vertexJson.get("status").asText());
+
+ Assert.assertEquals(3, vertexJson.get("start-time").asLong());
+ Assert.assertEquals(5, vertexJson.get("end-time").asLong());
+ Assert.assertEquals(2, vertexJson.get("duration").asLong());
+
+ JsonNode subtaskCounts = vertexJson.get("tasks");
+ Assert.assertEquals(0, subtaskCounts.get(ExecutionState.CREATED.name()).asInt());
+ Assert.assertEquals(0, subtaskCounts.get(ExecutionState.SCHEDULED.name()).asInt());
+ Assert.assertEquals(0, subtaskCounts.get(ExecutionState.DEPLOYING.name()).asInt());
+ Assert.assertEquals(0, subtaskCounts.get(ExecutionState.RUNNING.name()).asInt());
+ Assert.assertEquals(1, subtaskCounts.get(ExecutionState.FINISHED.name()).asInt());
+ Assert.assertEquals(0, subtaskCounts.get(ExecutionState.CANCELING.name()).asInt());
+ Assert.assertEquals(0, subtaskCounts.get(ExecutionState.CANCELED.name()).asInt());
+ Assert.assertEquals(0, subtaskCounts.get(ExecutionState.FAILED.name()).asInt());
+
+ long totalBytesIn = 0;
+ long totalBytesOut = 0;
+ long totalRecordsIn = 0;
+ long totalRecordsOut = 0;
+
+ for (AccessExecutionVertex subtask : expectedVertex.getTaskVertices()) {
+ IOMetrics ioMetrics = subtask.getCurrentExecutionAttempt().getIOMetrics();
+ totalBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
+ totalBytesOut += ioMetrics.getNumBytesOut();
+ totalRecordsIn += ioMetrics.getNumRecordsIn();
+ totalRecordsOut += ioMetrics.getNumRecordsOut();
+ }
+
+ JsonNode metricsJson = vertexJson.get("metrics");
+ Assert.assertEquals(totalBytesIn, metricsJson.get("read-bytes").asLong());
+ Assert.assertEquals(totalBytesOut, metricsJson.get("write-bytes").asLong());
+ Assert.assertEquals(totalRecordsIn, metricsJson.get("read-records").asLong());
+ Assert.assertEquals(totalRecordsOut, metricsJson.get("write-records").asLong());
+
+ vertexIndex++;
+ }
+ Assert.assertEquals(1, verticesJson.size());
+
+ JsonNode statusCountsJson = detailsJson.get("status-counts");
+ Assert.assertEquals(0, statusCountsJson.get(ExecutionState.CREATED.name()).asInt());
+ Assert.assertEquals(0, statusCountsJson.get(ExecutionState.SCHEDULED.name()).asInt());
+ Assert.assertEquals(0, statusCountsJson.get(ExecutionState.DEPLOYING.name()).asInt());
+ Assert.assertEquals(1, statusCountsJson.get(ExecutionState.RUNNING.name()).asInt());
+ Assert.assertEquals(0, statusCountsJson.get(ExecutionState.FINISHED.name()).asInt());
+ Assert.assertEquals(0, statusCountsJson.get(ExecutionState.CANCELING.name()).asInt());
+ Assert.assertEquals(0, statusCountsJson.get(ExecutionState.CANCELED.name()).asInt());
+ Assert.assertEquals(0, statusCountsJson.get(ExecutionState.FAILED.name()).asInt());
+
+ Assert.assertEquals(ArchivedJobGenerationUtils.MAPPER.readTree(originalJob.getJsonPlan()), detailsJson.get("plan"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
new file mode 100644
index 0000000..6a20696
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.ExceptionUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobExceptionsHandler.
+ */
+public class JobExceptionsHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new JobExceptionsHandler.JobExceptionsJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/exceptions", archive.getPath());
+ compareExceptions(originalJob, archive.getJson());
+ }
+
+ @Test
+ public void testGetPaths() {
+ JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+ String[] paths = handler.getPaths();
+ Assert.assertEquals(1, paths.length);
+ Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]);
+ }
+
+ @Test
+ public void testJsonGeneration() throws Exception {
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ String json = JobExceptionsHandler.createJobExceptionsJson(originalJob);
+
+ compareExceptions(originalJob, json);
+ }
+
+ private static void compareExceptions(AccessExecutionGraph originalJob, String json) throws IOException {
+ JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+ Assert.assertEquals(originalJob.getFailureCause().getExceptionAsString(), result.get("root-exception").asText());
+ Assert.assertEquals(originalJob.getFailureCause().getTimestamp(), result.get("timestamp").asLong());
+
+ ArrayNode exceptions = (ArrayNode) result.get("all-exceptions");
+
+ int x = 0;
+ for (AccessExecutionVertex expectedSubtask : originalJob.getAllExecutionVertices()) {
+ if (!expectedSubtask.getFailureCauseAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
+ JsonNode exception = exceptions.get(x);
+
+ Assert.assertEquals(expectedSubtask.getFailureCauseAsString(), exception.get("exception").asText());
+ Assert.assertEquals(expectedSubtask.getStateTimestamp(ExecutionState.FAILED), exception.get("timestamp").asLong());
+ Assert.assertEquals(expectedSubtask.getTaskNameWithSubtaskIndex(), exception.get("task").asText());
+
+ TaskManagerLocation location = expectedSubtask.getCurrentAssignedResourceLocation();
+ String expectedLocationString = location.getFQDNHostname() + ':' + location.dataPort();
+ Assert.assertEquals(expectedLocationString, exception.get("location").asText());
+ }
+ x++;
+ }
+ Assert.assertEquals(x > JobExceptionsHandler.MAX_NUMBER_EXCEPTION_TO_REPORT, result.get("truncated").asBoolean());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java
new file mode 100644
index 0000000..03ddb73
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the JobManagerConfigHandler.
+ */
+public class JobManagerConfigHandlerTest {
+ @Test
+ public void testGetPaths() {
+ JobManagerConfigHandler handler = new JobManagerConfigHandler(Executors.directExecutor(), null);
+ String[] paths = handler.getPaths();
+ Assert.assertEquals(1, paths.length);
+ Assert.assertEquals("/jobmanager/config", paths[0]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
new file mode 100644
index 0000000..6d3b213
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobPlanHandler.
+ */
+public class JobPlanHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new JobPlanHandler.JobPlanJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/plan", archive.getPath());
+ Assert.assertEquals(originalJob.getJsonPlan(), archive.getJson());
+ }
+
+ @Test
+ public void testGetPaths() {
+ JobPlanHandler handler = new JobPlanHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+ String[] paths = handler.getPaths();
+ Assert.assertEquals(1, paths.length);
+ Assert.assertEquals("/jobs/:jobid/plan", paths[0]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandlerTest.java
new file mode 100644
index 0000000..2c39fcf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandlerTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Tests for the JobStoppingHandler.
+ */
+public class JobStoppingHandlerTest extends TestLogger {
+ @Test
+ public void testGetPaths() {
+ JobStoppingHandler handler = new JobStoppingHandler(Executors.directExecutor(), TestingUtils.TIMEOUT());
+ String[] paths = handler.getPaths();
+ Assert.assertEquals(2, paths.length);
+ List<String> pathsList = Lists.newArrayList(paths);
+ Assert.assertTrue(pathsList.contains("/jobs/:jobid/stop"));
+ Assert.assertTrue(pathsList.contains("/jobs/:jobid/yarn-stop"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
new file mode 100644
index 0000000..feffe60
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobVertexAccumulatorsHandler.
+ */
+public class JobVertexAccumulatorsHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/accumulators", archive.getPath());
+ compareAccumulators(originalTask, archive.getJson());
+ }
+
+ @Test
+ public void testGetPaths() {
+ JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+ String[] paths = handler.getPaths();
+ Assert.assertEquals(1, paths.length);
+ Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]);
+ }
+
+ @Test
+ public void testJsonGeneration() throws Exception {
+ AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+ String json = JobVertexAccumulatorsHandler.createVertexAccumulatorsJson(originalTask);
+
+ compareAccumulators(originalTask, json);
+ }
+
+ private static void compareAccumulators(AccessExecutionJobVertex originalTask, String json) throws IOException {
+ JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+ Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
+
+ ArrayNode accs = (ArrayNode) result.get("user-accumulators");
+ StringifiedAccumulatorResult[] expectedAccs = originalTask.getAggregatedUserAccumulatorsStringified();
+
+ ArchivedJobGenerationUtils.compareStringifiedAccumulators(expectedAccs, accs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java
new file mode 100644
index 0000000..bd6817f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for back pressure handler responses.
+ */
+public class JobVertexBackPressureHandlerTest {
+ @Test
+ public void testGetPaths() {
+ JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), mock(BackPressureStatsTracker.class), 0);
+ String[] paths = handler.getPaths();
+ Assert.assertEquals(1, paths.length);
+ Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/backpressure", paths[0]);
+ }
+
+ /** Tests the response when no stats are available. */
+ @Test
+ public void testResponseNoStatsAvailable() throws Exception {
+ ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+ BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
+
+ when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
+ .thenReturn(Optional.empty());
+
+ JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
+ mock(ExecutionGraphHolder.class),
+ Executors.directExecutor(),
+ statsTracker,
+ 9999);
+
+ String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode rootNode = mapper.readTree(response);
+
+ // Single element
+ assertEquals(1, rootNode.size());
+
+ // Status
+ JsonNode status = rootNode.get("status");
+ assertNotNull(status);
+ assertEquals("deprecated", status.textValue());
+
+ verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class));
+ }
+
+ /** Tests the response when stats are available. */
+ @Test
+ public void testResponseStatsAvailable() throws Exception {
+ ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+ BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
+
+ OperatorBackPressureStats stats = new OperatorBackPressureStats(
+ 0, System.currentTimeMillis(), new double[] { 0.31, 0.48, 1.0, 0.0 });
+
+ when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
+ .thenReturn(Optional.of(stats));
+
+ JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
+ mock(ExecutionGraphHolder.class),
+ Executors.directExecutor(),
+ statsTracker,
+ 9999);
+
+ String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode rootNode = mapper.readTree(response);
+
+ // Single element
+ assertEquals(4, rootNode.size());
+
+ // Status
+ JsonNode status = rootNode.get("status");
+ assertNotNull(status);
+ assertEquals("ok", status.textValue());
+
+ // Back pressure level
+ JsonNode backPressureLevel = rootNode.get("backpressure-level");
+ assertNotNull(backPressureLevel);
+ assertEquals("high", backPressureLevel.textValue());
+
+ // End time stamp
+ JsonNode endTimeStamp = rootNode.get("end-timestamp");
+ assertNotNull(endTimeStamp);
+ assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue());
+
+ // Subtasks
+ JsonNode subTasks = rootNode.get("subtasks");
+ assertEquals(stats.getNumberOfSubTasks(), subTasks.size());
+ for (int i = 0; i < subTasks.size(); i++) {
+ JsonNode subTask = subTasks.get(i);
+
+ JsonNode index = subTask.get("subtask");
+ assertEquals(i, index.intValue());
+
+ JsonNode level = subTask.get("backpressure-level");
+ assertEquals(JobVertexBackPressureHandler
+ .getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue());
+
+ JsonNode ratio = subTask.get("ratio");
+ assertEquals(stats.getBackPressureRatio(i), ratio.doubleValue(), 0.0);
+ }
+
+ // Verify not triggered
+ verify(statsTracker, never()).triggerStackTraceSample(any(ExecutionJobVertex.class));
+ }
+
+ /** Tests that after the refresh interval another sample is triggered. */
+ @Test
+ public void testResponsePassedRefreshInterval() throws Exception {
+ ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+ BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
+
+ OperatorBackPressureStats stats = new OperatorBackPressureStats(
+ 0, System.currentTimeMillis(), new double[] { 0.31, 0.48, 1.0, 0.0 });
+
+ when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
+ .thenReturn(Optional.of(stats));
+
+ JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
+ mock(ExecutionGraphHolder.class),
+ Executors.directExecutor(),
+ statsTracker,
+ 0); // <----- refresh interval should fire immediately
+
+ String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode rootNode = mapper.readTree(response);
+
+ // Single element
+ assertEquals(4, rootNode.size());
+
+ // Status
+ JsonNode status = rootNode.get("status");
+ assertNotNull(status);
+ // Interval passed, hence deprecated
+ assertEquals("deprecated", status.textValue());
+
+ // Back pressure level
+ JsonNode backPressureLevel = rootNode.get("backpressure-level");
+ assertNotNull(backPressureLevel);
+ assertEquals("high", backPressureLevel.textValue());
+
+ // End time stamp
+ JsonNode endTimeStamp = rootNode.get("end-timestamp");
+ assertNotNull(endTimeStamp);
+ assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue());
+
+ // Subtasks
+ JsonNode subTasks = rootNode.get("subtasks");
+ assertEquals(stats.getNumberOfSubTasks(), subTasks.size());
+ for (int i = 0; i < subTasks.size(); i++) {
+ JsonNode subTask = subTasks.get(i);
+
+ JsonNode index = subTask.get("subtask");
+ assertEquals(i, index.intValue());
+
+ JsonNode level = subTask.get("backpressure-level");
+ assertEquals(JobVertexBackPressureHandler
+ .getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue());
+
+ JsonNode ratio = subTask.get("ratio");
+ assertEquals(stats.getBackPressureRatio(i), ratio.doubleValue(), 0.0);
+ }
+
+ // Verify triggered
+ verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
new file mode 100644
index 0000000..5af1d53
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobVertexDetailsHandler.
+ */
+public class JobVertexDetailsHandlerTest {
+
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new JobVertexDetailsHandler.JobVertexDetailsJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId(), archive.getPath());
+ compareVertexDetails(originalTask, archive.getJson());
+ }
+
+ @Test
+ public void testGetPaths() {
+ JobVertexDetailsHandler handler = new JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
+ String[] paths = handler.getPaths();
+ Assert.assertEquals(1, paths.length);
+ Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", paths[0]);
+ }
+
+ @Test
+ public void testJsonGeneration() throws Exception {
+ AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+ String json = JobVertexDetailsHandler.createVertexDetailsJson(
+ originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
+
+ compareVertexDetails(originalTask, json);
+ }
+
+ private static void compareVertexDetails(AccessExecutionJobVertex originalTask, String json) throws IOException {
+ JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+ Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
+ Assert.assertEquals(originalTask.getName(), result.get("name").asText());
+ Assert.assertEquals(originalTask.getParallelism(), result.get("parallelism").asInt());
+ Assert.assertTrue(result.get("now").asLong() > 0);
+
+ ArrayNode subtasks = (ArrayNode) result.get("subtasks");
+
+ Assert.assertEquals(originalTask.getTaskVertices().length, subtasks.size());
+ for (int x = 0; x < originalTask.getTaskVertices().length; x++) {
+ AccessExecutionVertex expectedSubtask = originalTask.getTaskVertices()[x];
+ JsonNode subtask = subtasks.get(x);
+
+ Assert.assertEquals(x, subtask.get("subtask").asInt());
+ Assert.assertEquals(expectedSubtask.getExecutionState().name(), subtask.get("status").asText());
+ Assert.assertEquals(expectedSubtask.getCurrentExecutionAttempt().getAttemptNumber(), subtask.get("attempt").asInt());
+
+ TaskManagerLocation location = expectedSubtask.getCurrentAssignedResourceLocation();
+ String expectedLocationString = location.getHostname() + ":" + location.dataPort();
+ Assert.assertEquals(expectedLocationString, subtask.get("host").asText());
+ long start = expectedSubtask.getStateTimestamp(ExecutionState.DEPLOYING);
+ Assert.assertEquals(start, subtask.get("start-time").asLong());
+ long end = expectedSubtask.getStateTimestamp(ExecutionState.FINISHED);
+ Assert.assertEquals(end, subtask.get("end-time").asLong());
+ Assert.assertEquals(end - start, subtask.get("duration").asLong());
+
+ ArchivedJobGenerationUtils.compareIoMetrics(expectedSubtask.getCurrentExecutionAttempt().getIOMetrics(), subtask.get("metrics"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java
new file mode 100644
index 0000000..2a027fd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the JobVertexTaskManagersHandler: JSON archiving, the registered REST path and direct JSON generation.
+ */
+public class JobVertexTaskManagersHandlerTest extends TestLogger {
+ /** Archives the test job and checks that exactly one entry with the expected path and JSON is produced. */
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+ AccessExecutionVertex originalSubtask = ArchivedJobGenerationUtils.getTestSubtask();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/taskmanagers", archive.getPath());
+ compareVertexTaskManagers(originalTask, originalSubtask, archive.getJson());
+ }
+ /** Checks that the handler registers exactly one path, the task managers route of a job vertex. */
+ @Test
+ public void testGetPaths() {
+ JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
+ String[] paths = handler.getPaths();
+ Assert.assertEquals(1, paths.length);
+ Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]);
+ }
+ /** Checks the JSON produced directly by createVertexDetailsByTaskManagerJson for the test task. */
+ @Test
+ public void testJsonGeneration() throws Exception {
+ AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+ AccessExecutionVertex originalSubtask = ArchivedJobGenerationUtils.getTestSubtask();
+ String json = JobVertexTaskManagersHandler.createVertexDetailsByTaskManagerJson(
+ originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
+ // The generated JSON goes through the same checks as the archived JSON in testArchiver.
+ compareVertexTaskManagers(originalTask, originalSubtask, json);
+ }
+ /** Asserts that {@code json} describes {@code originalTask} and the task manager hosting {@code originalSubtask}. */
+ private static void compareVertexTaskManagers(AccessExecutionJobVertex originalTask, AccessExecutionVertex originalSubtask, String json) throws IOException {
+ JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+ Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
+ Assert.assertEquals(originalTask.getName(), result.get("name").asText());
+ Assert.assertTrue(result.get("now").asLong() > 0);
+
+ ArrayNode taskmanagers = (ArrayNode) result.get("taskmanagers");
+ // Only the first entry of the "taskmanagers" array is inspected.
+ JsonNode taskManager = taskmanagers.get(0);
+
+ TaskManagerLocation location = originalSubtask.getCurrentAssignedResourceLocation();
+ String expectedLocationString = location.getHostname() + ':' + location.dataPort();
+ Assert.assertEquals(expectedLocationString, taskManager.get("host").asText());
+ Assert.assertEquals(ExecutionState.FINISHED.name(), taskManager.get("status").asText());
+ // NOTE(review): 3/5/2 are literals rather than values read from originalSubtask; presumably they mirror the fixture's timestamps - confirm.
+ Assert.assertEquals(3, taskManager.get("start-time").asLong());
+ Assert.assertEquals(5, taskManager.get("end-time").asLong());
+ Assert.assertEquals(2, taskManager.get("duration").asLong());
+ // Every checked state must have a count of zero except FINISHED, which must be one.
+ JsonNode statusCounts = taskManager.get("status-counts");
+ Assert.assertEquals(0, statusCounts.get(ExecutionState.CREATED.name()).asInt());
+ Assert.assertEquals(0, statusCounts.get(ExecutionState.SCHEDULED.name()).asInt());
+ Assert.assertEquals(0, statusCounts.get(ExecutionState.DEPLOYING.name()).asInt());
+ Assert.assertEquals(0, statusCounts.get(ExecutionState.RUNNING.name()).asInt());
+ Assert.assertEquals(1, statusCounts.get(ExecutionState.FINISHED.name()).asInt());
+ Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELING.name()).asInt());
+ Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELED.name()).asInt());
+ Assert.assertEquals(0, statusCounts.get(ExecutionState.FAILED.name()).asInt());
+ // Expected metrics are summed over all subtasks of the vertex; bytes read combine local and remote input.
+ long expectedNumBytesIn = 0;
+ long expectedNumBytesOut = 0;
+ long expectedNumRecordsIn = 0;
+ long expectedNumRecordsOut = 0;
+
+ for (AccessExecutionVertex vertex : originalTask.getTaskVertices()) {
+ IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
+
+ expectedNumBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
+ expectedNumBytesOut += ioMetrics.getNumBytesOut();
+ expectedNumRecordsIn += ioMetrics.getNumRecordsIn();
+ expectedNumRecordsOut += ioMetrics.getNumRecordsOut();
+ }
+
+ JsonNode metrics = taskManager.get("metrics");
+
+ Assert.assertEquals(expectedNumBytesIn, metrics.get("read-bytes").asLong());
+ Assert.assertEquals(expectedNumBytesOut, metrics.get("write-bytes").asLong());
+ Assert.assertEquals(expectedNumRecordsIn, metrics.get("read-records").asLong());
+ Assert.assertEquals(expectedNumRecordsOut, metrics.get("write-records").asLong());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java
new file mode 100644
index 0000000..9e0d549
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the SubtaskCurrentAttemptDetailsHandler; only the registered REST path is verified here.
+ */
+public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
+ @Test
+ public void testGetPaths() {
+ SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
+ String[] paths = handler.getPaths();
+ Assert.assertEquals(1, paths.length);
+ Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", paths[0]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
new file mode 100644
index 0000000..49e54c0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the SubtaskExecutionAttemptAccumulatorsHandler: JSON archiving, the registered REST path and direct JSON generation.
+ */
+public class SubtaskExecutionAttemptAccumulatorsHandlerTest extends TestLogger {
+ /** Archives the test job and checks that a single entry with the attempt's accumulators path and JSON is produced. */
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+ AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals(
+ "/jobs/" + originalJob.getJobID() +
+ "/vertices/" + originalTask.getJobVertexId() +
+ "/subtasks/" + originalAttempt.getParallelSubtaskIndex() +
+ "/attempts/" + originalAttempt.getAttemptNumber() +
+ "/accumulators",
+ archive.getPath());
+ compareAttemptAccumulators(originalAttempt, archive.getJson());
+ }
+ /** Checks that the handler registers exactly one path, the accumulators route of a subtask attempt. */
+ @Test
+ public void testGetPaths() {
+ SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+ String[] paths = handler.getPaths();
+ Assert.assertEquals(1, paths.length);
+ Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", paths[0]);
+ }
+ /** Checks the JSON produced directly by createAttemptAccumulatorsJson for the test attempt. */
+ @Test
+ public void testJsonGeneration() throws Exception {
+ AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+ String json = SubtaskExecutionAttemptAccumulatorsHandler.createAttemptAccumulatorsJson(originalAttempt);
+
+ compareAttemptAccumulators(originalAttempt, json);
+ }
+ /** Asserts that {@code json} carries the subtask index, attempt number, attempt id and user accumulators of {@code originalAttempt}. */
+ private static void compareAttemptAccumulators(AccessExecution originalAttempt, String json) throws IOException {
+ JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+ Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt());
+ Assert.assertEquals(originalAttempt.getAttemptNumber(), result.get("attempt").asInt());
+ Assert.assertEquals(originalAttempt.getAttemptId().toString(), result.get("id").asText());
+
+ ArchivedJobGenerationUtils.compareStringifiedAccumulators(originalAttempt.getUserAccumulatorsStringified(), (ArrayNode) result.get("user-accumulators"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java
new file mode 100644
index 0000000..e1fe8b5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the SubtaskExecutionAttemptDetailsHandler: JSON archiving, the registered REST path and direct JSON generation.
+ */
+public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
+ /** Archives the test job and checks both produced entries: the subtask path and the subtask/attempt path. */
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+ AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(2, archives.size());
+
+ Iterator<ArchivedJson> iterator = archives.iterator();
+ ArchivedJson archive1 = iterator.next();
+ Assert.assertEquals(
+ "/jobs/" + originalJob.getJobID() +
+ "/vertices/" + originalTask.getJobVertexId() +
+ "/subtasks/" + originalAttempt.getParallelSubtaskIndex(),
+ archive1.getPath());
+ compareAttemptDetails(originalAttempt, archive1.getJson());
+ // The second entry appends the attempt number to the path and is checked against the same attempt details.
+ ArchivedJson archive2 = iterator.next();
+ Assert.assertEquals(
+ "/jobs/" + originalJob.getJobID() +
+ "/vertices/" + originalTask.getJobVertexId() +
+ "/subtasks/" + originalAttempt.getParallelSubtaskIndex() +
+ "/attempts/" + originalAttempt.getAttemptNumber(),
+ archive2.getPath());
+ compareAttemptDetails(originalAttempt, archive2.getJson());
+ }
+ /** Checks that the handler registers exactly one path, the details route of a subtask attempt. */
+ @Test
+ public void testGetPaths() {
+ SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
+ String[] paths = handler.getPaths();
+ Assert.assertEquals(1, paths.length);
+ Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", paths[0]);
+ }
+ /** Checks the JSON produced directly by createAttemptDetailsJson from the test attempt, job id and vertex id. */
+ @Test
+ public void testJsonGeneration() throws Exception {
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+ AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+ String json = SubtaskExecutionAttemptDetailsHandler.createAttemptDetailsJson(
+ originalAttempt, originalJob.getJobID().toString(), originalTask.getJobVertexId().toString(), null);
+
+ compareAttemptDetails(originalAttempt, json);
+ }
+ /** Asserts that {@code json} matches the state, host, timestamps and IO metrics of {@code originalAttempt}. */
+ private static void compareAttemptDetails(AccessExecution originalAttempt, String json) throws IOException {
+ JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+ Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt());
+ Assert.assertEquals(originalAttempt.getState().name(), result.get("status").asText());
+ Assert.assertEquals(originalAttempt.getAttemptNumber(), result.get("attempt").asInt());
+ Assert.assertEquals(originalAttempt.getAssignedResourceLocation().getHostname(), result.get("host").asText());
+ long start = originalAttempt.getStateTimestamp(ExecutionState.DEPLOYING);
+ Assert.assertEquals(start, result.get("start-time").asLong());
+ long end = originalAttempt.getStateTimestamp(ExecutionState.FINISHED);
+ Assert.assertEquals(end, result.get("end-time").asLong());
+ Assert.assertEquals(end - start, result.get("duration").asLong());
+
+ ArchivedJobGenerationUtils.compareIoMetrics(originalAttempt.getIOMetrics(), result.get("metrics"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java
new file mode 100644
index 0000000..1478f00
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the SubtasksAllAccumulatorsHandler: JSON archiving, the registered REST path and direct JSON generation.
+ */
+public class SubtasksAllAccumulatorsHandlerTest extends TestLogger {
+ /** Archives the test job and checks that a single entry with the subtask accumulators path and JSON is produced. */
+ @Test
+ public void testArchiver() throws Exception {
+ JsonArchivist archivist = new SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist();
+ AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+ AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+
+ Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+ Assert.assertEquals(1, archives.size());
+
+ ArchivedJson archive = archives.iterator().next();
+ Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() +
+ "/subtasks/accumulators", archive.getPath());
+ compareSubtaskAccumulators(originalTask, archive.getJson());
+ }
+ /** Checks that the handler registers exactly one path, the subtask accumulators route of a job vertex. */
+ @Test
+ public void testGetPaths() {
+ SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+ String[] paths = handler.getPaths();
+ Assert.assertEquals(1, paths.length);
+ Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", paths[0]);
+ }
+ /** Checks the JSON produced directly by createSubtasksAccumulatorsJson for the test task. */
+ @Test
+ public void testJsonGeneration() throws Exception {
+ AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+ String json = SubtasksAllAccumulatorsHandler.createSubtasksAccumulatorsJson(originalTask);
+ compareSubtaskAccumulators(originalTask, json);
+ }
+ /** Asserts that {@code json} lists the id, parallelism and per-subtask accumulators of {@code originalTask}. */
+ private static void compareSubtaskAccumulators(AccessExecutionJobVertex originalTask, String json) throws IOException {
+ JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+ Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
+ Assert.assertEquals(originalTask.getParallelism(), result.get("parallelism").asInt());
+
+ ArrayNode subtasks = (ArrayNode) result.get("subtasks");
+ // The JSON array must hold one entry per subtask, positioned at its subtask index.
+ Assert.assertEquals(originalTask.getTaskVertices().length, subtasks.size());
+ for (int x = 0; x < originalTask.getTaskVertices().length; x++) {
+ JsonNode subtask = subtasks.get(x);
+ AccessExecutionVertex expectedSubtask = originalTask.getTaskVertices()[x];
+
+ Assert.assertEquals(x, subtask.get("subtask").asInt());
+ Assert.assertEquals(expectedSubtask.getCurrentExecutionAttempt().getAttemptNumber(), subtask.get("attempt").asInt());
+ Assert.assertEquals(expectedSubtask.getCurrentAssignedResourceLocation().getHostname(), subtask.get("host").asText());
+
+ ArchivedJobGenerationUtils.compareStringifiedAccumulators(
+ expectedSubtask.getCurrentExecutionAttempt().getUserAccumulatorsStringified(),
+ (ArrayNode) subtask.get("user-accumulators"));
+ }
+ }
+}