You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by sr...@apache.org on 2015/09/11 19:37:13 UTC

tez git commit: TEZ-2792. Add AM web service API for tasks (sree)

Repository: tez
Updated Branches:
  refs/heads/master b288be709 -> 37cab1284


TEZ-2792. Add AM web service API for tasks (sree)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/37cab128
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/37cab128
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/37cab128

Branch: refs/heads/master
Commit: 37cab12849e4cc758d13ae2642c92b1999269827
Parents: b288be7
Author: Sreenath Somarajapuram <sr...@apache.org>
Authored: Fri Sep 11 22:53:03 2015 +0530
Committer: Sreenath Somarajapuram <sr...@apache.org>
Committed: Fri Sep 11 22:53:03 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/app/web/AMWebController.java | 206 ++++++++++++++++-
 .../apache/tez/dag/app/web/WebUIService.java    |  49 +++-
 .../tez/dag/app/web/TestAMWebController.java    | 231 +++++++++++++++++++
 4 files changed, 482 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/37cab128/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c373fe7..de6ad13 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -175,6 +175,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2792. Add AM web service API for tasks
   TEZ-2807. Log data in the finish event instead of the start event
   TEZ-2766. Tez UI: Add vertex in-progress info in DAG details
   TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting

http://git-wip-us.apache.org/repos/asf/tez/blob/37cab128/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
index db27d59..06f282c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
@@ -28,12 +28,15 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.StringTokenizer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 import org.apache.tez.dag.api.client.ProgressBuilder;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.records.TezTaskID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -67,7 +70,7 @@ public class AMWebController extends Controller {
   static final String VERTEX_PROGRESS = "vertexProgress";
   static final String VERTEX_PROGRESSES = "vertexProgresses";
 
-  static final int MAX_VERTICES_QUERIED = 100;
+  static final int MAX_QUERIED = 100;
   public static final String VERSION = "2";
 
   private AppContext appContext;
@@ -266,7 +269,7 @@ public class AMWebController extends Controller {
     List<Integer> vertexIDs = new ArrayList<Integer>();
     try {
       dagID = getQueryParamInt(WebUIService.DAG_ID);
-      for (String vertexIDStr : $(WebUIService.VERTEX_ID).trim().split(",", MAX_VERTICES_QUERIED)) {
+      for (String vertexIDStr : $(WebUIService.VERTEX_ID).trim().split(",", MAX_QUERIED)) {
         vertexIDs.add(Integer.parseInt(vertexIDStr));
       }
     } catch (NumberFormatException e) {
@@ -342,7 +345,7 @@ public class AMWebController extends Controller {
 
     List<Integer> vertexIDs = new ArrayList<>();
     if (!valueStr.equals("")) {
-      String[] vertexIdsStr = valueStr.split(",", MAX_VERTICES_QUERIED);
+      String[] vertexIdsStr = valueStr.split(",", MAX_QUERIED);
 
       try {
         for (String vertexIdStr : vertexIdsStr) {
@@ -359,6 +362,86 @@ public class AMWebController extends Controller {
     return vertexIDs;
   }
 
+  List<String> splitString(String str, String delimiter, Integer limit) { // Tokenizes str, keeping at most limit tokens.
+    List<String> items = new ArrayList<>();
+
+    StringTokenizer tokenizer = new StringTokenizer(str, delimiter); // every char of delimiter separates; empty tokens are skipped
+    for(int count = 0; tokenizer.hasMoreElements() && count < limit; count ++) { // tokens past the limit are dropped, not merged
+      items.add(tokenizer.nextToken());
+    }
+
+    return items; // never null; empty when str has no tokens or limit <= 0
+  }
+
+  /**
+   * getIntegersFromRequest
+   * Parses a query parameter holding comma separated values into a list of integers.
+   * At most limit values are read (see splitString); any further values are ignored.
+   * If any value read is not an integer, an SC_BAD_REQUEST error is sent through
+   * sendErrorResponse and null is returned.
+   *
+   * @param paramName {String} Name of the query parameter to read
+   * @param limit {Integer} Maximum number of values to be taken
+   *
+   * @return {List<Integer>} List of parsed values, empty if the trimmed parameter value is
+   *         empty, or null if a value could not be parsed
+   */
+  List<Integer> getIntegersFromRequest(String paramName, Integer limit) {
+    String valuesStr = $(paramName).trim(); // NOTE(review): assumes $() never returns null - confirm
+
+    List<Integer> values = new ArrayList<>();
+    if (!valuesStr.equals("")) {
+      try {
+        for (String valueStr : splitString(valuesStr, ",", limit)) {
+          int value = Integer.parseInt(valueStr);
+          values.add(value);
+        }
+      } catch (NumberFormatException nfe) {
+        sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST,
+            String.format("invalid %s passed in as parameter", paramName), nfe);
+        values = null; // null tells the caller that an error response was already sent
+      }
+    }
+
+    return values;
+  }
+
+  /**
+   * getIDsFromRequest
+   * Takes in "1_0,1_3" and returns [[1,0],[1,3]]
+   * Parses a query parameter holding comma separated, underscore joined indexes. For a
+   * vertex the ID is its index, for a task it's vertexIndex_taskIndex and for an attempt it's
+   * vertexIndex_taskIndex_attemptNo.
+   * Only IDs with exactly two parts (the task form) are kept by this implementation; IDs with
+   * any other number of parts are skipped silently, without an error.
+   * If a part of a two-part ID is not an integer, an SC_BAD_REQUEST error is sent through
+   * sendErrorResponse and null is returned.
+   *
+   * @param paramName {String} Name of the query parameter to read
+   * @param limit {Integer} Maximum number of comma separated values to be taken
+   *
+   * @return {List<List<Integer>>} List of parsed values, empty if nothing usable was passed,
+   *         or null if a value could not be parsed
+   */
+  List<List<Integer>> getIDsFromRequest(String paramName, Integer limit) {
+    String valuesStr = $(paramName).trim(); // NOTE(review): assumes $() never returns null - confirm
+
+    List<List<Integer>> values = new ArrayList<>();
+    if (!valuesStr.equals("")) {
+      try {
+        for (String valueStr : splitString(valuesStr, ",", limit)) {
+          List<Integer> innerValues = new ArrayList<>();
+          String innerValueStrs[] = valueStr.split("_");
+          if(innerValueStrs.length == 2) { // other shapes, e.g. "3" or "1_2_3", are dropped
+            for (String innerValueStr : innerValueStrs) {
+              int value = Integer.parseInt(innerValueStr);
+              innerValues.add(value);
+            }
+            values.add(innerValues);
+          }
+        }
+      } catch (NumberFormatException nfe) {
+        sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST,
+            String.format("invalid %s passed in as parameter", paramName), nfe);
+        values = null; // null tells the caller that an error response was already sent
+      }
+    }
+
+    return values;
+  }
+
   public void getDagInfo() {
     if (!setupResponse()) {
       return;
@@ -413,7 +496,7 @@ public class AMWebController extends Controller {
     }
 
     Collection<Vertex> vertexList;
-    if (requestedIDs.size() == 0) {
+    if (requestedIDs.isEmpty()) {
       // no ids specified return all.
       vertexList = dag.getVertices().values();
     } else {
@@ -430,6 +513,121 @@ public class AMWebController extends Controller {
     ));
   }
 
+  Vertex getVertexFromIndex(DAG dag, Integer vertexIndex) { // Looks up a vertex of dag by its index.
+    final TezVertexID tezVertexID = TezVertexID.getInstance(dag.getID(), vertexIndex); // full vertex ID = dag ID + index
+    Vertex vertex = dag.getVertex(tezVertexID);
+    return vertex; // callers in this class treat null as "no such vertex"
+  }
+
+  /**
+   * getRequestedTasks
+   * Heart of getTasksInfo. Given a dag and a limit, based on the incoming query parameters
+   * returns a list of task instances.
+   * Selection order: if taskID yields any IDs, only those tasks are looked up; otherwise, if
+   * vertexID yields any indexes, tasks of those vertices are taken; otherwise tasks of every
+   * vertex in the dag are taken. Vertices and tasks that cannot be found are skipped.
+   * limit is expected to be non-negative: with a negative limit the subList calls below are
+   * given a negative end index.
+   *
+   * @param dag {DAG}
+   * @param limit {Integer} Maximum number of tasks returned; also passed on as the maximum
+   *              number of IDs read from each query parameter
+   *
+   * @return {List<Task>} The selected tasks, or null if getIDsFromRequest or
+   *         getIntegersFromRequest returned null
+   */
+  List<Task> getRequestedTasks(DAG dag, Integer limit) {
+    List<Task> tasks = new ArrayList<>();
+
+    List<List<Integer>> taskIDs = getIDsFromRequest(WebUIService.TASK_ID, limit);
+    if(taskIDs == null) {
+      return null;
+    }
+    else if(!taskIDs.isEmpty()) { // explicit task IDs win; vertexID is not even read
+      for (List<Integer> indexes : taskIDs) {
+        Vertex vertex = getVertexFromIndex(dag, indexes.get(0)); // indexes = [vertexIndex, taskIndex]
+        if(vertex == null) {
+          continue;
+        }
+        Task task = vertex.getTask(indexes.get(1));
+        if(task == null) {
+          continue;
+        }
+        else {
+          tasks.add(task);
+        }
+
+        if(tasks.size() >= limit) {
+          break;
+        }
+      }
+    }
+    else {
+      List<Integer> vertexIDs = getIntegersFromRequest(WebUIService.VERTEX_ID, limit);
+      if(vertexIDs == null) {
+        return null;
+      }
+      else if(!vertexIDs.isEmpty()) {
+        for (Integer vertexID : vertexIDs) {
+          Vertex vertex = getVertexFromIndex(dag, vertexID);
+          if(vertex == null) {
+            continue;
+          }
+          List<Task> vertexTasks = new ArrayList<>(vertex.getTasks().values());
+          tasks.addAll(vertexTasks.subList(0, Math.min(vertexTasks.size(), limit - tasks.size()))); // take only what still fits
+
+          if(tasks.size() >= limit) {
+            break;
+          }
+        }
+      }
+      else { // neither taskID nor vertexID given: walk every vertex of the dag
+        Collection<Vertex> vertices = dag.getVertices().values();
+        for (Vertex vertex : vertices) {
+          List<Task> vertexTasks = new ArrayList<>(vertex.getTasks().values());
+          tasks.addAll(vertexTasks.subList(0, Math.min(vertexTasks.size(), limit - tasks.size()))); // take only what still fits
+
+          if(tasks.size() >= limit) {
+            break;
+          }
+        }
+      }
+    }
+
+    return tasks;
+  }
+
+  /**
+   * Renders the response JSON for tasksInfo API
+   * The JSON will have an array of task objects under the key tasks. Each task object holds
+   * the task's id, progress and status as strings.
+   *
+   * The limit query parameter caps the number of tasks returned. When getQueryParamInt cannot
+   * parse it, or when it is negative, MAX_QUERIED is used instead.
+   *
+   * Nothing is rendered when setupResponse fails, when no DAG is returned by
+   * checkAndGetDAGFromRequest, or when getRequestedTasks returns null.
+   */
+  public void getTasksInfo() {
+    if (!setupResponse()) {
+      return;
+    }
+
+    DAG dag = checkAndGetDAGFromRequest();
+    if (dag == null) {
+      return;
+    }
+
+    int limit = MAX_QUERIED;
+    try {
+      limit = getQueryParamInt(WebUIService.LIMIT);
+    } catch (NumberFormatException e) {
+      // limit is optional: a value that cannot be parsed keeps the default.
+    }
+    if (limit < 0) {
+      // A negative limit would end up as a negative end index for List.subList in
+      // getRequestedTasks and fail the request; treat it like any other unusable value.
+      limit = MAX_QUERIED;
+    }
+
+    List<Task> tasks = getRequestedTasks(dag, limit);
+    if(tasks == null) {
+      // getRequestedTasks returns null only when an ID parser returned null.
+      return;
+    }
+
+    ArrayList<Map<String, String>> tasksInfo = new ArrayList<>();
+    for(Task t : tasks) {
+      Map<String, String> taskInfo = new HashMap<>();
+      taskInfo.put("id", t.getTaskId().toString());
+      taskInfo.put("progress", Float.toString(t.getProgress()));
+      taskInfo.put("status", t.getState().toString());
+      tasksInfo.add(taskInfo);
+    }
+
+    renderJSON(ImmutableMap.of(
+      "tasks", tasksInfo
+    ));
+  }
+
   @Override
   @VisibleForTesting
   public void renderJSON(Object object) {

http://git-wip-us.apache.org/repos/asf/tez/blob/37cab128/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
index 19e1641..32b57e9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
@@ -40,6 +40,9 @@ public class WebUIService extends AbstractService {
   private static final String WS_PREFIX_V2 = "/ui/ws/v2/tez/";
   public static final String VERTEX_ID = "vertexID";
   public static final String DAG_ID = "dagID";
+  public static final String TASK_ID = "taskID";
+
+  public static final String LIMIT = "limit";
 
   private static final Logger LOG = LoggerFactory.getLogger(WebUIService.class);
 
@@ -152,9 +155,53 @@ public class WebUIService extends AbstractService {
       route(WS_PREFIX + pajoin("vertexProgresses", VERTEX_ID, DAG_ID), AMWebController.class,
           "getVertexProgresses");
 
-      // v2 api
+      /**
+       *  AM Web Service API V2
+       *  The API facilitates end points that would serve the user with real-time data on dag,
+       *  vertex and tasks.
+       *
+       *  Query Params:
+       *    dagID    - Same as dagIndex. Expects one single value. (It's mandatory in all APIs)
+       *    vertexID - Same as vertex index. Can be a list of comma separated values
+       *    taskID   - Should be of the format <vertexIndex>_<taskIndex>. For instance task with
+       *               index 5 in vertex 3 can be referenced using the id 3_5
+       *    limit    - The max number of records returned. Currently supported only in tasksInfo.
+       *               If not passed, limit would be taken as 100
+       *
+       *  APIs:
+       *    /ui/ws/v2/tez/dagInfo
+       *      Query param:
+       *        - Accepts one single parameter, dagID
+       *      Data returned:
+       *        - Full id, progress, status
+       *
+       *    /ui/ws/v2/tez/verticesInfo
+       *      Query params:
+       *        - Accepts dagID and vertexID
+       *        - vertexID is optional
+       *        - If specified the respective vertices will be returned, else all vertices
+       *          in the DAG will be returned
+       *      Data returned:
+       *        - Full id, progress, status, totalTasks, runningTasks, succeededTasks,
+       *          failedTaskAttempts, killedTaskAttempts
+       *
+       *    /ui/ws/v2/tez/tasksInfo
+       *      Query params:
+       *        - Accepts dagID, vertexID, taskID & limit
+       *        - vertex and task IDs are optional
+       *        - If taskID is passed: All (capped by limit) the specified tasks will be
+       *          returned. vertexID, if present, won't be considered
+       *        - If vertexID is passed: All (capped by limit) tasks under the vertices
+       *          will be returned
+       *        - If just dagID is passed: All (capped by limit) tasks under the DAG
+       *          will be returned
+       *      Data returned:
+       *        - Full id, progress, status
+       */
       route(WS_PREFIX_V2 + pajoin("dagInfo", DAG_ID), AMWebController.class, "getDagInfo");
       route(WS_PREFIX_V2 + pajoin("verticesInfo", VERTEX_ID, DAG_ID), AMWebController.class, "getVerticesInfo");
+      route(WS_PREFIX_V2 + pajoin("tasksInfo", TASK_ID, VERTEX_ID, DAG_ID), AMWebController.class,
+          "getTasksInfo");
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/37cab128/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
index 62779bc..cba3c3e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
@@ -33,23 +33,30 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.webapp.Controller;
 import org.apache.tez.common.security.ACLManager;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.client.ProgressBuilder;
+import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.junit.Assert;
 import org.junit.Before;
@@ -367,4 +374,228 @@ public class TestAMWebController {
         vertex2Result.get("failedTaskAttempts"));
   }
 
+  //-- Get Tasks Info Tests -----------------------------------------------------------------------
+
+  @SuppressWarnings("unchecked") // for the casts of the captured "tasks" value
+  @Test(timeout = 5000)
+  public void testGetTasksInfoWithTaskIds() { // explicit task IDs: results follow the request order
+    List <Task> tasks = createMockTasks();
+    List <Integer> vertexMinIds = Arrays.asList();
+    List <List <Integer>> taskMinIds = Arrays.asList(Arrays.asList(0, 0), // [vertexIndex, taskIndex] pairs
+        Arrays.asList(0, 3),
+        Arrays.asList(0, 1));
+
+    // Fetch All: with limit MAX_QUERIED all three requested tasks are expected back
+    Map<String, Object> result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds,
+        AMWebController.MAX_QUERIED);
+
+    Assert.assertEquals(1, result.size());
+    Assert.assertTrue(result.containsKey("tasks"));
+
+    ArrayList<Map<String, String>> tasksInfo = (ArrayList<Map<String, String>>) result.
+        get("tasks");
+    Assert.assertEquals(3, tasksInfo.size());
+
+    verifySingleTaskResult(tasks.get(0), tasksInfo.get(0));
+    verifySingleTaskResult(tasks.get(3), tasksInfo.get(1));
+    verifySingleTaskResult(tasks.get(1), tasksInfo.get(2));
+
+    // With limit: only the first two requested tasks are expected back
+    result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds, 2);
+
+    Assert.assertEquals(1, result.size());
+    Assert.assertTrue(result.containsKey("tasks"));
+
+    tasksInfo = (ArrayList<Map<String, String>>) result.get("tasks");
+    Assert.assertEquals(2, tasksInfo.size());
+
+    verifySingleTaskResult(tasks.get(0), tasksInfo.get(0));
+    verifySingleTaskResult(tasks.get(3), tasksInfo.get(1));
+  }
+
+  @SuppressWarnings("unchecked") // for the cast of the captured "tasks" value
+  @Test(timeout = 5000)
+  public void testGetTasksInfoGracefulTaskFetch() { // an unknown task ID is skipped, not an error
+    List <Task> tasks = createMockTasks();
+    List <Integer> vertexMinIds = Arrays.asList();
+    List <List <Integer>> taskMinIds = Arrays.asList(Arrays.asList(0, 0),
+        Arrays.asList(0, 6), // index 6 is not among the four mock tasks
+        Arrays.asList(0, 1));
+
+    // Fetch All: the two existing tasks are expected back, in request order
+    Map<String, Object> result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds,
+        AMWebController.MAX_QUERIED);
+
+    Assert.assertEquals(1, result.size());
+    Assert.assertTrue(result.containsKey("tasks"));
+
+    ArrayList<Map<String, String>> tasksInfo = (ArrayList<Map<String, String>>) result.
+        get("tasks");
+    Assert.assertEquals(2, tasksInfo.size());
+
+    verifySingleTaskResult(tasks.get(0), tasksInfo.get(0));
+    verifySingleTaskResult(tasks.get(1), tasksInfo.get(1));
+  }
+
+  @SuppressWarnings("unchecked") // for the casts of the captured "tasks" value
+  @Test(timeout = 5000)
+  public void testGetTasksInfoWithVertexId() { // no task IDs, one vertex index: all tasks of that vertex
+    List <Task> tasks = createMockTasks();
+    List <Integer> vertexMinIds = Arrays.asList(0);
+    List <List <Integer>> taskMinIds = Arrays.asList();
+
+    Map<String, Object> result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds,
+        AMWebController.MAX_QUERIED);
+
+    Assert.assertEquals(1, result.size());
+    Assert.assertTrue(result.containsKey("tasks"));
+
+    ArrayList<Map<String, String>> tasksInfo = (ArrayList<Map<String, String>>) result.
+        get("tasks");
+    Assert.assertEquals(4, tasksInfo.size());
+
+    sortMapList(tasksInfo, "id"); // the helper keeps tasks in a hash map, so sort before comparing by position
+    verifySingleTaskResult(tasks.get(0), tasksInfo.get(0));
+    verifySingleTaskResult(tasks.get(1), tasksInfo.get(1));
+    verifySingleTaskResult(tasks.get(2), tasksInfo.get(2));
+    verifySingleTaskResult(tasks.get(3), tasksInfo.get(3));
+
+    // With limit: only the count is checked, not which two tasks are returned
+    result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds, 2);
+
+    Assert.assertEquals(1, result.size());
+    Assert.assertTrue(result.containsKey("tasks"));
+
+    tasksInfo = (ArrayList<Map<String, String>>) result.get("tasks");
+    Assert.assertEquals(2, tasksInfo.size());
+  }
+
+  @SuppressWarnings("unchecked") // for the casts of the captured "tasks" value
+  @Test(timeout = 5000)
+  public void testGetTasksInfoWithJustDAGId() { // neither task IDs nor vertex indexes: all tasks of the DAG
+    List <Task> tasks = createMockTasks();
+    List <Integer> vertexMinIds = Arrays.asList();
+    List <List <Integer>> taskMinIds = Arrays.asList();
+
+    Map<String, Object> result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds,
+        AMWebController.MAX_QUERIED);
+
+    Assert.assertEquals(1, result.size());
+    Assert.assertTrue(result.containsKey("tasks"));
+
+    ArrayList<Map<String, String>> tasksInfo = (ArrayList<Map<String, String>>) result.
+        get("tasks");
+    Assert.assertEquals(4, tasksInfo.size());
+
+    sortMapList(tasksInfo, "id"); // the helper keeps tasks in a hash map, so sort before comparing by position
+    verifySingleTaskResult(tasks.get(0), tasksInfo.get(0));
+    verifySingleTaskResult(tasks.get(1), tasksInfo.get(1));
+    verifySingleTaskResult(tasks.get(2), tasksInfo.get(2));
+    verifySingleTaskResult(tasks.get(3), tasksInfo.get(3));
+
+    // With limit: only the count is checked, not which two tasks are returned
+    result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds, 2);
+
+    Assert.assertEquals(1, result.size());
+    Assert.assertTrue(result.containsKey("tasks"));
+
+    tasksInfo = (ArrayList<Map<String, String>>) result.get("tasks");
+    Assert.assertEquals(2, tasksInfo.size());
+  }
+
+  private void sortMapList(ArrayList<Map<String, String>> list, String propertyName) { // sorts list in place by the value under propertyName
+    class MapComparator implements Comparator<Map<String, String>> { // local comparator, only used here
+      private final String key;
+
+      public MapComparator(String key) {
+        this.key = key;
+      }
+
+      public int compare(Map<String, String> first, Map<String, String> second) {
+        String firstValue = first.get(key); // a map without the key would make compareTo below throw
+        String secondValue = second.get(key);
+        return firstValue.compareTo(secondValue); // plain String ordering
+      }
+    }
+
+    Collections.sort(list, new MapComparator(propertyName));
+  }
+
+  Map<String, Object> getTasksTestHelper(List<Task> tasks, List <List <Integer>> taskMinIds,
+                                         List<Integer> vertexMinIds, Integer limit) { // runs getTasksInfo on a spy and returns what it rendered
+    //Creating mock DAG
+    DAG mockDAG = mock(DAG.class);
+    doReturn(TezDAGID.fromString("dag_1441301219877_0109_1")).when(mockDAG).getID();
+
+    //Creating mock vertex and attaching to mock DAG (this is the only vertex the DAG knows)
+    TezVertexID vertexID = TezVertexID.fromString("vertex_1441301219877_0109_1_00");
+    Vertex mockVertex = mock(Vertex.class);
+    doReturn(vertexID).when(mockVertex).getVertexId();
+
+    doReturn(mockVertex).when(mockDAG).getVertex(vertexID);
+    doReturn(ImmutableMap.of(
+        vertexID, mockVertex
+    )).when(mockDAG).getVertices();
+
+    //Creating mock tasks and attaching to mock vertex, both by index and through getTasks()
+    Map<TezTaskID, Task> taskMap = Maps.newHashMap();
+    for(Task task : tasks) {
+      TezTaskID taskId = task.getTaskId();
+      int taskIndex = taskId.getId();
+      doReturn(task).when(mockVertex).getTask(taskIndex);
+      taskMap.put(taskId, task);
+    }
+    doReturn(taskMap).when(mockVertex).getTasks();
+
+    //Creates & setup controller spy; renderJSON is stubbed so its argument can be captured
+    AMWebController amWebController = new AMWebController(mockRequestContext, mockAppContext,
+        "TEST_HISTORY_URL");
+    AMWebController spy = spy(amWebController);
+    doReturn(true).when(spy).setupResponse();
+    doNothing().when(spy).renderJSON(any());
+
+    // Set mock query params. The ID parsers themselves are stubbed here, so the parsing of
+    // the taskID and vertexID strings is not exercised through this helper.
+    doReturn(limit).when(spy).getQueryParamInt(WebUIService.LIMIT);
+    doReturn(vertexMinIds).when(spy).getIntegersFromRequest(WebUIService.VERTEX_ID, limit);
+    doReturn(taskMinIds).when(spy).getIDsFromRequest(WebUIService.TASK_ID, limit);
+
+    // Set function mocks
+    doReturn(mockDAG).when(spy).checkAndGetDAGFromRequest();
+
+    spy.getTasksInfo();
+    verify(spy).renderJSON(returnResultCaptor.capture());
+
+    return returnResultCaptor.getValue();
+  }
+
+  private List<Task> createMockTasks() { // four mock tasks with task indexes 0..3, all under vertex 00
+    Task mockTask1 = createMockTask("task_1441301219877_0109_1_00_000000", TaskState.RUNNING,
+        0.33f);
+    Task mockTask2 = createMockTask("task_1441301219877_0109_1_00_000001", TaskState.SUCCEEDED,
+        1.0f);
+    Task mockTask3 = createMockTask("task_1441301219877_0109_1_00_000002", TaskState.SUCCEEDED,
+        .8f);
+    Task mockTask4 = createMockTask("task_1441301219877_0109_1_00_000003", TaskState.SUCCEEDED,
+        .8f);
+
+    List <Task> tasks = Arrays.asList(mockTask1, mockTask2, mockTask3, mockTask4); // list position == task index
+    return tasks;
+  }
+
+  private Task createMockTask(String taskIDStr, TaskState status, float progress) { // mock Task answering only the three getters getTasksInfo reads
+    Task mockTask = mock(Task.class);
+
+    doReturn(TezTaskID.fromString(taskIDStr)).when(mockTask).getTaskId();
+    doReturn(status).when(mockTask).getState();
+    doReturn(progress).when(mockTask).getProgress();
+
+    return mockTask;
+  }
+
+  private void verifySingleTaskResult(Task mockTask, Map<String, String> taskResult) { // checks one rendered task entry against its mock
+    Assert.assertEquals(3, taskResult.size()); // exactly id, status and progress, nothing else
+    Assert.assertEquals(mockTask.getTaskId().toString(), taskResult.get("id"));
+    Assert.assertEquals(mockTask.getState().toString(), taskResult.get("status"));
+    Assert.assertEquals(Float.toString(mockTask.getProgress()), taskResult.get("progress"));
+  }
 }