You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/19 22:44:19 UTC
[09/16] flink git commit: [FLINK-7531] Move Flink legacy rest handler
to flink-runtime
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
deleted file mode 100644
index 9c5e168..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
-import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
-import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
-import org.apache.flink.runtime.checkpoint.TaskStateStats;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the CheckpointStatsSubtaskDetailsHandler.
- */
-public class CheckpointStatsSubtaskDetailsHandlerTest {
-
- @Test
- public void testArchiver() throws Exception {
- JsonArchivist archivist = new CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist();
- ObjectMapper mapper = new ObjectMapper();
-
- PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class);
- when(checkpoint.getCheckpointId()).thenReturn(1992139L);
- when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
- when(checkpoint.getTriggerTimestamp()).thenReturn(0L); // ack timestamp = duration
-
- TaskStateStats task = createTaskStateStats(1237);
- when(checkpoint.getAllTaskStateStats()).thenReturn(Collections.singletonList(task));
-
- CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
- when(history.getCheckpoints()).thenReturn(Collections.<AbstractCheckpointStats>singletonList(checkpoint));
- CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
- when(snapshot.getHistory()).thenReturn(history);
-
- AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
- when(graph.getJobID()).thenReturn(new JobID());
-
- Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph);
- Assert.assertEquals(1, archives.size());
-
- ArchivedJson archive = archives.iterator().next();
- Assert.assertEquals(
- "/jobs/" + graph.getJobID() + "/checkpoints/details/" + checkpoint.getCheckpointId() + "/subtasks/" + task.getJobVertexId(),
- archive.getPath());
- JsonNode rootNode = mapper.readTree(archive.getJson());
- assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
- assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
-
- verifyTaskNode(rootNode, task, checkpoint.getTriggerTimestamp());
- }
-
- @Test
- public void testGetPaths() {
- CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
- String[] paths = handler.getPaths();
- Assert.assertEquals(1, paths.length);
- Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid", paths[0]);
- }
-
- /**
- * Tests a subtask details request.
- */
- @Test
- public void testSubtaskRequest() throws Exception {
- PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class);
- when(checkpoint.getCheckpointId()).thenReturn(1992139L);
- when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
- when(checkpoint.getTriggerTimestamp()).thenReturn(0L); // ack timestamp = duration
-
- TaskStateStats task = createTaskStateStats(1237);
- when(checkpoint.getTaskStateStats(any(JobVertexID.class))).thenReturn(task);
-
- JsonNode rootNode = triggerRequest(checkpoint);
- assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
- assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
-
- verifyTaskNode(rootNode, task, checkpoint.getTriggerTimestamp());
- }
-
- /**
- * Tests a subtask details request.
- */
- @Test
- public void testSubtaskRequestNoSummary() throws Exception {
- PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class);
- when(checkpoint.getCheckpointId()).thenReturn(1992139L);
- when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
- when(checkpoint.getTriggerTimestamp()).thenReturn(0L); // ack timestamp = duration
-
- TaskStateStats task = createTaskStateStats(0); // no acknowledged
- when(checkpoint.getTaskStateStats(any(JobVertexID.class))).thenReturn(task);
-
- JsonNode rootNode = triggerRequest(checkpoint);
- assertNull(rootNode.get("summary"));
- }
-
- /**
- * Tests request with illegal checkpoint ID param.
- */
- @Test
- public void testIllegalCheckpointId() throws Exception {
- AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
- Map<String, String> params = new HashMap<>();
- params.put("checkpointid", "illegal checkpoint");
- String json = handler.handleRequest(graph, params).get();
-
- assertEquals("{}", json);
- }
-
- /**
- * Tests request with missing checkpoint ID param.
- */
- @Test
- public void testNoCheckpointIdParam() throws Exception {
- AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
- String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
-
- assertEquals("{}", json);
- }
-
- /**
- * Test lookup of not existing checkpoint in history.
- */
- @Test
- public void testCheckpointNotFound() throws Exception {
- CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
- when(history.getCheckpointById(anyLong())).thenReturn(null); // not found
-
- CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
- when(snapshot.getHistory()).thenReturn(history);
-
- AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-
- CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
- Map<String, String> params = new HashMap<>();
- params.put("checkpointid", "123");
- params.put("vertexid", new JobVertexID().toString());
- String json = handler.handleRequest(graph, params).get();
-
- assertEquals("{}", json);
- verify(history, times(1)).getCheckpointById(anyLong());
- }
-
- /**
- * Tests request with illegal job vertex ID param.
- */
- @Test
- public void testIllegalJobVertexIdParam() throws Exception {
- AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
- Map<String, String> params = new HashMap<>();
- params.put("checkpointid", "1");
- params.put("vertexid", "illegal vertex id");
- String json = handler.handleRequest(graph, params).get();
-
- assertEquals("{}", json);
- }
-
- /**
- * Tests request with missing job vertex ID param.
- */
- @Test
- public void testNoJobVertexIdParam() throws Exception {
- AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
- Map<String, String> params = new HashMap<>();
- params.put("checkpointid", "1");
- String json = handler.handleRequest(graph, params).get();
-
- assertEquals("{}", json);
- }
-
- /**
- * Test lookup of not existing job vertex ID in checkpoint.
- */
- @Test
- public void testJobVertexNotFound() throws Exception {
- PendingCheckpointStats inProgress = mock(PendingCheckpointStats.class);
- when(inProgress.getTaskStateStats(any(JobVertexID.class))).thenReturn(null); // not found
- CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
- when(history.getCheckpointById(anyLong())).thenReturn(inProgress);
-
- CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
- when(snapshot.getHistory()).thenReturn(history);
-
- AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-
- CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
- Map<String, String> params = new HashMap<>();
- params.put("checkpointid", "123");
- params.put("vertexid", new JobVertexID().toString());
- String json = handler.handleRequest(graph, params).get();
-
- assertEquals("{}", json);
- verify(inProgress, times(1)).getTaskStateStats(any(JobVertexID.class));
- }
-
- // ------------------------------------------------------------------------
-
- private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception {
- CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
- when(history.getCheckpointById(anyLong())).thenReturn(checkpoint);
- CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
- when(snapshot.getHistory()).thenReturn(history);
-
- AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-
- CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
- Map<String, String> params = new HashMap<>();
- params.put("checkpointid", "123");
- params.put("vertexid", new JobVertexID().toString());
- String json = handler.handleRequest(graph, params).get();
-
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readTree(json);
- }
-
- private static TaskStateStats createTaskStateStats(int numAcknowledged) {
- ThreadLocalRandom rand = ThreadLocalRandom.current();
-
- TaskStateStats task = mock(TaskStateStats.class);
- when(task.getJobVertexId()).thenReturn(new JobVertexID());
- when(task.getLatestAckTimestamp()).thenReturn(rand.nextLong(1024) + 1);
- when(task.getStateSize()).thenReturn(rand.nextLong(1024) + 1);
- when(task.getEndToEndDuration(anyLong())).thenReturn(rand.nextLong(1024) + 1);
- when(task.getAlignmentBuffered()).thenReturn(rand.nextLong(1024) + 1);
- when(task.getNumberOfSubtasks()).thenReturn(rand.nextInt(1024) + 1);
- when(task.getNumberOfAcknowledgedSubtasks()).thenReturn(numAcknowledged);
-
- TaskStateStats.TaskStateStatsSummary summary = mock(TaskStateStats.TaskStateStatsSummary.class);
-
- doReturn(createMinMaxAvgStats(rand)).when(summary).getStateSizeStats();
- doReturn(createMinMaxAvgStats(rand)).when(summary).getAckTimestampStats();
- doReturn(createMinMaxAvgStats(rand)).when(summary).getAlignmentBufferedStats();
- doReturn(createMinMaxAvgStats(rand)).when(summary).getAlignmentDurationStats();
- doReturn(createMinMaxAvgStats(rand)).when(summary).getSyncCheckpointDurationStats();
- doReturn(createMinMaxAvgStats(rand)).when(summary).getAsyncCheckpointDurationStats();
-
- when(task.getSummaryStats()).thenReturn(summary);
-
- SubtaskStateStats[] subtasks = new SubtaskStateStats[3];
- subtasks[0] = createSubtaskStats(0, rand);
- subtasks[1] = createSubtaskStats(1, rand);
- subtasks[2] = null;
-
- when(task.getSubtaskStats()).thenReturn(subtasks);
-
- return task;
- }
-
- private static void verifyTaskNode(JsonNode taskNode, TaskStateStats task, long triggerTimestamp) {
- long duration = ThreadLocalRandom.current().nextInt(128);
-
- assertEquals(task.getLatestAckTimestamp(), taskNode.get("latest_ack_timestamp").asLong());
- assertEquals(task.getStateSize(), taskNode.get("state_size").asLong());
- assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), taskNode.get("end_to_end_duration").asLong());
- assertEquals(task.getAlignmentBuffered(), taskNode.get("alignment_buffered").asLong());
- assertEquals(task.getNumberOfSubtasks(), taskNode.get("num_subtasks").asInt());
- assertEquals(task.getNumberOfAcknowledgedSubtasks(), taskNode.get("num_acknowledged_subtasks").asInt());
-
- TaskStateStats.TaskStateStatsSummary summary = task.getSummaryStats();
- verifyMinMaxAvgStats(summary.getStateSizeStats(), taskNode.get("summary").get("state_size"));
- verifyMinMaxAvgStats(summary.getSyncCheckpointDurationStats(), taskNode.get("summary").get("checkpoint_duration").get("sync"));
- verifyMinMaxAvgStats(summary.getAsyncCheckpointDurationStats(), taskNode.get("summary").get("checkpoint_duration").get("async"));
- verifyMinMaxAvgStats(summary.getAlignmentBufferedStats(), taskNode.get("summary").get("alignment").get("buffered"));
- verifyMinMaxAvgStats(summary.getAlignmentDurationStats(), taskNode.get("summary").get("alignment").get("duration"));
-
- JsonNode endToEndDurationNode = taskNode.get("summary").get("end_to_end_duration");
- assertEquals(summary.getAckTimestampStats().getMinimum() - triggerTimestamp, endToEndDurationNode.get("min").asLong());
- assertEquals(summary.getAckTimestampStats().getMaximum() - triggerTimestamp, endToEndDurationNode.get("max").asLong());
- assertEquals((long) summary.getAckTimestampStats().getAverage() - triggerTimestamp, endToEndDurationNode.get("avg").asLong());
-
- SubtaskStateStats[] subtasks = task.getSubtaskStats();
- Iterator<JsonNode> it = taskNode.get("subtasks").iterator();
-
- assertTrue(it.hasNext());
- verifySubtaskStats(it.next(), 0, subtasks[0]);
-
- assertTrue(it.hasNext());
- verifySubtaskStats(it.next(), 1, subtasks[1]);
-
- assertTrue(it.hasNext());
- verifySubtaskStats(it.next(), 2, subtasks[2]);
-
- assertFalse(it.hasNext());
- }
-
- private static SubtaskStateStats createSubtaskStats(int index, ThreadLocalRandom rand) {
- SubtaskStateStats subtask = mock(SubtaskStateStats.class);
- when(subtask.getSubtaskIndex()).thenReturn(index);
- when(subtask.getAckTimestamp()).thenReturn(rand.nextLong(1024));
- when(subtask.getAlignmentBuffered()).thenReturn(rand.nextLong(1024));
- when(subtask.getAlignmentDuration()).thenReturn(rand.nextLong(1024));
- when(subtask.getSyncCheckpointDuration()).thenReturn(rand.nextLong(1024));
- when(subtask.getAsyncCheckpointDuration()).thenReturn(rand.nextLong(1024));
- when(subtask.getAckTimestamp()).thenReturn(rand.nextLong(1024));
- when(subtask.getStateSize()).thenReturn(rand.nextLong(1024));
- when(subtask.getEndToEndDuration(anyLong())).thenReturn(rand.nextLong(1024));
- return subtask;
- }
-
- private static void verifySubtaskStats(JsonNode subtaskNode, int index, SubtaskStateStats subtask) {
- if (subtask == null) {
- assertEquals(index, subtaskNode.get("index").asInt());
- assertEquals("pending_or_failed", subtaskNode.get("status").asText());
- } else {
- assertEquals(subtask.getSubtaskIndex(), subtaskNode.get("index").asInt());
- assertEquals("completed", subtaskNode.get("status").asText());
- assertEquals(subtask.getAckTimestamp(), subtaskNode.get("ack_timestamp").asLong());
- assertEquals(subtask.getEndToEndDuration(0), subtaskNode.get("end_to_end_duration").asLong());
- assertEquals(subtask.getStateSize(), subtaskNode.get("state_size").asLong());
- assertEquals(subtask.getSyncCheckpointDuration(), subtaskNode.get("checkpoint").get("sync").asLong());
- assertEquals(subtask.getAsyncCheckpointDuration(), subtaskNode.get("checkpoint").get("async").asLong());
- assertEquals(subtask.getAlignmentBuffered(), subtaskNode.get("alignment").get("buffered").asLong());
- assertEquals(subtask.getAlignmentDuration(), subtaskNode.get("alignment").get("duration").asLong());
- }
- }
-
- private static MinMaxAvgStats createMinMaxAvgStats(ThreadLocalRandom rand) {
- MinMaxAvgStats mma = mock(MinMaxAvgStats.class);
- when(mma.getMinimum()).thenReturn(rand.nextLong(1024));
- when(mma.getMaximum()).thenReturn(rand.nextLong(1024));
- when(mma.getAverage()).thenReturn(rand.nextLong(1024));
-
- return mma;
- }
-
- private static void verifyMinMaxAvgStats(MinMaxAvgStats expected, JsonNode node) {
- assertEquals(expected.getMinimum(), node.get("min").asLong());
- assertEquals(expected.getMaximum(), node.get("max").asLong());
- assertEquals(expected.getAverage(), node.get("avg").asLong());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
index 2e52f2e..03666a8 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.webmonitor.history;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
import org.junit.Assert;
import org.junit.Rule;
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 33d9c79..3c93be3 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.messages.ArchiveMessages;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
deleted file mode 100644
index 0755888..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
-/**
- * Tests for the AbstractMetricsHandler.
- */
-public class AbstractMetricsHandlerTest extends TestLogger {
- /**
- * Verifies that the handlers correctly handle expected REST calls.
- */
- @Test
- public void testHandleRequest() throws Exception {
- MetricFetcher fetcher = new MetricFetcher(
- mock(GatewayRetriever.class),
- mock(MetricQueryServiceRetriever.class),
- Executors.directExecutor(),
- TestingUtils.TIMEOUT());
- MetricStoreTest.setupStore(fetcher.getMetricStore());
-
- JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
-
- Map<String, String> pathParams = new HashMap<>();
- Map<String, String> queryParams = new HashMap<>();
-
- pathParams.put("jobid", "jobid");
- pathParams.put("vertexid", "taskid");
-
- // get list of available metrics
- String availableList = handler.handleJsonRequest(pathParams, queryParams, null).get();
-
- assertEquals("[" +
- "{\"id\":\"8.opname.abc.metric5\"}," +
- "{\"id\":\"8.abc.metric4\"}" +
- "]",
- availableList);
-
- // get value for a single metric
- queryParams.put("get", "8.opname.abc.metric5");
-
- String metricValue = handler.handleJsonRequest(pathParams, queryParams, null).get();
-
- assertEquals("[" +
- "{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}" +
- "]"
- , metricValue
- );
-
- // get values for multiple metrics
- queryParams.put("get", "8.opname.abc.metric5,8.abc.metric4");
-
- String metricValues = handler.handleJsonRequest(pathParams, queryParams, null).get();
-
- assertEquals("[" +
- "{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}," +
- "{\"id\":\"8.abc.metric4\",\"value\":\"3\"}" +
- "]",
- metricValues
- );
- }
-
- /**
- * Verifies that a malformed request for available metrics does not throw an exception.
- */
- @Test
- public void testInvalidListDoesNotFail() {
- MetricFetcher fetcher = new MetricFetcher(
- mock(GatewayRetriever.class),
- mock(MetricQueryServiceRetriever.class),
- Executors.directExecutor(),
- TestingUtils.TIMEOUT());
- MetricStoreTest.setupStore(fetcher.getMetricStore());
-
- JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
-
- Map<String, String> pathParams = new HashMap<>();
- Map<String, String> queryParams = new HashMap<>();
-
- pathParams.put("jobid", "jobid");
- pathParams.put("vertexid", "taskid");
-
- //-----invalid variable
- pathParams.put("jobid", "nonexistent");
-
- try {
- assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
- } catch (Exception e) {
- fail();
- }
- }
-
- /**
- * Verifies that a malformed request for a metric value does not throw an exception.
- */
- @Test
- public void testInvalidGetDoesNotFail() {
- MetricFetcher fetcher = new MetricFetcher(
- mock(GatewayRetriever.class),
- mock(MetricQueryServiceRetriever.class),
- Executors.directExecutor(),
- TestingUtils.TIMEOUT());
- MetricStoreTest.setupStore(fetcher.getMetricStore());
-
- JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
-
- Map<String, String> pathParams = new HashMap<>();
- Map<String, String> queryParams = new HashMap<>();
-
- pathParams.put("jobid", "jobid");
- pathParams.put("vertexid", "taskid");
-
- //-----empty string
- queryParams.put("get", "");
-
- try {
- assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
- } catch (Exception e) {
- fail(e.getMessage());
- }
-
- //-----invalid variable
- pathParams.put("jobid", "nonexistent");
- queryParams.put("get", "subindex.opname.abc.metric5");
-
- try {
- assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
- } catch (Exception e) {
- fail(e.getMessage());
- }
-
- //-----invalid metric
- pathParams.put("jobid", "nonexistant");
- queryParams.put("get", "subindex.opname.abc.nonexistant");
-
- try {
- assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
- } catch (Exception e) {
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
deleted file mode 100644
index 6d17b40..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
-/**
- * Tests for the JobManagerMetricsHandler.
- */
-public class JobManagerMetricsHandlerTest extends TestLogger {
- @Test
- public void testGetPaths() {
- JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
- String[] paths = handler.getPaths();
- Assert.assertEquals(1, paths.length);
- Assert.assertEquals("/jobmanager/metrics", paths[0]);
- }
-
- @Test
- public void getMapFor() {
- MetricFetcher fetcher = new MetricFetcher(
- mock(GatewayRetriever.class),
- mock(MetricQueryServiceRetriever.class),
- Executors.directExecutor(),
- TestingUtils.TIMEOUT());
- MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
-
- JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), fetcher);
-
- Map<String, String> pathParams = new HashMap<>();
-
- Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
- assertEquals("0", metrics.get("abc.metric1"));
- }
-
- @Test
- public void getMapForNull() {
- MetricFetcher fetcher = new MetricFetcher(
- mock(GatewayRetriever.class),
- mock(MetricQueryServiceRetriever.class),
- Executors.directExecutor(),
- TestingUtils.TIMEOUT());
- MetricStore store = fetcher.getMetricStore();
-
- JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), fetcher);
-
- Map<String, String> pathParams = new HashMap<>();
-
- Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
- assertNotNull(metrics);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
deleted file mode 100644
index b26ceab..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
-/**
- * Tests for the JobMetricsHandler.
- */
-public class JobMetricsHandlerTest extends TestLogger {
- @Test
- public void testGetPaths() {
- JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
- String[] paths = handler.getPaths();
- Assert.assertEquals(1, paths.length);
- Assert.assertEquals("/jobs/:jobid/metrics", paths[0]);
- }
-
- @Test
- public void getMapFor() throws Exception {
- MetricFetcher fetcher = new MetricFetcher(
- mock(GatewayRetriever.class),
- mock(MetricQueryServiceRetriever.class),
- Executors.directExecutor(),
- TestingUtils.TIMEOUT());
- MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
-
- JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), fetcher);
-
- Map<String, String> pathParams = new HashMap<>();
- pathParams.put(PARAMETER_JOB_ID, "jobid");
-
- Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
- assertEquals("2", metrics.get("abc.metric3"));
- }
-
- @Test
- public void getMapForNull() {
- MetricFetcher fetcher = new MetricFetcher(
- mock(GatewayRetriever.class),
- mock(MetricQueryServiceRetriever.class),
- Executors.directExecutor(),
- TestingUtils.TIMEOUT());
- MetricStore store = fetcher.getMetricStore();
-
- JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), fetcher);
-
- Map<String, String> pathParams = new HashMap<>();
-
- Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
- assertNull(metrics);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
deleted file mode 100644
index d637a4a..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
-import static org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
-/**
- * Tests for the JobVertexMetricsHandler.
- */
-public class JobVertexMetricsHandlerTest extends TestLogger {
- @Test
- public void testGetPaths() {
- JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
- String[] paths = handler.getPaths();
- Assert.assertEquals(1, paths.length);
- Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/metrics", paths[0]);
- }
-
- @Test
- public void getMapFor() throws Exception {
- MetricFetcher fetcher = new MetricFetcher(
- mock(GatewayRetriever.class),
- mock(MetricQueryServiceRetriever.class),
- Executors.directExecutor(),
- TestingUtils.TIMEOUT());
- MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
-
- JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
-
- Map<String, String> pathParams = new HashMap<>();
- pathParams.put(PARAMETER_JOB_ID, "jobid");
- pathParams.put(PARAMETER_VERTEX_ID, "taskid");
-
- Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
- assertEquals("3", metrics.get("8.abc.metric4"));
-
- assertEquals("4", metrics.get("8.opname.abc.metric5"));
- }
-
- @Test
- public void getMapForNull() {
- MetricFetcher fetcher = new MetricFetcher(
- mock(GatewayRetriever.class),
- mock(MetricQueryServiceRetriever.class),
- Executors.directExecutor(),
- TestingUtils.TIMEOUT());
- MetricStore store = fetcher.getMetricStore();
-
- JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
-
- Map<String, String> pathParams = new HashMap<>();
-
- Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
- assertNull(metrics);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
deleted file mode 100644
index 09a0829..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
-import org.apache.flink.runtime.metrics.dump.MetricQueryService;
-import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
-import org.apache.flink.runtime.metrics.util.TestingHistogram;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.eq;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
-
-/**
- * Tests for the MetricFetcher.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(MetricFetcher.class)
-public class MetricFetcherTest extends TestLogger {
- @Test
- public void testUpdate() throws Exception {
- final Time timeout = Time.seconds(10L);
-
- // ========= setup TaskManager =================================================================================
- JobID jobID = new JobID();
- InstanceID tmID = new InstanceID();
- ResourceID tmRID = new ResourceID(tmID.toString());
- TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class);
- when(taskManagerGateway.getAddress()).thenReturn("/tm/address");
-
- Instance taskManager = mock(Instance.class);
- when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
- when(taskManager.getId()).thenReturn(tmID);
- when(taskManager.getTaskManagerID()).thenReturn(tmRID);
-
- // ========= setup JobManager ==================================================================================
- JobDetails details = mock(JobDetails.class);
- when(details.getJobId()).thenReturn(jobID);
-
- JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
-
- when(jobManagerGateway.requestJobDetails(anyBoolean(), anyBoolean(), any(Time.class)))
- .thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(new JobDetails[0], new JobDetails[0])));
- when(jobManagerGateway.requestTaskManagerInstances(any(Time.class)))
- .thenReturn(CompletableFuture.completedFuture(Collections.singleton(taskManager)));
- when(jobManagerGateway.getAddress()).thenReturn("/jm/address");
-
- GatewayRetriever<JobManagerGateway> retriever = mock(AkkaJobManagerRetriever.class);
- when(retriever.getNow())
- .thenReturn(Optional.of(jobManagerGateway));
-
- // ========= setup QueryServices ================================================================================
- MetricQueryServiceGateway jmQueryService = mock(MetricQueryServiceGateway.class);
- MetricQueryServiceGateway tmQueryService = mock(MetricQueryServiceGateway.class);
-
- MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = createRequestDumpAnswer(tmID, jobID);
-
- when(jmQueryService.queryMetrics(any(Time.class)))
- .thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));
- when(tmQueryService.queryMetrics(any(Time.class)))
- .thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer));
-
- MetricQueryServiceRetriever queryServiceRetriever = mock(MetricQueryServiceRetriever.class);
- when(queryServiceRetriever.retrieveService(eq("/jm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME))).thenReturn(CompletableFuture.completedFuture(jmQueryService));
- when(queryServiceRetriever.retrieveService(eq("/tm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString()))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
-
- // ========= start MetricFetcher testing =======================================================================
- MetricFetcher fetcher = new MetricFetcher(
- retriever,
- queryServiceRetriever,
- Executors.directExecutor(),
- timeout);
-
- // verify that update fetches metrics and updates the store
- fetcher.update();
- MetricStore store = fetcher.getMetricStore();
- synchronized (store) {
- assertEquals("7", store.jobManager.metrics.get("abc.hist_min"));
- assertEquals("6", store.jobManager.metrics.get("abc.hist_max"));
- assertEquals("4.0", store.jobManager.metrics.get("abc.hist_mean"));
- assertEquals("0.5", store.jobManager.metrics.get("abc.hist_median"));
- assertEquals("5.0", store.jobManager.metrics.get("abc.hist_stddev"));
- assertEquals("0.75", store.jobManager.metrics.get("abc.hist_p75"));
- assertEquals("0.9", store.jobManager.metrics.get("abc.hist_p90"));
- assertEquals("0.95", store.jobManager.metrics.get("abc.hist_p95"));
- assertEquals("0.98", store.jobManager.metrics.get("abc.hist_p98"));
- assertEquals("0.99", store.jobManager.metrics.get("abc.hist_p99"));
- assertEquals("0.999", store.jobManager.metrics.get("abc.hist_p999"));
-
- assertEquals("x", store.getTaskManagerMetricStore(tmID.toString()).metrics.get("abc.gauge"));
- assertEquals("5.0", store.getJobMetricStore(jobID.toString()).metrics.get("abc.jc"));
- assertEquals("2", store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.abc.tc"));
- assertEquals("1", store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.opname.abc.oc"));
- }
- }
-
- private static MetricDumpSerialization.MetricSerializationResult createRequestDumpAnswer(InstanceID tmID, JobID jobID) {
- Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
- Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
- Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
- Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
-
- SimpleCounter c1 = new SimpleCounter();
- SimpleCounter c2 = new SimpleCounter();
-
- c1.inc(1);
- c2.inc(2);
-
- counters.put(c1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo(jobID.toString(), "taskid", 2, "opname", "abc"), "oc"));
- counters.put(c2, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo(jobID.toString(), "taskid", 2, "abc"), "tc"));
- meters.put(new Meter() {
- @Override
- public void markEvent() {
- }
-
- @Override
- public void markEvent(long n) {
- }
-
- @Override
- public double getRate() {
- return 5;
- }
-
- @Override
- public long getCount() {
- return 10;
- }
- }, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobQueryScopeInfo(jobID.toString(), "abc"), "jc"));
- gauges.put(new Gauge<String>() {
- @Override
- public String getValue() {
- return "x";
- }
- }, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID.toString(), "abc"), "gauge"));
- histograms.put(new TestingHistogram(), new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist"));
-
- MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
- MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
- serializer.close();
-
- return dump;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
deleted file mode 100644
index d19e8c6..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.metrics.dump.MetricDump;
-import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for the MetricStore.
- */
-public class MetricStoreTest extends TestLogger {
- @Test
- public void testAdd() throws IOException {
- MetricStore store = setupStore(new MetricStore());
-
- assertEquals("0", store.getJobManagerMetricStore().getMetric("abc.metric1", "-1"));
- assertEquals("1", store.getTaskManagerMetricStore("tmid").getMetric("abc.metric2", "-1"));
- assertEquals("2", store.getJobMetricStore("jobid").getMetric("abc.metric3", "-1"));
- assertEquals("3", store.getTaskMetricStore("jobid", "taskid").getMetric("8.abc.metric4", "-1"));
- assertEquals("4", store.getTaskMetricStore("jobid", "taskid").getMetric("8.opname.abc.metric5", "-1"));
- }
-
- @Test
- public void testMalformedNameHandling() {
- MetricStore store = new MetricStore();
- //-----verify that no exceptions are thrown
-
- // null
- store.add(null);
- // empty name
- QueryScopeInfo.JobManagerQueryScopeInfo info = new QueryScopeInfo.JobManagerQueryScopeInfo("");
- MetricDump.CounterDump cd = new MetricDump.CounterDump(info, "", 0);
- store.add(cd);
-
- //-----verify that no side effects occur
- assertEquals(0, store.jobManager.metrics.size());
- assertEquals(0, store.taskManagers.size());
- assertEquals(0, store.jobs.size());
- }
-
- public static MetricStore setupStore(MetricStore store) {
- QueryScopeInfo.JobManagerQueryScopeInfo jm = new QueryScopeInfo.JobManagerQueryScopeInfo("abc");
- MetricDump.CounterDump cd1 = new MetricDump.CounterDump(jm, "metric1", 0);
-
- QueryScopeInfo.TaskManagerQueryScopeInfo tm = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "abc");
- MetricDump.CounterDump cd2 = new MetricDump.CounterDump(tm, "metric2", 1);
-
- QueryScopeInfo.JobQueryScopeInfo job = new QueryScopeInfo.JobQueryScopeInfo("jobid", "abc");
- MetricDump.CounterDump cd3 = new MetricDump.CounterDump(job, "metric3", 2);
-
- QueryScopeInfo.TaskQueryScopeInfo task = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, "abc");
- MetricDump.CounterDump cd4 = new MetricDump.CounterDump(task, "metric4", 3);
-
- QueryScopeInfo.OperatorQueryScopeInfo operator = new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, "opname", "abc");
- MetricDump.CounterDump cd5 = new MetricDump.CounterDump(operator, "metric5", 4);
-
- store.add(cd1);
- store.add(cd2);
- store.add(cd3);
- store.add(cd4);
- store.add(cd5);
-
- return store;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
deleted file mode 100644
index 9c5549e..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler.TASK_MANAGER_ID_KEY;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
-/**
- * Tests for the TaskManagerMetricsHandler.
- */
-public class TaskManagerMetricsHandlerTest extends TestLogger {
- @Test
- public void testGetPaths() {
- TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
- String[] paths = handler.getPaths();
- Assert.assertEquals(1, paths.length);
- Assert.assertEquals("/taskmanagers/:taskmanagerid/metrics", paths[0]);
- }
-
- @Test
- public void getMapFor() throws Exception {
- MetricFetcher fetcher = new MetricFetcher(
- mock(GatewayRetriever.class),
- mock(MetricQueryServiceRetriever.class),
- Executors.directExecutor(),
- TestingUtils.TIMEOUT());
- MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
-
- TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), fetcher);
-
- Map<String, String> pathParams = new HashMap<>();
- pathParams.put(TASK_MANAGER_ID_KEY, "tmid");
-
- Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
- assertEquals("1", metrics.get("abc.metric2"));
- }
-
- @Test
- public void getMapForNull() {
- MetricFetcher fetcher = new MetricFetcher(
- mock(GatewayRetriever.class),
- mock(MetricQueryServiceRetriever.class),
- Executors.directExecutor(),
- TestingUtils.TIMEOUT());
- MetricStore store = fetcher.getMetricStore();
-
- TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), fetcher);
-
- Map<String, String> pathParams = new HashMap<>();
-
- Map<String, String> metrics = handler.getMapFor(pathParams, store);
-
- assertNull(metrics);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java
deleted file mode 100644
index 979d943..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.Preconditions;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-/**
- * Utility class for constructing an ArchivedExecution.
- */
-public class ArchivedExecutionBuilder {
- private ExecutionAttemptID attemptId;
- private long[] stateTimestamps;
- private int attemptNumber;
- private ExecutionState state;
- private String failureCause;
- private TaskManagerLocation assignedResourceLocation;
- private StringifiedAccumulatorResult[] userAccumulators;
- private IOMetrics ioMetrics;
- private int parallelSubtaskIndex;
-
- public ArchivedExecutionBuilder setAttemptId(ExecutionAttemptID attemptId) {
- this.attemptId = attemptId;
- return this;
- }
-
- public ArchivedExecutionBuilder setStateTimestamps(long[] stateTimestamps) {
- Preconditions.checkArgument(stateTimestamps.length == ExecutionState.values().length);
- this.stateTimestamps = stateTimestamps;
- return this;
- }
-
- public ArchivedExecutionBuilder setAttemptNumber(int attemptNumber) {
- this.attemptNumber = attemptNumber;
- return this;
- }
-
- public ArchivedExecutionBuilder setState(ExecutionState state) {
- this.state = state;
- return this;
- }
-
- public ArchivedExecutionBuilder setFailureCause(String failureCause) {
- this.failureCause = failureCause;
- return this;
- }
-
- public ArchivedExecutionBuilder setAssignedResourceLocation(TaskManagerLocation assignedResourceLocation) {
- this.assignedResourceLocation = assignedResourceLocation;
- return this;
- }
-
- public ArchivedExecutionBuilder setUserAccumulators(StringifiedAccumulatorResult[] userAccumulators) {
- this.userAccumulators = userAccumulators;
- return this;
- }
-
- public ArchivedExecutionBuilder setParallelSubtaskIndex(int parallelSubtaskIndex) {
- this.parallelSubtaskIndex = parallelSubtaskIndex;
- return this;
- }
-
- public ArchivedExecutionBuilder setIOMetrics(IOMetrics ioMetrics) {
- this.ioMetrics = ioMetrics;
- return this;
- }
-
- public ArchivedExecution build() throws UnknownHostException {
- return new ArchivedExecution(
- userAccumulators != null ? userAccumulators : new StringifiedAccumulatorResult[0],
- ioMetrics != null ? ioMetrics : new TestIOMetrics(),
- attemptId != null ? attemptId : new ExecutionAttemptID(),
- attemptNumber,
- state != null ? state : ExecutionState.FINISHED,
- failureCause != null ? failureCause : "(null)",
- assignedResourceLocation != null ? assignedResourceLocation : new TaskManagerLocation(new ResourceID("tm"), InetAddress.getLocalHost(), 1234),
- parallelSubtaskIndex,
- stateTimestamps != null ? stateTimestamps : new long[]{1, 2, 3, 4, 5, 5, 5, 5}
- );
- }
-
- private static class TestIOMetrics extends IOMetrics {
- private static final long serialVersionUID = -5920076211680012555L;
-
- public TestIOMetrics() {
- super(
- new MeterView(new TestCounter(1), 0),
- new MeterView(new TestCounter(2), 0),
- new MeterView(new TestCounter(3), 0),
- new MeterView(new TestCounter(4), 0),
- new MeterView(new TestCounter(5), 0));
- }
- }
-
- private static class TestCounter implements Counter {
- private final long count;
-
- private TestCounter(long count) {
- this.count = count;
- }
-
- @Override
- public void inc() {
- }
-
- @Override
- public void inc(long n) {
- }
-
- @Override
- public void dec() {
- }
-
- @Override
- public void dec(long n) {
- }
-
- @Override
- public long getCount() {
- return count;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java
deleted file mode 100644
index 053f718..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.api.common.ArchivedExecutionConfig;
-import org.apache.flink.api.common.ExecutionMode;
-
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Utility class for constructing an ArchivedExecutionConfig.
- */
-public class ArchivedExecutionConfigBuilder {
- private String executionMode;
- private String restartStrategyDescription;
- private int parallelism;
- private boolean objectReuseEnabled;
- private Map<String, String> globalJobParameters;
-
- public ArchivedExecutionConfigBuilder setExecutionMode(String executionMode) {
- this.executionMode = executionMode;
- return this;
- }
-
- public ArchivedExecutionConfigBuilder setRestartStrategyDescription(String restartStrategyDescription) {
- this.restartStrategyDescription = restartStrategyDescription;
- return this;
- }
-
- public ArchivedExecutionConfigBuilder setParallelism(int parallelism) {
- this.parallelism = parallelism;
- return this;
- }
-
- public ArchivedExecutionConfigBuilder setObjectReuseEnabled(boolean objectReuseEnabled) {
- this.objectReuseEnabled = objectReuseEnabled;
- return this;
- }
-
- public ArchivedExecutionConfigBuilder setGlobalJobParameters(Map<String, String> globalJobParameters) {
- this.globalJobParameters = globalJobParameters;
- return this;
- }
-
- public ArchivedExecutionConfig build() {
- return new ArchivedExecutionConfig(
- executionMode != null ? executionMode : ExecutionMode.PIPELINED.name(),
- restartStrategyDescription != null ? restartStrategyDescription : "default",
- parallelism,
- objectReuseEnabled,
- globalJobParameters != null ? globalJobParameters : Collections.<String, String>emptyMap()
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
deleted file mode 100644
index 57b300a..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.api.common.ArchivedExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ErrorInfo;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedValue;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-/**
- * Utility class for constructing an ArchivedExecutionGraph.
- */
-public class ArchivedExecutionGraphBuilder {
-
- private static final Random RANDOM = new Random();
-
- private JobID jobID;
- private String jobName;
- private Map<JobVertexID, ArchivedExecutionJobVertex> tasks;
- private List<ArchivedExecutionJobVertex> verticesInCreationOrder;
- private long[] stateTimestamps;
- private JobStatus state;
- private ErrorInfo failureCause;
- private String jsonPlan;
- private StringifiedAccumulatorResult[] archivedUserAccumulators;
- private ArchivedExecutionConfig archivedExecutionConfig;
- private boolean isStoppable;
- private Map<String, SerializedValue<Object>> serializedUserAccumulators;
-
- public ArchivedExecutionGraphBuilder setJobID(JobID jobID) {
- this.jobID = jobID;
- return this;
- }
-
- public ArchivedExecutionGraphBuilder setJobName(String jobName) {
- this.jobName = jobName;
- return this;
- }
-
- public ArchivedExecutionGraphBuilder setTasks(Map<JobVertexID, ArchivedExecutionJobVertex> tasks) {
- this.tasks = tasks;
- return this;
- }
-
- public ArchivedExecutionGraphBuilder setVerticesInCreationOrder(List<ArchivedExecutionJobVertex> verticesInCreationOrder) {
- this.verticesInCreationOrder = verticesInCreationOrder;
- return this;
- }
-
- public ArchivedExecutionGraphBuilder setStateTimestamps(long[] stateTimestamps) {
- Preconditions.checkArgument(stateTimestamps.length == JobStatus.values().length);
- this.stateTimestamps = stateTimestamps;
- return this;
- }
-
- public ArchivedExecutionGraphBuilder setState(JobStatus state) {
- this.state = state;
- return this;
- }
-
- public ArchivedExecutionGraphBuilder setFailureCause(ErrorInfo failureCause) {
- this.failureCause = failureCause;
- return this;
- }
-
- public ArchivedExecutionGraphBuilder setJsonPlan(String jsonPlan) {
- this.jsonPlan = jsonPlan;
- return this;
- }
-
- public ArchivedExecutionGraphBuilder setArchivedUserAccumulators(StringifiedAccumulatorResult[] archivedUserAccumulators) {
- this.archivedUserAccumulators = archivedUserAccumulators;
- return this;
- }
-
- public ArchivedExecutionGraphBuilder setArchivedExecutionConfig(ArchivedExecutionConfig archivedExecutionConfig) {
- this.archivedExecutionConfig = archivedExecutionConfig;
- return this;
- }
-
- public ArchivedExecutionGraphBuilder setStoppable(boolean stoppable) {
- isStoppable = stoppable;
- return this;
- }
-
- public ArchivedExecutionGraphBuilder setSerializedUserAccumulators(Map<String, SerializedValue<Object>> serializedUserAccumulators) {
- this.serializedUserAccumulators = serializedUserAccumulators;
- return this;
- }
-
- public ArchivedExecutionGraph build() {
- Preconditions.checkNotNull(tasks, "Tasks must not be null.");
- JobID jobID = this.jobID != null ? this.jobID : new JobID();
- String jobName = this.jobName != null ? this.jobName : "job_" + RANDOM.nextInt();
- return new ArchivedExecutionGraph(
- jobID,
- jobName,
- tasks,
- verticesInCreationOrder != null ? verticesInCreationOrder : new ArrayList<>(tasks.values()),
- stateTimestamps != null ? stateTimestamps : new long[JobStatus.values().length],
- state != null ? state : JobStatus.FINISHED,
- failureCause,
- jsonPlan != null ? jsonPlan : "{\"jobid\":\"" + jobID + "\", \"name\":\"" + jobName + "\", \"nodes\":[]}",
- archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0],
- serializedUserAccumulators != null ? serializedUserAccumulators : Collections.<String, SerializedValue<Object>>emptyMap(),
- archivedExecutionConfig != null ? archivedExecutionConfig : new ArchivedExecutionConfigBuilder().build(),
- isStoppable,
- null,
- null
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java
deleted file mode 100644
index 3ef4106..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Random;
-
-/**
- * Utility class for constructing an ArchivedExecutionJobVertex.
- */
-public class ArchivedExecutionJobVertexBuilder {
-
- private static final Random RANDOM = new Random();
-
- private ArchivedExecutionVertex[] taskVertices;
- private JobVertexID id;
- private String name;
- private int parallelism;
- private int maxParallelism;
- private StringifiedAccumulatorResult[] archivedUserAccumulators;
-
- public ArchivedExecutionJobVertexBuilder setTaskVertices(ArchivedExecutionVertex[] taskVertices) {
- this.taskVertices = taskVertices;
- return this;
- }
-
- public ArchivedExecutionJobVertexBuilder setId(JobVertexID id) {
- this.id = id;
- return this;
- }
-
- public ArchivedExecutionJobVertexBuilder setName(String name) {
- this.name = name;
- return this;
- }
-
- public ArchivedExecutionJobVertexBuilder setParallelism(int parallelism) {
- this.parallelism = parallelism;
- return this;
- }
-
- public ArchivedExecutionJobVertexBuilder setMaxParallelism(int maxParallelism) {
- this.maxParallelism = maxParallelism;
- return this;
- }
-
- public ArchivedExecutionJobVertexBuilder setArchivedUserAccumulators(StringifiedAccumulatorResult[] archivedUserAccumulators) {
- this.archivedUserAccumulators = archivedUserAccumulators;
- return this;
- }
-
- public ArchivedExecutionJobVertex build() {
- Preconditions.checkNotNull(taskVertices);
- return new ArchivedExecutionJobVertex(
- taskVertices,
- id != null ? id : new JobVertexID(),
- name != null ? name : "task_" + RANDOM.nextInt(),
- parallelism,
- maxParallelism,
- archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0]
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java
deleted file mode 100644
index 67e9e11..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.runtime.executiongraph.ArchivedExecution;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
-import org.apache.flink.runtime.util.EvictingBoundedList;
-import org.apache.flink.util.Preconditions;
-
-import java.util.List;
-import java.util.Random;
-
-/**
- * Utility class for constructing an ArchivedExecutionVertex.
- */
-public class ArchivedExecutionVertexBuilder {
-
- private static final Random RANDOM = new Random();
-
- private int subtaskIndex;
- private EvictingBoundedList<ArchivedExecution> priorExecutions;
- private String taskNameWithSubtask;
- private ArchivedExecution currentExecution;
-
- public ArchivedExecutionVertexBuilder setSubtaskIndex(int subtaskIndex) {
- this.subtaskIndex = subtaskIndex;
- return this;
- }
-
- public ArchivedExecutionVertexBuilder setPriorExecutions(List<ArchivedExecution> priorExecutions) {
- this.priorExecutions = new EvictingBoundedList<>(priorExecutions.size());
- for (ArchivedExecution execution : priorExecutions) {
- this.priorExecutions.add(execution);
- }
- return this;
- }
-
- public ArchivedExecutionVertexBuilder setTaskNameWithSubtask(String taskNameWithSubtask) {
- this.taskNameWithSubtask = taskNameWithSubtask;
- return this;
- }
-
- public ArchivedExecutionVertexBuilder setCurrentExecution(ArchivedExecution currentExecution) {
- this.currentExecution = currentExecution;
- return this;
- }
-
- public ArchivedExecutionVertex build() {
- Preconditions.checkNotNull(currentExecution);
- return new ArchivedExecutionVertex(
- subtaskIndex,
- taskNameWithSubtask != null ? taskNameWithSubtask : "task_" + RANDOM.nextInt() + "_" + subtaskIndex,
- currentExecution,
- priorExecutions != null ? priorExecutions : new EvictingBoundedList<ArchivedExecution>(0)
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
deleted file mode 100644
index 3e4fc01..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.utils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ArchivedExecution;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ErrorInfo;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-
-import java.net.InetAddress;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Common entry-point for accessing generated ArchivedExecution* components.
- */
-public class ArchivedJobGenerationUtils {
- public static final ObjectMapper MAPPER = new ObjectMapper();
- public static final JsonFactory JACKSON_FACTORY = new JsonFactory()
- .enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
- .disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT);
-
- private static ArchivedExecutionGraph originalJob;
- private static ArchivedExecutionJobVertex originalTask;
- private static ArchivedExecutionVertex originalSubtask;
- private static ArchivedExecution originalAttempt;
-
- private static final Object lock = new Object();
-
- private ArchivedJobGenerationUtils() {
- }
-
- public static AccessExecutionGraph getTestJob() throws Exception {
- synchronized (lock) {
- if (originalJob == null) {
- generateArchivedJob();
- }
- }
- return originalJob;
- }
-
- public static AccessExecutionJobVertex getTestTask() throws Exception {
- synchronized (lock) {
- if (originalJob == null) {
- generateArchivedJob();
- }
- }
- return originalTask;
- }
-
- public static AccessExecutionVertex getTestSubtask() throws Exception {
- synchronized (lock) {
- if (originalJob == null) {
- generateArchivedJob();
- }
- }
- return originalSubtask;
- }
-
- public static AccessExecution getTestAttempt() throws Exception {
- synchronized (lock) {
- if (originalJob == null) {
- generateArchivedJob();
- }
- }
- return originalAttempt;
- }
-
- private static void generateArchivedJob() throws Exception {
- // Attempt
- StringifiedAccumulatorResult acc1 = new StringifiedAccumulatorResult("name1", "type1", "value1");
- StringifiedAccumulatorResult acc2 = new StringifiedAccumulatorResult("name2", "type2", "value2");
- TaskManagerLocation location = new TaskManagerLocation(new ResourceID("hello"), InetAddress.getLocalHost(), 1234);
- originalAttempt = new ArchivedExecutionBuilder()
- .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
- .setParallelSubtaskIndex(1)
- .setAttemptNumber(0)
- .setAssignedResourceLocation(location)
- .setUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2})
- .setState(ExecutionState.FINISHED)
- .setFailureCause("attemptException")
- .build();
- // Subtask
- originalSubtask = new ArchivedExecutionVertexBuilder()
- .setSubtaskIndex(originalAttempt.getParallelSubtaskIndex())
- .setTaskNameWithSubtask("hello(1/1)")
- .setCurrentExecution(originalAttempt)
- .build();
- // Task
- originalTask = new ArchivedExecutionJobVertexBuilder()
- .setTaskVertices(new ArchivedExecutionVertex[]{originalSubtask})
- .build();
- // Job
- Map<JobVertexID, ArchivedExecutionJobVertex> tasks = new HashMap<>();
- tasks.put(originalTask.getJobVertexId(), originalTask);
- originalJob = new ArchivedExecutionGraphBuilder()
- .setJobID(new JobID())
- .setTasks(tasks)
- .setFailureCause(new ErrorInfo(new Exception("jobException"), originalAttempt.getStateTimestamp(ExecutionState.FAILED)))
- .setState(JobStatus.FINISHED)
- .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
- .setArchivedUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2})
- .build();
- }
-
- // ========================================================================
- // utility methods
- // ========================================================================
-
- public static void compareStringifiedAccumulators(StringifiedAccumulatorResult[] expectedAccs, ArrayNode writtenAccs) {
- assertEquals(expectedAccs.length, writtenAccs.size());
- for (int x = 0; x < expectedAccs.length; x++) {
- JsonNode acc = writtenAccs.get(x);
-
- assertEquals(expectedAccs[x].getName(), acc.get("name").asText());
- assertEquals(expectedAccs[x].getType(), acc.get("type").asText());
- assertEquals(expectedAccs[x].getValue(), acc.get("value").asText());
- }
- }
-
- public static void compareIoMetrics(IOMetrics expectedMetrics, JsonNode writtenMetrics) {
- assertEquals(expectedMetrics.getNumBytesInTotal(), writtenMetrics.get("read-bytes").asLong());
- assertEquals(expectedMetrics.getNumBytesOut(), writtenMetrics.get("write-bytes").asLong());
- assertEquals(expectedMetrics.getNumRecordsIn(), writtenMetrics.get("read-records").asLong());
- assertEquals(expectedMetrics.getNumRecordsOut(), writtenMetrics.get("write-records").asLong());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
new file mode 100644
index 0000000..4ad1759
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+/**
+ * A special exception that indicates that an element was not found and that the
+ * request should be answered with a {@code 404} return code.
+ */
+public class NotFoundException extends Exception {
+
+ private static final long serialVersionUID = -4036006746423754639L;
+
+ public NotFoundException(String message) {
+ super(message);
+ }
+}