You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by sr...@apache.org on 2015/09/11 19:37:13 UTC
tez git commit: TEZ-2792. Add AM web service API for tasks (sree)
Repository: tez
Updated Branches:
refs/heads/master b288be709 -> 37cab1284
TEZ-2792. Add AM web service API for tasks (sree)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/37cab128
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/37cab128
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/37cab128
Branch: refs/heads/master
Commit: 37cab12849e4cc758d13ae2642c92b1999269827
Parents: b288be7
Author: Sreenath Somarajapuram <sr...@apache.org>
Authored: Fri Sep 11 22:53:03 2015 +0530
Committer: Sreenath Somarajapuram <sr...@apache.org>
Committed: Fri Sep 11 22:53:03 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/app/web/AMWebController.java | 206 ++++++++++++++++-
.../apache/tez/dag/app/web/WebUIService.java | 49 +++-
.../tez/dag/app/web/TestAMWebController.java | 231 +++++++++++++++++++
4 files changed, 482 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/37cab128/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c373fe7..de6ad13 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -175,6 +175,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2792. Add AM web service API for tasks
TEZ-2807. Log data in the finish event instead of the start event
TEZ-2766. Tez UI: Add vertex in-progress info in DAG details
TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting
http://git-wip-us.apache.org/repos/asf/tez/blob/37cab128/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
index db27d59..06f282c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
@@ -28,12 +28,15 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.StringTokenizer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import org.apache.tez.dag.api.client.ProgressBuilder;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.records.TezTaskID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.security.UserGroupInformation;
@@ -67,7 +70,7 @@ public class AMWebController extends Controller {
static final String VERTEX_PROGRESS = "vertexProgress";
static final String VERTEX_PROGRESSES = "vertexProgresses";
- static final int MAX_VERTICES_QUERIED = 100;
+ static final int MAX_QUERIED = 100;
public static final String VERSION = "2";
private AppContext appContext;
@@ -266,7 +269,7 @@ public class AMWebController extends Controller {
List<Integer> vertexIDs = new ArrayList<Integer>();
try {
dagID = getQueryParamInt(WebUIService.DAG_ID);
- for (String vertexIDStr : $(WebUIService.VERTEX_ID).trim().split(",", MAX_VERTICES_QUERIED)) {
+ for (String vertexIDStr : $(WebUIService.VERTEX_ID).trim().split(",", MAX_QUERIED)) {
vertexIDs.add(Integer.parseInt(vertexIDStr));
}
} catch (NumberFormatException e) {
@@ -342,7 +345,7 @@ public class AMWebController extends Controller {
List<Integer> vertexIDs = new ArrayList<>();
if (!valueStr.equals("")) {
- String[] vertexIdsStr = valueStr.split(",", MAX_VERTICES_QUERIED);
+ String[] vertexIdsStr = valueStr.split(",", MAX_QUERIED);
try {
for (String vertexIdStr : vertexIdsStr) {
@@ -359,6 +362,86 @@ public class AMWebController extends Controller {
return vertexIDs;
}
+ /**
+  * splitString
+  * Breaks a string into tokens around the given delimiter characters and keeps at most
+  * limit of them. Tokenizing is done with StringTokenizer, hence empty tokens are never
+  * part of the result.
+  *
+  * @param str {String} Text to be split
+  * @param delimiter {String} Characters that separate the tokens
+  * @param limit {Integer} Maximum number of tokens to be taken
+  *
+  * @return {List<String>} The leading tokens, in the order they appear in str
+  */
+ List<String> splitString(String str, String delimiter, Integer limit) {
+   StringTokenizer tokens = new StringTokenizer(str, delimiter);
+   List<String> parts = new ArrayList<>();
+
+   // parts grows by one per token, so its size doubles as the running count
+   while (tokens.hasMoreTokens() && parts.size() < limit) {
+     parts.add(tokens.nextToken());
+   }
+
+   return parts;
+ }
+
+ /**
+  * getIntegersFromRequest
+  * Reads a query parameter holding comma separated values and converts them into integers.
+  * If any of the values is not an integer, a bad request error response is sent and null
+  * is returned. A blank parameter results in an empty list.
+  *
+  * @param paramName {String} Name of the query parameter to read
+  * @param limit {Integer} Maximum number of values to be taken
+  *
+  * @return {List<Integer>} List of parsed values, null when a value could not be parsed
+  */
+ List<Integer> getIntegersFromRequest(String paramName, Integer limit) {
+   List<Integer> parsed = new ArrayList<>();
+
+   String rawValues = $(paramName).trim();
+   if (rawValues.equals("")) {
+     return parsed;
+   }
+
+   try {
+     for (String token : splitString(rawValues, ",", limit)) {
+       parsed.add(Integer.parseInt(token));
+     }
+   } catch (NumberFormatException nfe) {
+     sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST,
+         String.format("invalid %s passed in as parameter", paramName), nfe);
+     return null;
+   }
+
+   return parsed;
+ }
+
+ /**
+  * getIDsFromRequest
+  * Takes in "1_0,1_3" and returns [[1,0],[1,3]]
+  * Reads a query parameter holding comma separated ids, where each id is made of two
+  * indexes joined by an underscore, e.g. vertexIndex_taskIndex for a task. Ids that do not
+  * have exactly two parts are skipped.
+  * If any index of a two part id is not an integer, a bad request error response is sent
+  * and null is returned. A blank parameter results in an empty list.
+  *
+  * @param paramName {String} Name of the query parameter to read
+  * @param limit {Integer} Maximum number of ids to be taken
+  *
+  * @return {List<List<Integer>>} List of parsed index pairs, null when an index could not
+  *         be parsed
+  */
+ List<List<Integer>> getIDsFromRequest(String paramName, Integer limit) {
+   List<List<Integer>> ids = new ArrayList<>();
+
+   String rawValues = $(paramName).trim();
+   if (rawValues.equals("")) {
+     return ids;
+   }
+
+   try {
+     for (String idStr : splitString(rawValues, ",", limit)) {
+       String[] indexStrs = idStr.split("_");
+       if (indexStrs.length != 2) {
+         // Not of the form <index>_<index>, leave it out of the result
+         continue;
+       }
+
+       List<Integer> indexes = new ArrayList<>();
+       for (String indexStr : indexStrs) {
+         indexes.add(Integer.parseInt(indexStr));
+       }
+       ids.add(indexes);
+     }
+   } catch (NumberFormatException nfe) {
+     sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST,
+         String.format("invalid %s passed in as parameter", paramName), nfe);
+     return null;
+   }
+
+   return ids;
+ }
+
public void getDagInfo() {
if (!setupResponse()) {
return;
@@ -413,7 +496,7 @@ public class AMWebController extends Controller {
}
Collection<Vertex> vertexList;
- if (requestedIDs.size() == 0) {
+ if (requestedIDs.isEmpty()) {
// no ids specified return all.
vertexList = dag.getVertices().values();
} else {
@@ -430,6 +513,121 @@ public class AMWebController extends Controller {
));
}
+ /**
+  * getVertexFromIndex
+  * Builds a vertex id out of the id of the dag and the vertex index, and looks the vertex
+  * up in the dag with it.
+  *
+  * @param dag {DAG} DAG that owns the vertex
+  * @param vertexIndex {Integer} Index of the vertex inside the dag
+  *
+  * @return {Vertex} Whatever dag.getVertex gives back for the id. Callers treat null as
+  *         "no such vertex"
+  */
+ Vertex getVertexFromIndex(DAG dag, Integer vertexIndex) {
+   return dag.getVertex(TezVertexID.getInstance(dag.getID(), vertexIndex));
+ }
+
+ /**
+  * getRequestedTasks
+  * Heart of getTasksInfo. Given a dag and a limit, based on the incoming query parameters
+  * returns a list of task instances. The parameters are considered in this order:
+  *  - taskID: the referenced tasks are returned, vertexID is not looked at
+  *  - vertexID: tasks under the referenced vertices are returned
+  *  - none of the above: tasks under all vertices of the dag are returned
+  * Vertices and tasks that cannot be found are silently left out.
+  *
+  * @param dag {DAG} DAG to pick the tasks from
+  * @param limit {Integer} Maximum number of tasks to be returned. Zero or a negative value
+  *              gives an empty list
+  *
+  * @return {List<Task>} At most limit tasks, or null if a query parameter was invalid (the
+  *         parameter parsers would have sent the error response by then)
+  */
+ List<Task> getRequestedTasks(DAG dag, Integer limit) {
+   List<Task> tasks = new ArrayList<>();
+
+   // Nothing can be returned for a non-positive limit. Bailing out here also keeps a
+   // negative limit away from List.subList(), which rejects a negative end index.
+   if (limit <= 0) {
+     return tasks;
+   }
+
+   List<List<Integer>> taskIDs = getIDsFromRequest(WebUIService.TASK_ID, limit);
+   if (taskIDs == null) {
+     return null;
+   }
+
+   if (!taskIDs.isEmpty()) {
+     for (List<Integer> indexes : taskIDs) {
+       Vertex vertex = getVertexFromIndex(dag, indexes.get(0));
+       if (vertex == null) {
+         continue;
+       }
+       Task task = vertex.getTask(indexes.get(1));
+       if (task == null) {
+         continue;
+       }
+
+       tasks.add(task);
+       if (tasks.size() >= limit) {
+         break;
+       }
+     }
+     return tasks;
+   }
+
+   List<Integer> vertexIDs = getIntegersFromRequest(WebUIService.VERTEX_ID, limit);
+   if (vertexIDs == null) {
+     return null;
+   }
+
+   if (!vertexIDs.isEmpty()) {
+     for (Integer vertexID : vertexIDs) {
+       Vertex vertex = getVertexFromIndex(dag, vertexID);
+       if (vertex == null) {
+         continue;
+       }
+       if (addVertexTasks(tasks, vertex, limit)) {
+         break;
+       }
+     }
+   } else {
+     for (Vertex vertex : dag.getVertices().values()) {
+       if (addVertexTasks(tasks, vertex, limit)) {
+         break;
+       }
+     }
+   }
+
+   return tasks;
+ }
+
+ /**
+  * Appends tasks of the vertex to the list, without letting the list grow beyond limit.
+  *
+  * @return true once the list holds limit tasks
+  */
+ private boolean addVertexTasks(List<Task> tasks, Vertex vertex, int limit) {
+   List<Task> vertexTasks = new ArrayList<>(vertex.getTasks().values());
+   int room = Math.min(vertexTasks.size(), limit - tasks.size());
+   tasks.addAll(vertexTasks.subList(0, room));
+   return tasks.size() >= limit;
+ }
+
+ /**
+  * Renders the response JSON for tasksInfo API
+  * The JSON has a single key, tasks, holding an array of task objects. Each task object
+  * carries id, progress and status as strings.
+  * The number of tasks is capped by the limit query parameter, MAX_QUERIED is used when
+  * the parameter cannot be read as an integer.
+  */
+ public void getTasksInfo() {
+   if (!setupResponse()) {
+     return;
+   }
+
+   final DAG dag = checkAndGetDAGFromRequest();
+   if (dag == null) {
+     return;
+   }
+
+   int limit;
+   try {
+     limit = getQueryParamInt(WebUIService.LIMIT);
+   } catch (NumberFormatException ignored) {
+     // limit is optional, fall back to the default
+     limit = MAX_QUERIED;
+   }
+
+   final List<Task> requestedTasks = getRequestedTasks(dag, limit);
+   if (requestedTasks == null) {
+     // Invalid query parameter, nothing more to render
+     return;
+   }
+
+   ArrayList<Map<String, String>> tasksInfo = new ArrayList<>();
+   for (Task task : requestedTasks) {
+     Map<String, String> entry = new HashMap<>();
+     entry.put("id", task.getTaskId().toString());
+     entry.put("progress", Float.toString(task.getProgress()));
+     entry.put("status", task.getState().toString());
+     tasksInfo.add(entry);
+   }
+
+   renderJSON(ImmutableMap.of("tasks", tasksInfo));
+ }
+
@Override
@VisibleForTesting
public void renderJSON(Object object) {
http://git-wip-us.apache.org/repos/asf/tez/blob/37cab128/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
index 19e1641..32b57e9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
@@ -40,6 +40,9 @@ public class WebUIService extends AbstractService {
private static final String WS_PREFIX_V2 = "/ui/ws/v2/tez/";
public static final String VERTEX_ID = "vertexID";
public static final String DAG_ID = "dagID";
+ public static final String TASK_ID = "taskID";
+
+ public static final String LIMIT = "limit";
private static final Logger LOG = LoggerFactory.getLogger(WebUIService.class);
@@ -152,9 +155,53 @@ public class WebUIService extends AbstractService {
route(WS_PREFIX + pajoin("vertexProgresses", VERTEX_ID, DAG_ID), AMWebController.class,
"getVertexProgresses");
- // v2 api
+ /**
+ * AM Web Service API V2
+ * The API facilitates end points that would serve the user with real-time data on dag,
+ * vertex and tasks.
+ *
+ * Query Params:
+ * dagID - Same as dagIndex. Expects one single value. (It is mandatory in all APIs)
+ * vertexID - Same as vertex index. Can be a list of comma separated values
+ * taskID - Should be of the format <vertexIndex>_<taskIndex>. For instance task with
+ * index 5 in vertex 3 can be referenced using the id 3_5
+ * limit - The max number of records returned. Currently supported only in tasksInfo.
+ * If not passed, limit would be taken as 100
+ *
+ * APIs:
+ * /ui/ws/v2/tez/dagInfo
+ * Query param:
+ * - Accepts one single parameter, dagID
+ * Data returned:
+ * - Full id, progress, status
+ *
+ * /ui/ws/v2/tez/verticesInfo
+ * Query params:
+ * - Accepts dagID and vertexID
+ * - vertexID is optional
+ * - If specified the respective vertices will be returned, else all vertices
+ * in the DAG will be returned
+ * Data returned:
+ * - Full id, progress, status, totalTasks, runningTasks, succeededTasks,
+ * failedTaskAttempts, killedTaskAttempts
+ *
+ * /ui/ws/v2/tez/tasksInfo
+ * Query params:
+ * - Accepts dagID, vertexID, taskID & limit
+ * - vertex and task IDs are optional
+ * - If taskID is passed: All (capped by limit) the specified tasks will be
+ * returned. vertexID if present won't be considered
+ * - If vertexID is passed: All (capped by limit) tasks under the vertices
+ * will be returned
+ * - If just dagID is passed: All (capped by limit) tasks under the DAG
+ * will be returned
+ * Data returned:
+ * - Full id, progress, status
+ */
route(WS_PREFIX_V2 + pajoin("dagInfo", DAG_ID), AMWebController.class, "getDagInfo");
route(WS_PREFIX_V2 + pajoin("verticesInfo", VERTEX_ID, DAG_ID), AMWebController.class, "getVerticesInfo");
+ route(WS_PREFIX_V2 + pajoin("tasksInfo", TASK_ID, VERTEX_ID, DAG_ID), AMWebController.class,
+ "getTasksInfo");
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/37cab128/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
index 62779bc..cba3c3e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
@@ -33,23 +33,30 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.webapp.Controller;
import org.apache.tez.common.security.ACLManager;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.client.ProgressBuilder;
+import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.Assert;
import org.junit.Before;
@@ -367,4 +374,228 @@ public class TestAMWebController {
vertex2Result.get("failedTaskAttempts"));
}
+ //-- Get Tasks Info Tests -----------------------------------------------------------------------
+
+ /**
+ * Task ids take the lead: with the task id parameter stubbed to the index pairs
+ * [0,0], [0,3] and [0,1], the rendered tasks must be the matching mock tasks, in the
+ * order the ids were given. Run once with the default limit and once with a limit of 2.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testGetTasksInfoWithTaskIds() {
+ List <Task> tasks = createMockTasks();
+ List <Integer> vertexMinIds = Arrays.asList();
+ List <List <Integer>> taskMinIds = Arrays.asList(Arrays.asList(0, 0),
+ Arrays.asList(0, 3),
+ Arrays.asList(0, 1));
+
+ // Fetch All - the limit is larger than the number of requested tasks
+ Map<String, Object> result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds,
+ AMWebController.MAX_QUERIED);
+
+ Assert.assertEquals(1, result.size());
+ Assert.assertTrue(result.containsKey("tasks"));
+
+ ArrayList<Map<String, String>> tasksInfo = (ArrayList<Map<String, String>>) result.
+ get("tasks");
+ Assert.assertEquals(3, tasksInfo.size());
+
+ verifySingleTaskResult(tasks.get(0), tasksInfo.get(0));
+ verifySingleTaskResult(tasks.get(3), tasksInfo.get(1));
+ verifySingleTaskResult(tasks.get(1), tasksInfo.get(2));
+
+ // With limit - only the first two requested tasks are expected
+ result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds, 2);
+
+ Assert.assertEquals(1, result.size());
+ Assert.assertTrue(result.containsKey("tasks"));
+
+ tasksInfo = (ArrayList<Map<String, String>>) result.get("tasks");
+ Assert.assertEquals(2, tasksInfo.size());
+
+ verifySingleTaskResult(tasks.get(0), tasksInfo.get(0));
+ verifySingleTaskResult(tasks.get(3), tasksInfo.get(1));
+ }
+
+ /**
+ * A task id that does not resolve must be skipped rather than fail the request. Index 6
+ * is requested while createMockTasks only provides task indexes 0 to 3, so just the two
+ * remaining tasks are expected in the result.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testGetTasksInfoGracefulTaskFetch() {
+ List <Task> tasks = createMockTasks();
+ List <Integer> vertexMinIds = Arrays.asList();
+ List <List <Integer>> taskMinIds = Arrays.asList(Arrays.asList(0, 0),
+ Arrays.asList(0, 6),
+ Arrays.asList(0, 1));
+
+ // Fetch All
+ Map<String, Object> result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds,
+ AMWebController.MAX_QUERIED);
+
+ Assert.assertEquals(1, result.size());
+ Assert.assertTrue(result.containsKey("tasks"));
+
+ ArrayList<Map<String, String>> tasksInfo = (ArrayList<Map<String, String>>) result.
+ get("tasks");
+ Assert.assertEquals(2, tasksInfo.size());
+
+ verifySingleTaskResult(tasks.get(0), tasksInfo.get(0));
+ verifySingleTaskResult(tasks.get(1), tasksInfo.get(1));
+ }
+
+ /**
+ * With no task ids and the vertex id parameter stubbed to [0], all four tasks of the mock
+ * vertex are expected. With a limit of 2 only the number of returned tasks is checked.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testGetTasksInfoWithVertexId() {
+ List <Task> tasks = createMockTasks();
+ List <Integer> vertexMinIds = Arrays.asList(0);
+ List <List <Integer>> taskMinIds = Arrays.asList();
+
+ Map<String, Object> result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds,
+ AMWebController.MAX_QUERIED);
+
+ Assert.assertEquals(1, result.size());
+ Assert.assertTrue(result.containsKey("tasks"));
+
+ ArrayList<Map<String, String>> tasksInfo = (ArrayList<Map<String, String>>) result.
+ get("tasks");
+ Assert.assertEquals(4, tasksInfo.size());
+
+ // The helper keeps the tasks in a hash map, sort by id to get a predictable order
+ sortMapList(tasksInfo, "id");
+ verifySingleTaskResult(tasks.get(0), tasksInfo.get(0));
+ verifySingleTaskResult(tasks.get(1), tasksInfo.get(1));
+ verifySingleTaskResult(tasks.get(2), tasksInfo.get(2));
+ verifySingleTaskResult(tasks.get(3), tasksInfo.get(3));
+
+ // With limit
+ result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds, 2);
+
+ Assert.assertEquals(1, result.size());
+ Assert.assertTrue(result.containsKey("tasks"));
+
+ tasksInfo = (ArrayList<Map<String, String>>) result.get("tasks");
+ Assert.assertEquals(2, tasksInfo.size());
+ }
+
+ /**
+ * With neither task ids nor vertex ids, tasks of every vertex in the mock DAG are
+ * expected, which is the four mock tasks. With a limit of 2 only the number of returned
+ * tasks is checked.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testGetTasksInfoWithJustDAGId() {
+ List <Task> tasks = createMockTasks();
+ List <Integer> vertexMinIds = Arrays.asList();
+ List <List <Integer>> taskMinIds = Arrays.asList();
+
+ Map<String, Object> result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds,
+ AMWebController.MAX_QUERIED);
+
+ Assert.assertEquals(1, result.size());
+ Assert.assertTrue(result.containsKey("tasks"));
+
+ ArrayList<Map<String, String>> tasksInfo = (ArrayList<Map<String, String>>) result.
+ get("tasks");
+ Assert.assertEquals(4, tasksInfo.size());
+
+ // The helper keeps the tasks in a hash map, sort by id to get a predictable order
+ sortMapList(tasksInfo, "id");
+ verifySingleTaskResult(tasks.get(0), tasksInfo.get(0));
+ verifySingleTaskResult(tasks.get(1), tasksInfo.get(1));
+ verifySingleTaskResult(tasks.get(2), tasksInfo.get(2));
+ verifySingleTaskResult(tasks.get(3), tasksInfo.get(3));
+
+ // With limit
+ result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds, 2);
+
+ Assert.assertEquals(1, result.size());
+ Assert.assertTrue(result.containsKey("tasks"));
+
+ tasksInfo = (ArrayList<Map<String, String>>) result.get("tasks");
+ Assert.assertEquals(2, tasksInfo.size());
+ }
+
+ /**
+  * Sorts the list in place, ordering the maps by the string each of them holds under
+  * propertyName.
+  */
+ private void sortMapList(ArrayList<Map<String, String>> list, String propertyName) {
+   // final copy, so that the anonymous comparator can capture it
+   final String key = propertyName;
+
+   Collections.sort(list, new Comparator<Map<String, String>>() {
+     @Override
+     public int compare(Map<String, String> first, Map<String, String> second) {
+       return first.get(key).compareTo(second.get(key));
+     }
+   });
+ }
+
+ /**
+ * Runs getTasksInfo on a spied controller against a mock DAG with one mock vertex holding
+ * the given tasks, and returns the object that was handed over to renderJSON.
+ * Query parameter parsing is bypassed: the limit, vertex id and task id lookups are
+ * stubbed to return the passed in values.
+ *
+ * @param tasks Tasks to be attached to the mock vertex
+ * @param taskMinIds Value returned for the task id parameter
+ * @param vertexMinIds Value returned for the vertex id parameter
+ * @param limit Value returned for the limit parameter
+ */
+ Map<String, Object> getTasksTestHelper(List<Task> tasks, List <List <Integer>> taskMinIds,
+ List<Integer> vertexMinIds, Integer limit) {
+ //Creating mock DAG
+ DAG mockDAG = mock(DAG.class);
+ doReturn(TezDAGID.fromString("dag_1441301219877_0109_1")).when(mockDAG).getID();
+
+ //Creating mock vertex and attaching to mock DAG
+ TezVertexID vertexID = TezVertexID.fromString("vertex_1441301219877_0109_1_00");
+ Vertex mockVertex = mock(Vertex.class);
+ doReturn(vertexID).when(mockVertex).getVertexId();
+
+ doReturn(mockVertex).when(mockDAG).getVertex(vertexID);
+ doReturn(ImmutableMap.of(
+ vertexID, mockVertex
+ )).when(mockDAG).getVertices();
+
+ //Creating mock tasks and attaching to mock vertex
+ Map<TezTaskID, Task> taskMap = Maps.newHashMap();
+ for(Task task : tasks) {
+ TezTaskID taskId = task.getTaskId();
+ int taskIndex = taskId.getId();
+ doReturn(task).when(mockVertex).getTask(taskIndex);
+ taskMap.put(taskId, task);
+ }
+ doReturn(taskMap).when(mockVertex).getTasks();
+
+ //Creates & setup controller spy
+ AMWebController amWebController = new AMWebController(mockRequestContext, mockAppContext,
+ "TEST_HISTORY_URL");
+ AMWebController spy = spy(amWebController);
+ doReturn(true).when(spy).setupResponse();
+ doNothing().when(spy).renderJSON(any());
+
+ // Set mock query params
+ doReturn(limit).when(spy).getQueryParamInt(WebUIService.LIMIT);
+ doReturn(vertexMinIds).when(spy).getIntegersFromRequest(WebUIService.VERTEX_ID, limit);
+ doReturn(taskMinIds).when(spy).getIDsFromRequest(WebUIService.TASK_ID, limit);
+
+ // Set function mocks
+ doReturn(mockDAG).when(spy).checkAndGetDAGFromRequest();
+
+ spy.getTasksInfo();
+ verify(spy).renderJSON(returnResultCaptor.capture());
+
+ return returnResultCaptor.getValue();
+ }
+
+ /**
+  * Creates four mock tasks with the task indexes 0 to 3, all under vertex 00 of dag
+  * 1441301219877_0109_1. The first one is RUNNING, the rest are SUCCEEDED.
+  */
+ private List<Task> createMockTasks() {
+   return Arrays.asList(
+       createMockTask("task_1441301219877_0109_1_00_000000", TaskState.RUNNING, 0.33f),
+       createMockTask("task_1441301219877_0109_1_00_000001", TaskState.SUCCEEDED, 1.0f),
+       createMockTask("task_1441301219877_0109_1_00_000002", TaskState.SUCCEEDED, .8f),
+       createMockTask("task_1441301219877_0109_1_00_000003", TaskState.SUCCEEDED, .8f));
+ }
+
+ /**
+  * Creates a mock task that reports the given id, state and progress.
+  *
+  * @param taskIDStr Task id in its string form, parsed with TezTaskID.fromString
+  * @param status State returned by getState
+  * @param progress Value returned by getProgress
+  */
+ private Task createMockTask(String taskIDStr, TaskState status, float progress) {
+   Task task = mock(Task.class);
+
+   doReturn(TezTaskID.fromString(taskIDStr)).when(task).getTaskId();
+   doReturn(status).when(task).getState();
+   doReturn(progress).when(task).getProgress();
+
+   return task;
+ }
+
+ /**
+  * Asserts that a rendered task entry has exactly three properties, and that its id, status
+  * and progress are the string forms of what the mock task reports.
+  */
+ private void verifySingleTaskResult(Task mockTask, Map<String, String> taskResult) {
+   Assert.assertEquals(3, taskResult.size());
+
+   String expectedId = mockTask.getTaskId().toString();
+   Assert.assertEquals(expectedId, taskResult.get("id"));
+
+   String expectedStatus = mockTask.getState().toString();
+   Assert.assertEquals(expectedStatus, taskResult.get("status"));
+
+   String expectedProgress = Float.toString(mockTask.getProgress());
+   Assert.assertEquals(expectedProgress, taskResult.get("progress"));
+ }
}