You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/01/20 10:13:42 UTC

[25/50] [abbrv] hadoop git commit: YARN-3864. Implement support for querying single app and all apps for a flow run (Varun Saxena via sjlee)

YARN-3864. Implement support for querying single app and all apps for a flow run (Varun Saxena via sjlee)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2b6784ab
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2b6784ab
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2b6784ab

Branch: refs/heads/feature-YARN-2928
Commit: 2b6784ab9585dc8dea97b0c99b51a64a4fee5c8d
Parents: 0839cd1
Author: Sangjin Lee <sj...@apache.org>
Authored: Mon Oct 5 13:14:11 2015 -0700
Committer: Li Lu <gt...@apache.org>
Committed: Tue Jan 19 17:41:58 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../reader/TimelineReaderWebServices.java       | 206 +++++-
 .../storage/ApplicationEntityReader.java        |  64 +-
 .../storage/FlowActivityEntityReader.java       |  33 +-
 .../storage/FlowRunEntityReader.java            |   2 +-
 .../storage/GenericEntityReader.java            |  16 +-
 .../storage/TimelineEntityReader.java           |  21 +-
 .../storage/TimelineEntityReaderFactory.java    |   2 +-
 .../storage/application/ApplicationRowKey.java  |  34 +
 .../TestTimelineReaderWebServicesFlowRun.java   | 405 -----------
 ...stTimelineReaderWebServicesHBaseStorage.java | 673 +++++++++++++++++++
 11 files changed, 1000 insertions(+), 459 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 0d2537e..902d05e 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -118,6 +118,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-4210. HBase reader throws NPE if Get returns no rows (Varun Saxena
     via vrushali)
 
+    YARN-3864. Implement support for querying single app and all apps for a
+    flow run (Varun Saxena via sjlee)
+
   IMPROVEMENTS
 
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
index a327099..610f74c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
@@ -286,7 +286,7 @@ public class TimelineReaderWebServices {
       @QueryParam("eventfilters") String eventfilters,
       @QueryParam("fields") String fields) {
     String url = req.getRequestURI() +
-        (null == req.getQueryString() ? "" :
+        (req.getQueryString() == null ? "" :
             QUERY_STRING_SEP + req.getQueryString());
     UserGroupInformation callerUGI = getUser(req);
     LOG.info("Received URL " + url + " from user " + getUserName(callerUGI));
@@ -310,7 +310,7 @@ public class TimelineReaderWebServices {
           parseFieldsStr(fields, COMMA_DELIMITER));
     } catch (Exception e) {
       handleException(e, url, startTime,
-          "createdTime or modifiedTime start/end or limit or flowId");
+          "createdTime or modifiedTime start/end or limit or flowrunid");
     }
     long endTime = Time.monotonicNow();
     if (entities == null) {
@@ -360,7 +360,7 @@ public class TimelineReaderWebServices {
       @QueryParam("flowrunid") String flowRunId,
       @QueryParam("fields") String fields) {
     String url = req.getRequestURI() +
-        (null == req.getQueryString() ? "" :
+        (req.getQueryString() == null ? "" :
             QUERY_STRING_SEP + req.getQueryString());
     UserGroupInformation callerUGI = getUser(req);
     LOG.info("Received URL " + url + " from user " + getUserName(callerUGI));
@@ -420,7 +420,7 @@ public class TimelineReaderWebServices {
       @QueryParam("userid") String userId,
       @QueryParam("fields") String fields) {
     String url = req.getRequestURI() +
-        (null == req.getQueryString() ? "" :
+        (req.getQueryString() == null ? "" :
             QUERY_STRING_SEP + req.getQueryString());
     UserGroupInformation callerUGI = getUser(req);
     LOG.info("Received URL " + url + " from user " + getUserName(callerUGI));
@@ -477,7 +477,7 @@ public class TimelineReaderWebServices {
       @QueryParam("limit") String limit,
       @QueryParam("fields") String fields) {
     String url = req.getRequestURI() +
-        (null == req.getQueryString() ? "" :
+        (req.getQueryString() == null ? "" :
             QUERY_STRING_SEP + req.getQueryString());
     UserGroupInformation callerUGI = getUser(req);
     LOG.info("Received URL " + url + " from user " + getUserName(callerUGI));
@@ -502,4 +502,200 @@ public class TimelineReaderWebServices {
         " (Took " + (endTime - startTime) + " ms.)");
     return entities;
   }
+
+  /**
+   * Return a single app for given app id. Cluster ID is not provided by
+   * client so default cluster ID has to be taken.
+   */
+  @GET
+  @Path("/app/{appid}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public TimelineEntity getApp(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("appid") String appId,
+      @QueryParam("flowid") String flowId,
+      @QueryParam("flowrunid") String flowRunId,
+      @QueryParam("userid") String userId,
+      @QueryParam("fields") String fields) {
+    return getApp(req, res, null, appId, flowId, flowRunId, userId, fields);
+  }
+
+  /**
+   * Return a single app for given cluster id and app id.
+   */
+  @GET
+  @Path("/app/{clusterid}/{appid}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public TimelineEntity getApp(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("clusterid") String clusterId,
+      @PathParam("appid") String appId,
+      @QueryParam("flowid") String flowId,
+      @QueryParam("flowrunid") String flowRunId,
+      @QueryParam("userid") String userId,
+      @QueryParam("fields") String fields) {
+    String url = req.getRequestURI() +
+        (req.getQueryString() == null ? "" :
+            QUERY_STRING_SEP + req.getQueryString());
+    UserGroupInformation callerUGI = getUser(req);
+    LOG.info("Received URL " + url + " from user " + getUserName(callerUGI));
+    long startTime = Time.monotonicNow();
+    init(res);
+    TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
+    TimelineEntity entity = null;
+    try {
+      entity = timelineReaderManager.getEntity(
+          parseUser(callerUGI, userId), parseStr(clusterId),
+          parseStr(flowId), parseLongStr(flowRunId), parseStr(appId),
+          TimelineEntityType.YARN_APPLICATION.toString(), null,
+          parseFieldsStr(fields, COMMA_DELIMITER));
+    } catch (Exception e) {
+      handleException(e, url, startTime, "flowrunid");
+    }
+    long endTime = Time.monotonicNow();
+    if (entity == null) {
+      LOG.info("Processed URL " + url + " but app not found" + " (Took " +
+          (endTime - startTime) + " ms.)");
+      throw new NotFoundException("App " + appId + " not found");
+    }
+    LOG.info("Processed URL " + url +
+        " (Took " + (endTime - startTime) + " ms.)");
+    return entity;
+  }
+
+  /**
+   * Return a list of apps for given flow id and flow run id. Cluster ID is not
+   * provided by client so default cluster ID has to be taken. If number of
+   * matching apps are more than the limit, most recent apps till the limit is
+   * reached, will be returned.
+   */
+  @GET
+  @Path("/flowrunapps/{flowid}/{flowrunid}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Set<TimelineEntity> getFlowRunApps(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("flowid") String flowId,
+      @PathParam("flowrunid") String flowRunId,
+      @QueryParam("userid") String userId,
+      @QueryParam("limit") String limit,
+      @QueryParam("createdtimestart") String createdTimeStart,
+      @QueryParam("createdtimeend") String createdTimeEnd,
+      @QueryParam("modifiedtimestart") String modifiedTimeStart,
+      @QueryParam("modifiedtimeend") String modifiedTimeEnd,
+      @QueryParam("relatesto") String relatesTo,
+      @QueryParam("isrelatedto") String isRelatedTo,
+      @QueryParam("infofilters") String infofilters,
+      @QueryParam("conffilters") String conffilters,
+      @QueryParam("metricfilters") String metricfilters,
+      @QueryParam("eventfilters") String eventfilters,
+      @QueryParam("fields") String fields) {
+    return getEntities(req, res, null, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId,
+        flowRunId, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
+        modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
+        metricfilters, eventfilters, fields);
+  }
+
+  /**
+   * Return a list of apps for a given cluster id, flow id and flow run id. If
+   * number of matching apps are more than the limit, most recent apps till the
+   * limit is reached, will be returned.
+   */
+  @GET
+  @Path("/flowrunapps/{clusterid}/{flowid}/{flowrunid}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Set<TimelineEntity> getFlowRunApps(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("clusterid") String clusterId,
+      @PathParam("flowid") String flowId,
+      @PathParam("flowrunid") String flowRunId,
+      @QueryParam("userid") String userId,
+      @QueryParam("limit") String limit,
+      @QueryParam("createdtimestart") String createdTimeStart,
+      @QueryParam("createdtimeend") String createdTimeEnd,
+      @QueryParam("modifiedtimestart") String modifiedTimeStart,
+      @QueryParam("modifiedtimeend") String modifiedTimeEnd,
+      @QueryParam("relatesto") String relatesTo,
+      @QueryParam("isrelatedto") String isRelatedTo,
+      @QueryParam("infofilters") String infofilters,
+      @QueryParam("conffilters") String conffilters,
+      @QueryParam("metricfilters") String metricfilters,
+      @QueryParam("eventfilters") String eventfilters,
+      @QueryParam("fields") String fields) {
+    return getEntities(req, res, clusterId, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId,
+        flowRunId, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
+        modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
+        metricfilters, eventfilters, fields);
+  }
+
+  /**
+   * Return a list of apps for given flow id. Cluster ID is not provided by
+   * client so default cluster ID has to be taken. If number of matching apps
+   * are more than the limit, most recent apps till the limit is reached, will
+   * be returned.
+   */
+  @GET
+  @Path("/flowapps/{flowid}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Set<TimelineEntity> getFlowApps(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("flowid") String flowId,
+      @QueryParam("userid") String userId,
+      @QueryParam("limit") String limit,
+      @QueryParam("createdtimestart") String createdTimeStart,
+      @QueryParam("createdtimeend") String createdTimeEnd,
+      @QueryParam("modifiedtimestart") String modifiedTimeStart,
+      @QueryParam("modifiedtimeend") String modifiedTimeEnd,
+      @QueryParam("relatesto") String relatesTo,
+      @QueryParam("isrelatedto") String isRelatedTo,
+      @QueryParam("infofilters") String infofilters,
+      @QueryParam("conffilters") String conffilters,
+      @QueryParam("metricfilters") String metricfilters,
+      @QueryParam("eventfilters") String eventfilters,
+      @QueryParam("fields") String fields) {
+    return getEntities(req, res, null, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId,
+        null, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
+        modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
+        metricfilters, eventfilters, fields);
+  }
+
+  /**
+   * Return a list of apps for a given cluster id and flow id. If number of
+   * matching apps are more than the limit, most recent apps till the limit is
+   * reached, will be returned.
+   */
+  @GET
+  @Path("/flowapps/{clusterid}/{flowid}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Set<TimelineEntity> getFlowApps(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("clusterid") String clusterId,
+      @PathParam("flowid") String flowId,
+      @QueryParam("userid") String userId,
+      @QueryParam("limit") String limit,
+      @QueryParam("createdtimestart") String createdTimeStart,
+      @QueryParam("createdtimeend") String createdTimeEnd,
+      @QueryParam("modifiedtimestart") String modifiedTimeStart,
+      @QueryParam("modifiedtimeend") String modifiedTimeEnd,
+      @QueryParam("relatesto") String relatesTo,
+      @QueryParam("isrelatedto") String isRelatedTo,
+      @QueryParam("infofilters") String infofilters,
+      @QueryParam("conffilters") String conffilters,
+      @QueryParam("metricfilters") String metricfilters,
+      @QueryParam("eventfilters") String eventfilters,
+      @QueryParam("fields") String fields) {
+    return getEntities(req, res, clusterId, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId,
+        null, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
+        modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
+        metricfilters, eventfilters, fields);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
index d5b5d63..61954e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Map;
 import java.util.Set;
@@ -28,6 +27,8 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
@@ -38,6 +39,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Timeline entity reader for application entities that are stored in the
  * application table.
@@ -57,7 +60,7 @@ class ApplicationEntityReader extends GenericEntityReader {
     super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
         createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
         relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, fieldsToRetrieve);
+        eventFilters, fieldsToRetrieve, true);
   }
 
   public ApplicationEntityReader(String userId, String clusterId,
@@ -86,10 +89,63 @@ class ApplicationEntityReader extends GenericEntityReader {
   }
 
   @Override
+  protected void validateParams() {
+    Preconditions.checkNotNull(userId, "userId shouldn't be null");
+    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
+    if (singleEntityRead) {
+      Preconditions.checkNotNull(appId, "appId shouldn't be null");
+    } else {
+      Preconditions.checkNotNull(flowId, "flowId shouldn't be null");
+    }
+  }
+
+  @Override
+  protected void augmentParams(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    if (singleEntityRead) {
+      if (flowId == null || flowRunId == null) {
+        FlowContext context =
+            lookupFlowContext(clusterId, appId, hbaseConf, conn);
+        flowId = context.flowId;
+        flowRunId = context.flowRunId;
+      }
+    }
+    if (fieldsToRetrieve == null) {
+      fieldsToRetrieve = EnumSet.noneOf(Field.class);
+    }
+    if (!singleEntityRead) {
+      if (limit == null || limit < 0) {
+        limit = TimelineReader.DEFAULT_LIMIT;
+      }
+      if (createdTimeBegin == null) {
+        createdTimeBegin = DEFAULT_BEGIN_TIME;
+      }
+      if (createdTimeEnd == null) {
+        createdTimeEnd = DEFAULT_END_TIME;
+      }
+      if (modifiedTimeBegin == null) {
+        modifiedTimeBegin = DEFAULT_BEGIN_TIME;
+      }
+      if (modifiedTimeEnd == null) {
+        modifiedTimeEnd = DEFAULT_END_TIME;
+      }
+    }
+  }
+
+  @Override
   protected ResultScanner getResults(Configuration hbaseConf,
       Connection conn) throws IOException {
-    throw new UnsupportedOperationException(
-        "we don't support multiple apps query");
+    Scan scan = new Scan();
+    if (flowRunId != null) {
+      scan.setRowPrefixFilter(ApplicationRowKey.
+          getRowKeyPrefix(clusterId, userId, flowId, flowRunId));
+    } else {
+      scan.setRowPrefixFilter(ApplicationRowKey.
+          getRowKeyPrefix(clusterId, userId, flowId));
+    }
+    scan.setFilter(new PageFilter(limit));
+    return table.getResultScanner(hbaseConf, conn, scan);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
index e68ca17..70a0915 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
@@ -20,9 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.Set;
-import java.util.TreeSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
@@ -60,7 +58,7 @@ class FlowActivityEntityReader extends TimelineEntityReader {
     super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
         createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
         relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, fieldsToRetrieve);
+        eventFilters, fieldsToRetrieve, true);
   }
 
   public FlowActivityEntityReader(String userId, String clusterId,
@@ -78,35 +76,6 @@ class FlowActivityEntityReader extends TimelineEntityReader {
     return FLOW_ACTIVITY_TABLE;
   }
 
-  /**
-   * Since this is strictly sorted by the row key, it is sufficient to collect
-   * the first results as specified by the limit.
-   */
-  @Override
-  public Set<TimelineEntity> readEntities(Configuration hbaseConf,
-      Connection conn) throws IOException {
-    validateParams();
-    augmentParams(hbaseConf, conn);
-
-    NavigableSet<TimelineEntity> entities = new TreeSet<>();
-    ResultScanner results = getResults(hbaseConf, conn);
-    try {
-      for (Result result : results) {
-        TimelineEntity entity = parseEntity(result);
-        if (entity == null) {
-          continue;
-        }
-        entities.add(entity);
-        if (entities.size() == limit) {
-          break;
-        }
-      }
-      return entities;
-    } finally {
-      results.close();
-    }
-  }
-
   @Override
   protected void validateParams() {
     Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
index b5d7ae5..90ce28f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
@@ -56,7 +56,7 @@ class FlowRunEntityReader extends TimelineEntityReader {
     super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
         createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
         relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, fieldsToRetrieve);
+        eventFilters, fieldsToRetrieve, false);
   }
 
   public FlowRunEntityReader(String userId, String clusterId,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
index 396a02b..42079d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
@@ -61,8 +61,8 @@ class GenericEntityReader extends TimelineEntityReader {
   private static final EntityTable ENTITY_TABLE = new EntityTable();
   private static final Log LOG = LogFactory.getLog(GenericEntityReader.class);
 
-  private static final long DEFAULT_BEGIN_TIME = 0L;
-  private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
+  protected static final long DEFAULT_BEGIN_TIME = 0L;
+  protected static final long DEFAULT_END_TIME = Long.MAX_VALUE;
 
   /**
    * Used to look up the flow context.
@@ -76,11 +76,11 @@ class GenericEntityReader extends TimelineEntityReader {
       Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
       Map<String, Object> infoFilters, Map<String, String> configFilters,
       Set<String> metricFilters, Set<String> eventFilters,
-      EnumSet<Field> fieldsToRetrieve) {
+      EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
     super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
         createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
         relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, fieldsToRetrieve);
+        eventFilters, fieldsToRetrieve, sortedKeys);
   }
 
   public GenericEntityReader(String userId, String clusterId,
@@ -97,7 +97,7 @@ class GenericEntityReader extends TimelineEntityReader {
     return ENTITY_TABLE;
   }
 
-  private FlowContext lookupFlowContext(String clusterId, String appId,
+  protected FlowContext lookupFlowContext(String clusterId, String appId,
       Configuration hbaseConf, Connection conn) throws IOException {
     byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
     Get get = new Get(rowKey);
@@ -113,9 +113,9 @@ class GenericEntityReader extends TimelineEntityReader {
     }
   }
 
-  private static class FlowContext {
-    private final String flowId;
-    private final Long flowRunId;
+  protected static class FlowContext {
+    protected final String flowId;
+    protected final Long flowRunId;
     public FlowContext(String flowId, Long flowRunId) {
       this.flowId = flowId;
       this.flowRunId = flowRunId;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
index 93be2db..d4a659c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
@@ -74,6 +74,14 @@ abstract class TimelineEntityReader {
   protected BaseTable<?> table;
 
   /**
+   * Specifies whether keys for this table are sorted in a manner where entities
+   * can be retrieved by created time. If true, it will be sufficient to collect
+   * the first results as specified by the limit. Otherwise all matched entities
+   * will be fetched and then limit applied.
+   */
+  private boolean sortedKeys = false;
+
+  /**
    * Instantiates a reader for multiple-entity reads.
    */
   protected TimelineEntityReader(String userId, String clusterId,
@@ -83,8 +91,9 @@ abstract class TimelineEntityReader {
       Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
       Map<String, Object> infoFilters, Map<String, String> configFilters,
       Set<String> metricFilters, Set<String> eventFilters,
-      EnumSet<Field> fieldsToRetrieve) {
+      EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
     this.singleEntityRead = false;
+    this.sortedKeys = sortedKeys;
     this.userId = userId;
     this.clusterId = clusterId;
     this.flowId = flowId;
@@ -162,8 +171,14 @@ abstract class TimelineEntityReader {
           continue;
         }
         entities.add(entity);
-        if (entities.size() > limit) {
-          entities.pollLast();
+        if (!sortedKeys) {
+          if (entities.size() > limit) {
+            entities.pollLast();
+          }
+        } else {
+          if (entities.size() == limit) {
+            break;
+          }
         }
       }
       return entities;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
index 4fdef40..f5341c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
@@ -91,7 +91,7 @@ class TimelineEntityReaderFactory {
           appId, entityType, limit, createdTimeBegin, createdTimeEnd,
           modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
           infoFilters, configFilters, metricFilters, eventFilters,
-          fieldsToRetrieve);
+          fieldsToRetrieve, false);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
index e3b5a87..10e3c2e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
@@ -61,6 +61,40 @@ public class ApplicationRowKey {
   }
 
   /**
+   * Constructs a row key prefix for the application table as follows:
+   * {@code clusterId!userName!flowId!}
+   *
+   * @param clusterId
+   * @param userId
+   * @param flowId
+   * @return byte array with the row key prefix
+   */
+  public static byte[] getRowKeyPrefix(String clusterId, String userId,
+      String flowId) {
+    byte[] first = Bytes.toBytes(
+        Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowId));
+    return Separator.QUALIFIERS.join(first, new byte[0]);
+  }
+
+  /**
+   * Constructs a row key prefix for the application table as follows:
+   * {@code clusterId!userName!flowId!flowRunId!}
+   *
+   * @param clusterId
+   * @param userId
+   * @param flowId
+   * @param flowRunId
+   * @return byte array with the row key prefix
+   */
+  public static byte[] getRowKeyPrefix(String clusterId, String userId,
+      String flowId, Long flowRunId) {
+    byte[] first = Bytes.toBytes(
+        Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowId));
+    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+    return Separator.QUALIFIERS.join(first, second, new byte[0]);
+  }
+
+  /**
    * Constructs a row key for the application table as follows:
    * {@code clusterId!userName!flowId!flowRunId!AppId}
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesFlowRun.java
deleted file mode 100644
index e359f78..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesFlowRun.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.reader;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.HttpURLConnection;
-import java.net.URI;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import javax.ws.rs.core.MediaType;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
-import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
-import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.GenericType;
-import com.sun.jersey.api.client.ClientResponse.Status;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
-import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
-
-public class TestTimelineReaderWebServicesFlowRun {
-  private int serverPort;
-  private TimelineReaderServer server;
-  private static HBaseTestingUtility util;
-  private static long ts = System.currentTimeMillis();
-
-  @BeforeClass
-  public static void setup() throws Exception {
-    util = new HBaseTestingUtility();
-    Configuration conf = util.getConfiguration();
-    conf.setInt("hfile.format.version", 3);
-    util.startMiniCluster();
-    TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
-    loadData();
-  }
-
-  private static void loadData() throws Exception {
-    String cluster = "cluster1";
-    String user = "user1";
-    String flow = "flow_name";
-    String flowVersion = "CF7022C10F1354";
-    Long runid = 1002345678919L;
-    Long runid1 = 1002345678920L;
-
-    TimelineEntities te = new TimelineEntities();
-    TimelineEntity entity = new TimelineEntity();
-    String id = "flowRunMetrics_test";
-    String type = TimelineEntityType.YARN_APPLICATION.toString();
-    entity.setId(id);
-    entity.setType(type);
-    Long cTime = 1425016501000L;
-    entity.setCreatedTime(cTime);
-
-    // add metrics
-    Set<TimelineMetric> metrics = new HashSet<>();
-    TimelineMetric m1 = new TimelineMetric();
-    m1.setId("MAP_SLOT_MILLIS");
-    Map<Long, Number> metricValues = new HashMap<Long, Number>();
-    metricValues.put(ts - 100000, 2);
-    metricValues.put(ts - 80000, 40);
-    m1.setType(Type.TIME_SERIES);
-    m1.setValues(metricValues);
-    metrics.add(m1);
-
-    m1 = new TimelineMetric();
-    m1.setId("HDFS_BYTES_READ");
-    metricValues = new HashMap<Long, Number>();
-    metricValues.put(ts - 100000, 31);
-    metricValues.put(ts - 80000, 57);
-    m1.setType(Type.TIME_SERIES);
-    m1.setValues(metricValues);
-    metrics.add(m1);
-    entity.addMetrics(metrics);
-
-    TimelineEvent event = new TimelineEvent();
-    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    Long expTs = 1436512802000L;
-    event.setTimestamp(expTs);
-    String expKey = "foo_event";
-    Object expVal = "test";
-    event.addInfo(expKey, expVal);
-    entity.addEvent(event);
-    te.addEntity(entity);
-
-    // write another application with same metric to this flow
-    TimelineEntities te1 = new TimelineEntities();
-    TimelineEntity entity1 = new TimelineEntity();
-    id = "flowRunMetrics_test";
-    type = TimelineEntityType.YARN_APPLICATION.toString();
-    entity1.setId(id);
-    entity1.setType(type);
-    cTime = 1425016501000L;
-    entity1.setCreatedTime(cTime);
-    // add metrics
-    metrics.clear();
-    TimelineMetric m2 = new TimelineMetric();
-    m2.setId("MAP_SLOT_MILLIS");
-    metricValues = new HashMap<Long, Number>();
-    metricValues.put(ts - 100000, 5L);
-    metricValues.put(ts - 80000, 101L);
-    m2.setType(Type.TIME_SERIES);
-    m2.setValues(metricValues);
-    metrics.add(m2);
-    entity1.addMetrics(metrics);
-    te1.addEntity(entity1);
-
-    String flow2 = "flow_name2";
-    String flowVersion2 = "CF7022C10F1454";
-    Long runid2 = 2102356789046L;
-    TimelineEntities te3 = new TimelineEntities();
-    TimelineEntity entity3 = new TimelineEntity();
-    id = "flowRunMetrics_test1";
-    entity3.setId(id);
-    entity3.setType(type);
-    cTime = 1425016501030L;
-    entity3.setCreatedTime(cTime);
-    TimelineEvent event2 = new TimelineEvent();
-    event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    event2.setTimestamp(1436512802030L);
-    event2.addInfo("foo_event", "test");
-    entity3.addEvent(event2);
-    te3.addEntity(entity3);
-
-    HBaseTimelineWriterImpl hbi = null;
-    Configuration c1 = util.getConfiguration();
-    try {
-      hbi = new HBaseTimelineWriterImpl(c1);
-      hbi.init(c1);
-      String appName = "application_11111111111111_1111";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-      appName = "application_11111111111111_2222";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
-      hbi.write(cluster, user, flow, flowVersion, runid1, appName, te);
-      appName = "application_11111111111111_2223";
-      hbi.write(cluster, user, flow2, flowVersion2, runid2, appName, te3);
-      hbi.flush();
-    } finally {
-      hbi.close();
-    }
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    util.shutdownMiniCluster();
-  }
-
-  @Before
-  public void init() throws Exception {
-    try {
-      Configuration config = util.getConfiguration();
-      config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
-          "localhost:0");
-      config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
-      config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
-          "org.apache.hadoop.yarn.server.timelineservice.storage." +
-              "HBaseTimelineReaderImpl");
-      config.setInt("hfile.format.version", 3);
-      server = new TimelineReaderServer();
-      server.init(config);
-      server.start();
-      serverPort = server.getWebServerPort();
-    } catch (Exception e) {
-      Assert.fail("Web server failed to start");
-    }
-  }
-
-  private static Client createClient() {
-    ClientConfig cfg = new DefaultClientConfig();
-    cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
-    return new Client(new URLConnectionClientHandler(
-        new DummyURLConnectionFactory()), cfg);
-  }
-
-  private static ClientResponse getResponse(Client client, URI uri)
-      throws Exception {
-    ClientResponse resp =
-        client.resource(uri).accept(MediaType.APPLICATION_JSON)
-        .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
-    if (resp == null ||
-        resp.getClientResponseStatus() != ClientResponse.Status.OK) {
-      String msg = new String();
-      if (resp != null) {
-        msg = resp.getClientResponseStatus().toString();
-      }
-      throw new IOException("Incorrect response from timeline reader. " +
-          "Status=" + msg);
-    }
-    return resp;
-  }
-
-  private static class DummyURLConnectionFactory
-      implements HttpURLConnectionFactory {
-
-    @Override
-    public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
-      try {
-        return (HttpURLConnection)url.openConnection();
-      } catch (UndeclaredThrowableException e) {
-        throw new IOException(e.getCause());
-      }
-    }
-  }
-
-  private static TimelineMetric newMetric(String id, long ts, Number value) {
-    TimelineMetric metric = new TimelineMetric();
-    metric.setId(id);
-    metric.addValue(ts, value);
-    return metric;
-  }
-
-  private static boolean verifyMetricValues(Map<Long, Number> m1,
-      Map<Long, Number> m2) {
-    for (Map.Entry<Long, Number> entry : m1.entrySet()) {
-      if (!m2.containsKey(entry.getKey())) {
-        return false;
-      }
-      if (m2.get(entry.getKey()).equals(entry.getValue())) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private static boolean verifyMetrics(
-      TimelineMetric m, TimelineMetric... metrics) {
-    for (TimelineMetric metric : metrics) {
-      if (!metric.equals(m)) {
-        continue;
-      }
-      if (!verifyMetricValues(metric.getValues(), m.getValues())) {
-        continue;
-      }
-      return true;
-    }
-    return false;
-  }
-
-  private static void verifyHttpResponse(Client client, URI uri,
-      Status status) {
-    ClientResponse resp =
-        client.resource(uri).accept(MediaType.APPLICATION_JSON)
-        .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
-    assertNotNull(resp);
-    assertTrue("Response from server should have been " + status,
-        resp.getClientResponseStatus().equals(status));
-  }
-
-  @Test
-  public void testGetFlowRun() throws Exception {
-    Client client = createClient();
-    try {
-      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
-          "timeline/flowrun/cluster1/flow_name/1002345678919?userid=user1");
-      ClientResponse resp = getResponse(client, uri);
-      FlowRunEntity entity = resp.getEntity(FlowRunEntity.class);
-      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
-      assertNotNull(entity);
-      assertEquals("user1@flow_name/1002345678919", entity.getId());
-      assertEquals(2, entity.getMetrics().size());
-      TimelineMetric m1 = newMetric("HDFS_BYTES_READ", ts - 80000, 57L);
-      TimelineMetric m2 = newMetric("MAP_SLOT_MILLIS", ts - 80000, 141L);
-      for (TimelineMetric metric : entity.getMetrics()) {
-        assertTrue(verifyMetrics(metric, m1, m2));
-      }
-
-      // Query without specifying cluster ID.
-      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
-          "timeline/flowrun/flow_name/1002345678919?userid=user1");
-      resp = getResponse(client, uri);
-      entity = resp.getEntity(FlowRunEntity.class);
-      assertNotNull(entity);
-      assertEquals("user1@flow_name/1002345678919", entity.getId());
-      assertEquals(2, entity.getMetrics().size());
-      m1 = newMetric("HDFS_BYTES_READ", ts - 80000, 57L);
-      m2 = newMetric("MAP_SLOT_MILLIS", ts - 80000, 141L);
-      for (TimelineMetric metric : entity.getMetrics()) {
-        assertTrue(verifyMetrics(metric, m1, m2));
-      }
-    } finally {
-      client.destroy();
-    }
-  }
-
-  @Test
-  public void testGetFlows() throws Exception {
-    Client client = createClient();
-    try {
-      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
-          "timeline/flows/cluster1");
-      ClientResponse resp = getResponse(client, uri);
-      Set<FlowActivityEntity> entities =
-          resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
-      assertNotNull(entities);
-      assertEquals(2, entities.size());
-      for (FlowActivityEntity entity : entities) {
-        assertTrue((entity.getId().endsWith("@flow_name") &&
-            entity.getFlowRuns().size() == 2) ||
-            (entity.getId().endsWith("@flow_name2") &&
-            entity.getFlowRuns().size() == 1));
-      }
-
-      // Query without specifying cluster ID.
-      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
-          "timeline/flows/");
-      resp = getResponse(client, uri);
-      entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
-      assertNotNull(entities);
-      assertEquals(2, entities.size());
-
-      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
-              "timeline/flows/cluster1?limit=1");
-      resp = getResponse(client, uri);
-      entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
-      assertNotNull(entities);
-      assertEquals(1, entities.size());
-    } finally {
-      client.destroy();
-    }
-  }
-
-  @Test
-  public void testGetFlowRunNotPresent() throws Exception {
-    Client client = createClient();
-    try {
-      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
-          "timeline/flowrun/cluster1/flow_name/1002345678929?userid=user1");
-      verifyHttpResponse(client, uri, Status.NOT_FOUND);
-    } finally {
-      client.destroy();
-    }
-  }
-
-  @Test
-  public void testGetFlowsNotPresent() throws Exception {
-    Client client = createClient();
-    try {
-      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
-          "timeline/flows/cluster2");
-      ClientResponse resp = getResponse(client, uri);
-      Set<FlowActivityEntity> entities =
-          resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
-      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
-      assertNotNull(entities);
-      assertEquals(0, entities.size());
-    } finally {
-      client.destroy();
-    }
-  }
-
-  @After
-  public void stop() throws Exception {
-    if (server != null) {
-      server.stop();
-      server = null;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
new file mode 100644
index 0000000..a89d2fc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
@@ -0,0 +1,673 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.GenericType;
+import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+
+public class TestTimelineReaderWebServicesHBaseStorage {
+  private int serverPort;
+  private TimelineReaderServer server;
+  private static HBaseTestingUtility util;
+  private static long ts = System.currentTimeMillis();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    util = new HBaseTestingUtility();
+    Configuration conf = util.getConfiguration();
+    conf.setInt("hfile.format.version", 3);
+    util.startMiniCluster();
+    TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
+    loadData();
+  }
+
+  private static void loadData() throws Exception {
+    String cluster = "cluster1";
+    String user = "user1";
+    String flow = "flow_name";
+    String flowVersion = "CF7022C10F1354";
+    Long runid = 1002345678919L;
+    Long runid1 = 1002345678920L;
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entity = new TimelineEntity();
+    String id = "application_1111111111_1111";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
+    entity.addConfig("cfg2", "value1");
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId("MAP_SLOT_MILLIS");
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    metricValues.put(ts - 100000, 2);
+    metricValues.put(ts - 80000, 40);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+
+    m1 = new TimelineMetric();
+    m1.setId("HDFS_BYTES_READ");
+    metricValues = new HashMap<Long, Number>();
+    metricValues.put(ts - 100000, 31);
+    metricValues.put(ts - 80000, 57);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    entity.addMetrics(metrics);
+
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    Long expTs = 1436512802000L;
+    event.setTimestamp(expTs);
+    String expKey = "foo_event";
+    Object expVal = "test";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+    TimelineEvent event11 = new TimelineEvent();
+    event11.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    expTs = 1436512802010L;
+    event11.setTimestamp(expTs);
+    entity.addEvent(event11);
+
+    te.addEntity(entity);
+
+    // write another application with same metric to this flow
+    TimelineEntities te1 = new TimelineEntities();
+    TimelineEntity entity1 = new TimelineEntity();
+    id = "application_1111111111_2222";
+    type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity1.setId(id);
+    entity1.setType(type);
+    cTime = 1425016501000L;
+    entity1.setCreatedTime(cTime);
+    entity1.addConfig("cfg1", "value1");
+    // add metrics
+    metrics.clear();
+    TimelineMetric m2 = new TimelineMetric();
+    m2.setId("MAP_SLOT_MILLIS");
+    metricValues = new HashMap<Long, Number>();
+    metricValues.put(ts - 100000, 5L);
+    metricValues.put(ts - 80000, 101L);
+    m2.setType(Type.TIME_SERIES);
+    m2.setValues(metricValues);
+    metrics.add(m2);
+    entity1.addMetrics(metrics);
+    TimelineEvent event1 = new TimelineEvent();
+    event1.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    event1.setTimestamp(expTs);
+    event1.addInfo(expKey, expVal);
+    entity1.addEvent(event1);
+    te1.addEntity(entity1);
+
+    String flow2 = "flow_name2";
+    String flowVersion2 = "CF7022C10F1454";
+    Long runid2 = 2102356789046L;
+    TimelineEntities te3 = new TimelineEntities();
+    TimelineEntity entity3 = new TimelineEntity();
+    id = "application_11111111111111_2223";
+    entity3.setId(id);
+    entity3.setType(type);
+    cTime = 1425016501030L;
+    entity3.setCreatedTime(cTime);
+    TimelineEvent event2 = new TimelineEvent();
+    event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    event2.setTimestamp(1436512802030L);
+    event2.addInfo("foo_event", "test");
+    entity3.addEvent(event2);
+    te3.addEntity(entity3);
+
+    TimelineEntities te4 = new TimelineEntities();
+    TimelineEntity entity4 = new TimelineEntity();
+    id = "application_1111111111_2224";
+    entity4.setId(id);
+    entity4.setType(type);
+    cTime = 1425016501034L;
+    entity4.setCreatedTime(cTime);
+    TimelineEvent event4 = new TimelineEvent();
+    event4.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    event4.setTimestamp(1436512802037L);
+    event4.addInfo("foo_event", "test");
+    entity4.addEvent(event4);
+    te4.addEntity(entity4);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.write(cluster, user, flow, flowVersion, runid, entity.getId(), te);
+      hbi.write(cluster, user, flow, flowVersion, runid, entity1.getId(), te1);
+      hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4);
+      hbi.write(cluster, user, flow2,
+          flowVersion2, runid2, entity3.getId(), te3);
+      hbi.flush();
+    } finally {
+      hbi.close();
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  @Before
+  public void init() throws Exception {
+    try {
+      Configuration config = util.getConfiguration();
+      config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+          "localhost:0");
+      config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
+      config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
+          "org.apache.hadoop.yarn.server.timelineservice.storage." +
+              "HBaseTimelineReaderImpl");
+      config.setInt("hfile.format.version", 3);
+      server = new TimelineReaderServer();
+      server.init(config);
+      server.start();
+      serverPort = server.getWebServerPort();
+    } catch (Exception e) {
+      Assert.fail("Web server failed to start");
+    }
+  }
+
+  private static Client createClient() {
+    ClientConfig cfg = new DefaultClientConfig();
+    cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
+    return new Client(new URLConnectionClientHandler(
+        new DummyURLConnectionFactory()), cfg);
+  }
+
+  private static ClientResponse getResponse(Client client, URI uri)
+      throws Exception {
+    ClientResponse resp =
+        client.resource(uri).accept(MediaType.APPLICATION_JSON)
+        .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    if (resp == null ||
+        resp.getClientResponseStatus() != ClientResponse.Status.OK) {
+      String msg = new String();
+      if (resp != null) {
+        msg = resp.getClientResponseStatus().toString();
+      }
+      throw new IOException("Incorrect response from timeline reader. " +
+          "Status=" + msg);
+    }
+    return resp;
+  }
+
+  private static class DummyURLConnectionFactory
+      implements HttpURLConnectionFactory {
+
+    @Override
+    public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
+      try {
+        return (HttpURLConnection)url.openConnection();
+      } catch (UndeclaredThrowableException e) {
+        throw new IOException(e.getCause());
+      }
+    }
+  }
+
+  private static TimelineEntity newEntity(String type, String id) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setIdentifier(new TimelineEntity.Identifier(type, id));
+    return entity;
+  }
+
+  private static TimelineMetric newMetric(TimelineMetric.Type type,
+      String id, long ts, Number value) {
+    TimelineMetric metric = new TimelineMetric(type);
+    metric.setId(id);
+    metric.addValue(ts, value);
+    return metric;
+  }
+
+  private static boolean verifyMetricValues(Map<Long, Number> m1,
+      Map<Long, Number> m2) {
+    for (Map.Entry<Long, Number> entry : m1.entrySet()) {
+      if (!m2.containsKey(entry.getKey())) {
+        return false;
+      }
+      if (m2.get(entry.getKey()).equals(entry.getValue())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static boolean verifyMetrics(
+      TimelineMetric m, TimelineMetric... metrics) {
+    for (TimelineMetric metric : metrics) {
+      if (!metric.equals(m)) {
+        continue;
+      }
+      if (!verifyMetricValues(metric.getValues(), m.getValues())) {
+        continue;
+      }
+      return true;
+    }
+    return false;
+  }
+
+  private static void verifyHttpResponse(Client client, URI uri,
+      Status status) {
+    ClientResponse resp =
+        client.resource(uri).accept(MediaType.APPLICATION_JSON)
+        .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    assertNotNull(resp);
+    assertTrue("Response from server should have been " + status,
+        resp.getClientResponseStatus().equals(status));
+  }
+
+  @Test
+  public void testGetFlowRun() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowrun/cluster1/flow_name/1002345678919?userid=user1");
+      ClientResponse resp = getResponse(client, uri);
+      FlowRunEntity entity = resp.getEntity(FlowRunEntity.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entity);
+      assertEquals("user1@flow_name/1002345678919", entity.getId());
+      assertEquals(2, entity.getMetrics().size());
+      TimelineMetric m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+          "HDFS_BYTES_READ", ts - 80000, 57L);
+      TimelineMetric m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+          "MAP_SLOT_MILLIS", ts - 80000, 141L);
+      for (TimelineMetric metric : entity.getMetrics()) {
+        assertTrue(verifyMetrics(metric, m1, m2));
+      }
+
+      // Query without specifying cluster ID.
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowrun/flow_name/1002345678919?userid=user1");
+      resp = getResponse(client, uri);
+      entity = resp.getEntity(FlowRunEntity.class);
+      assertNotNull(entity);
+      assertEquals("user1@flow_name/1002345678919", entity.getId());
+      assertEquals(2, entity.getMetrics().size());
+      m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+          "HDFS_BYTES_READ", ts - 80000, 57L);
+      m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+          "MAP_SLOT_MILLIS", ts - 80000, 141L);
+      for (TimelineMetric metric : entity.getMetrics()) {
+        assertTrue(verifyMetrics(metric, m1, m2));
+      }
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetFlows() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flows/cluster1");
+      ClientResponse resp = getResponse(client, uri);
+      Set<FlowActivityEntity> entities =
+          resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+      for (FlowActivityEntity entity : entities) {
+        assertTrue((entity.getId().endsWith("@flow_name") &&
+            entity.getFlowRuns().size() == 2) ||
+            (entity.getId().endsWith("@flow_name2") &&
+            entity.getFlowRuns().size() == 1));
+      }
+
+      // Query without specifying cluster ID.
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flows/");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+              "timeline/flows/cluster1?limit=1");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetApp() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/app/cluster1/application_1111111111_1111?" +
+          "userid=user1&fields=ALL&flowid=flow_name&flowrunid=1002345678919");
+      ClientResponse resp = getResponse(client, uri);
+      TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+      assertNotNull(entity);
+      assertEquals("application_1111111111_1111", entity.getId());
+      assertEquals(2, entity.getMetrics().size());
+      TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES,
+          "HDFS_BYTES_READ", ts - 100000, 31L);
+      m1.addValue(ts - 80000, 57L);
+      TimelineMetric m2 = newMetric(TimelineMetric.Type.TIME_SERIES,
+          "MAP_SLOT_MILLIS", ts - 100000, 2L);
+      m2.addValue(ts - 80000, 40L);
+      for (TimelineMetric metric : entity.getMetrics()) {
+        assertTrue(verifyMetrics(metric, m1, m2));
+      }
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+              "timeline/app/application_1111111111_2222?userid=user1" +
+              "&fields=metrics&flowid=flow_name&flowrunid=1002345678919");
+      resp = getResponse(client, uri);
+      entity = resp.getEntity(TimelineEntity.class);
+      assertNotNull(entity);
+      assertEquals("application_1111111111_2222", entity.getId());
+      assertEquals(1, entity.getMetrics().size());
+      TimelineMetric m3 = newMetric(TimelineMetric.Type.TIME_SERIES,
+         "MAP_SLOT_MILLIS", ts - 100000, 5L);
+      m2.addValue(ts - 80000, 101L);
+      for (TimelineMetric metric : entity.getMetrics()) {
+        assertTrue(verifyMetrics(metric, m3));
+      }
+    } finally {
+        client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetAppWithoutFlowInfo() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/app/cluster1/application_1111111111_1111?" +
+          "userid=user1&fields=ALL");
+      ClientResponse resp = getResponse(client, uri);
+      TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+      assertNotNull(entity);
+      assertEquals("application_1111111111_1111", entity.getId());
+      assertEquals(2, entity.getMetrics().size());
+      TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES,
+          "HDFS_BYTES_READ", ts - 100000, 31L);
+      m1.addValue(ts - 80000, 57L);
+      TimelineMetric m2 = newMetric(TimelineMetric.Type.TIME_SERIES,
+          "MAP_SLOT_MILLIS", ts - 100000, 2L);
+      m2.addValue(ts - 80000, 40L);
+      for (TimelineMetric metric : entity.getMetrics()) {
+        assertTrue(verifyMetrics(metric, m1, m2));
+      }
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetFlowRunApps() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowrunapps/cluster1/flow_name/1002345678919?" +
+          "userid=user1&fields=ALL");
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+      for (TimelineEntity entity : entities) {
+        assertTrue("Unexpected app in result",
+            (entity.getId().equals("application_1111111111_1111") &&
+            entity.getMetrics().size() == 2) ||
+            (entity.getId().equals("application_1111111111_2222") &&
+            entity.getMetrics().size() == 1));
+      }
+
+      // Query without specifying cluster ID.
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowrunapps/flow_name/1002345678919?userid=user1");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowrunapps/flow_name/1002345678919?userid=user1&limit=1");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetFlowApps() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowapps/cluster1/flow_name?userid=user1&fields=ALL");
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(3, entities.size());
+      for (TimelineEntity entity : entities) {
+        assertTrue("Unexpected app in result",
+            (entity.getId().equals("application_1111111111_1111") &&
+            entity.getMetrics().size() == 2) ||
+            (entity.getId().equals("application_1111111111_2222") &&
+            entity.getMetrics().size() == 1) ||
+            (entity.getId().equals("application_1111111111_2224") &&
+            entity.getMetrics().size() == 0));
+      }
+
+      // Query without specifying cluster ID.
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowapps/flow_name?userid=user1");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(3, entities.size());
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowapps/flow_name?userid=user1&limit=1");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetFlowAppsFilters() throws Exception {
+    Client client = createClient();
+    try {
+      String entityType = TimelineEntityType.YARN_APPLICATION.toString();
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowapps/cluster1/flow_name?userid=user1&eventfilters=" +
+          ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      assertTrue("Unexpected app in result", entities.contains(
+          newEntity(entityType, "application_1111111111_1111")));
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowapps/cluster1/flow_name?userid=user1&metricfilters=" +
+          "HDFS_BYTES_READ");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      assertTrue("Unexpected app in result", entities.contains(
+          newEntity(entityType, "application_1111111111_1111")));
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowapps/cluster1/flow_name?userid=user1&conffilters=" +
+          "cfg1:value1");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      assertTrue("Unexpected app in result", entities.contains(
+          newEntity(entityType, "application_1111111111_2222")));
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetFlowRunNotPresent() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowrun/cluster1/flow_name/1002345678929?userid=user1");
+      verifyHttpResponse(client, uri, Status.NOT_FOUND);
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetFlowsNotPresent() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flows/cluster2");
+      ClientResponse resp = getResponse(client, uri);
+      Set<FlowActivityEntity> entities =
+          resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(0, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetAppNotPresent() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/app/cluster1/flow_name/1002345678919/" +
+          "application_1111111111_1378?userid=user1");
+      verifyHttpResponse(client, uri, Status.NOT_FOUND);
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetFlowRunAppsNotPresent() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowrunapps/cluster2/flow_name/1002345678919");
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(0, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetFlowAppsNotPresent() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowapps/cluster2/flow_name55");
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(0, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @After
+  public void stop() throws Exception {
+    if (server != null) {
+      server.stop();
+      server = null;
+    }
+  }
+}