You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/01/20 10:13:39 UTC

[22/50] [abbrv] hadoop git commit: YARN-4075 [reader REST API] implement support for querying for flows and flow runs (Varun Saxena via vrushali)

YARN-4075 [reader REST API] implement support for querying for flows and flow runs (Varun Saxena via vrushali)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4552598d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4552598d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4552598d

Branch: refs/heads/feature-YARN-2928
Commit: 4552598da618d536dfd375e6bb77974e21287666
Parents: 25f58b4
Author: Vrushali Channapattan <vr...@apache.org>
Authored: Fri Sep 25 12:16:38 2015 -0700
Committer: Li Lu <gt...@apache.org>
Committed: Tue Jan 19 17:41:58 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../records/timelineservice/TimelineMetric.java |  10 +
 .../reader/TimelineReaderManager.java           |  25 +-
 .../reader/TimelineReaderServer.java            |   1 +
 .../reader/TimelineReaderWebServices.java       | 243 ++++++++++--
 .../timelineservice/reader/package-info.java    |  23 ++
 .../storage/TimelineSchemaCreator.java          |   3 +
 .../reader/TestTimelineReaderWebServices.java   |  52 ++-
 .../TestTimelineReaderWebServicesFlowRun.java   | 365 +++++++++++++++++++
 .../src/test/resources/log4j.properties         |  19 +
 10 files changed, 689 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4552598d/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 004fec4..4846c5c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -109,6 +109,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-4074. [timeline reader] implement support for querying for flows
     and flow runs (sjlee via vrushali)
 
+    YARN-4075. [reader REST API] implement support for querying for flows
+    and flow runs (Varun Saxena via vrushali)
+
   IMPROVEMENTS
 
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4552598d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
index f44c1a2..e3870da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
@@ -154,4 +154,14 @@ public class TimelineMetric {
     }
     return true;
   }
+
+  @Override
+  public String toString() {
+    String str = "{id:" + id + ", type:" + type;
+    if (!values.isEmpty()) {
+      str += ", values:" + values;
+    }
+    str += "}";
+    return str;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4552598d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
index 7fafd82..27a50d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
@@ -25,9 +25,10 @@ import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 
@@ -43,6 +44,22 @@ public class TimelineReaderManager extends AbstractService {
   }
 
   /**
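+   * Gets cluster ID from config yarn.resourcemanager.cluster-id
+   * if not supplied by client.
+   * @param clusterId cluster ID supplied by the client; may be null or empty.
+   * @param conf configuration to read the cluster ID from when none is given.
+   * @return the supplied clusterId, or the configured (or default) cluster ID.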
+   */
+  private static String getClusterID(String clusterId, Configuration conf) {
+    if (clusterId == null || clusterId.isEmpty()) {
+      return conf.get(
+          YarnConfiguration.RM_CLUSTER_ID,
+              YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
+    }
+    return clusterId;
+  }
+
+  /**
    * Get a set of entities matching given predicates. The meaning of each
    * argument has been documented with {@link TimelineReader#getEntities}.
    *
@@ -56,7 +73,8 @@ public class TimelineReaderManager extends AbstractService {
       Map<String, Object> infoFilters, Map<String, String> configFilters,
       Set<String>  metricFilters, Set<String> eventFilters,
       EnumSet<Field> fieldsToRetrieve) throws IOException {
-    return reader.getEntities(userId, clusterId, flowId, flowRunId, appId,
+    String cluster = getClusterID(clusterId, getConfig());
+    return reader.getEntities(userId, cluster, flowId, flowRunId, appId,
         entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
         modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
         metricFilters, eventFilters, fieldsToRetrieve);
@@ -71,7 +89,8 @@ public class TimelineReaderManager extends AbstractService {
   public TimelineEntity getEntity(String userId, String clusterId,
       String flowId, Long flowRunId, String appId, String entityType,
       String entityId, EnumSet<Field> fields) throws IOException {
-    return reader.getEntity(userId, clusterId, flowId, flowRunId, appId,
+    String cluster = getClusterID(clusterId, getConfig());
+    return reader.getEntity(userId, cluster, flowId, flowRunId, appId,
         entityType, entityId, fields);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4552598d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
index 319cfb0..afe1536 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
@@ -77,6 +77,7 @@ public class TimelineReaderServer extends CompositeService {
     TimelineReader readerStore = ReflectionUtils.newInstance(conf.getClass(
         YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
         FileSystemTimelineReaderImpl.class, TimelineReader.class), conf);
+    LOG.info("Using store " + readerStore.getClass().getName());
     readerStore.init(conf);
     return readerStore;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4552598d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
index 0b5fde0..f619c7d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.timelineservice.reader;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -45,6 +46,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@@ -169,6 +171,11 @@ public class TimelineReaderWebServices {
     return str == null ? null : str.trim();
   }
 
+  private static String parseUser(UserGroupInformation callerUGI, String user) {
+    return (callerUGI != null && (user == null || user.isEmpty()) ?
+        callerUGI.getUserName().trim() : parseStr(user));
+  }
+
   private static UserGroupInformation getUser(HttpServletRequest req) {
     String remoteUser = req.getRemoteUser();
     UserGroupInformation callerUGI = null;
@@ -183,6 +190,17 @@ public class TimelineReaderWebServices {
         ctxt.getAttribute(TimelineReaderServer.TIMELINE_READER_MANAGER_ATTR);
   }
 
+  private static void handleException(Exception e) throws BadRequestException,
+      WebApplicationException {
+    if (e instanceof IllegalArgumentException) {
+      throw new BadRequestException("Requested Invalid Field.");
+    } else {
+      LOG.error("Error while processing REST request", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
   /**
    * Return the description of the timeline reader web services.
    */
@@ -196,25 +214,58 @@ public class TimelineReaderWebServices {
   }
 
   /**
+   * Return a set of entities that match the given parameters. Cluster ID is not
+   * provided by client so default cluster ID has to be taken.
+   */
+  @GET
+  @Path("/entities/{appid}/{entitytype}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Set<TimelineEntity> getEntities(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("appid") String appId,
+      @PathParam("entitytype") String entityType,
+      @QueryParam("userid") String userId,
+      @QueryParam("flowid") String flowId,
+      @QueryParam("flowrunid") String flowRunId,
+      @QueryParam("limit") String limit,
+      @QueryParam("createdtimestart") String createdTimeStart,
+      @QueryParam("createdtimeend") String createdTimeEnd,
+      @QueryParam("modifiedtimestart") String modifiedTimeStart,
+      @QueryParam("modifiedtimeend") String modifiedTimeEnd,
+      @QueryParam("relatesto") String relatesTo,
+      @QueryParam("isrelatedto") String isRelatedTo,
+      @QueryParam("infofilters") String infofilters,
+      @QueryParam("conffilters") String conffilters,
+      @QueryParam("metricfilters") String metricfilters,
+      @QueryParam("eventfilters") String eventfilters,
+       @QueryParam("fields") String fields) {
+    return getEntities(req, res, null, appId, entityType, userId, flowId,
+        flowRunId, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
+        modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
+        metricfilters, eventfilters, fields);
+  }
+
+  /**
    * Return a set of entities that match the given parameters.
    */
   @GET
-  @Path("/entities/{clusterId}/{appId}/{entityType}")
+  @Path("/entities/{clusterid}/{appid}/{entitytype}/")
   @Produces(MediaType.APPLICATION_JSON)
   public Set<TimelineEntity> getEntities(
       @Context HttpServletRequest req,
       @Context HttpServletResponse res,
-      @PathParam("clusterId") String clusterId,
-      @PathParam("appId") String appId,
-      @PathParam("entityType") String entityType,
-      @QueryParam("userId") String userId,
-      @QueryParam("flowId") String flowId,
-      @QueryParam("flowRunId") String flowRunId,
+      @PathParam("clusterid") String clusterId,
+      @PathParam("appid") String appId,
+      @PathParam("entitytype") String entityType,
+      @QueryParam("userid") String userId,
+      @QueryParam("flowid") String flowId,
+      @QueryParam("flowrunid") String flowRunId,
       @QueryParam("limit") String limit,
-      @QueryParam("createdTimeStart") String createdTimeStart,
-      @QueryParam("createdTimeEnd") String createdTimeEnd,
-      @QueryParam("modifiedTimeStart") String modifiedTimeStart,
-      @QueryParam("modifiedTimeEnd") String modifiedTimeEnd,
+      @QueryParam("createdtimestart") String createdTimeStart,
+      @QueryParam("createdtimeend") String createdTimeEnd,
+      @QueryParam("modifiedtimestart") String modifiedTimeStart,
+      @QueryParam("modifiedtimeend") String modifiedTimeEnd,
       @QueryParam("relatesto") String relatesTo,
       @QueryParam("isrelatedto") String isRelatedTo,
       @QueryParam("infofilters") String infofilters,
@@ -225,11 +276,10 @@ public class TimelineReaderWebServices {
     init(res);
     TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
     UserGroupInformation callerUGI = getUser(req);
+    Set<TimelineEntity> entities = null;
     try {
-      return timelineReaderManager.getEntities(
-          callerUGI != null && (userId == null || userId.isEmpty()) ?
-          callerUGI.getUserName().trim() : parseStr(userId),
-          parseStr(clusterId), parseStr(flowId),
+      entities = timelineReaderManager.getEntities(
+          parseUser(callerUGI, userId), parseStr(clusterId), parseStr(flowId),
           parseLongStr(flowRunId), parseStr(appId), parseStr(entityType),
           parseLongStr(limit), parseLongStr(createdTimeStart),
           parseLongStr(createdTimeEnd), parseLongStr(modifiedTimeStart),
@@ -245,31 +295,52 @@ public class TimelineReaderWebServices {
       throw new BadRequestException(
           "createdTime or modifiedTime start/end or limit or flowId is not" +
           " a numeric value.");
-    } catch (IllegalArgumentException e) {
-      throw new BadRequestException("Requested Invalid Field.");
     } catch (Exception e) {
-      LOG.error("Error getting entities", e);
-      throw new WebApplicationException(e,
-          Response.Status.INTERNAL_SERVER_ERROR);
+      handleException(e);
+    }
+    if (entities == null) {
+      entities = Collections.emptySet();
     }
+    return entities;
+  }
+
+  /**
+   * Return a single entity of the given entity type and Id. Cluster ID is not
+   * provided by client so default cluster ID has to be taken.
+   */
+  @GET
+  @Path("/entity/{appid}/{entitytype}/{entityid}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public TimelineEntity getEntity(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("appid") String appId,
+      @PathParam("entitytype") String entityType,
+      @PathParam("entityid") String entityId,
+      @QueryParam("userid") String userId,
+      @QueryParam("flowid") String flowId,
+      @QueryParam("flowrunid") String flowRunId,
+      @QueryParam("fields") String fields) {
+    return getEntity(req, res, null, appId, entityType, entityId, userId,
+        flowId, flowRunId, fields);
   }
 
   /**
    * Return a single entity of the given entity type and Id.
    */
   @GET
-  @Path("/entity/{clusterId}/{appId}/{entityType}/{entityId}/")
+  @Path("/entity/{clusterid}/{appid}/{entitytype}/{entityid}/")
   @Produces(MediaType.APPLICATION_JSON)
   public TimelineEntity getEntity(
       @Context HttpServletRequest req,
       @Context HttpServletResponse res,
-      @PathParam("clusterId") String clusterId,
-      @PathParam("appId") String appId,
-      @PathParam("entityType") String entityType,
-      @PathParam("entityId") String entityId,
-      @QueryParam("userId") String userId,
-      @QueryParam("flowId") String flowId,
-      @QueryParam("flowRunId") String flowRunId,
+      @PathParam("clusterid") String clusterId,
+      @PathParam("appid") String appId,
+      @PathParam("entitytype") String entityType,
+      @PathParam("entityid") String entityId,
+      @QueryParam("userid") String userId,
+      @QueryParam("flowid") String flowId,
+      @QueryParam("flowrunid") String flowRunId,
       @QueryParam("fields") String fields) {
     init(res);
     TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
@@ -277,19 +348,13 @@ public class TimelineReaderWebServices {
     TimelineEntity entity = null;
     try {
       entity = timelineReaderManager.getEntity(
-          callerUGI != null && (userId == null || userId.isEmpty()) ?
-          callerUGI.getUserName().trim() : parseStr(userId),
-          parseStr(clusterId), parseStr(flowId), parseLongStr(flowRunId),
-          parseStr(appId), parseStr(entityType), parseStr(entityId),
-          parseFieldsStr(fields, COMMA_DELIMITER));
+          parseUser(callerUGI, userId), parseStr(clusterId), parseStr(flowId),
+          parseLongStr(flowRunId), parseStr(appId), parseStr(entityType),
+          parseStr(entityId), parseFieldsStr(fields, COMMA_DELIMITER));
     } catch (NumberFormatException e) {
-      throw new BadRequestException("flowRunId is not a numeric value.");
-    } catch (IllegalArgumentException e) {
-      throw new BadRequestException("Requested Invalid Field.");
+      throw new BadRequestException("flowrunid is not a numeric value.");
     } catch (Exception e) {
-      LOG.error("Error getting entity", e);
-      throw new WebApplicationException(e,
-          Response.Status.INTERNAL_SERVER_ERROR);
+      handleException(e);
     }
     if (entity == null) {
       throw new NotFoundException("Timeline entity {id: " + parseStr(entityId) +
@@ -297,4 +362,104 @@ public class TimelineReaderWebServices {
     }
     return entity;
   }
+
+  /**
+   * Return a single flow run for the given flow id and run id. Cluster ID is
+   * not provided by client so default cluster ID has to be taken.
+   */
+  @GET
+  @Path("/flowrun/{flowid}/{flowrunid}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public TimelineEntity getFlowRun(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("flowid") String flowId,
+      @PathParam("flowrunid") String flowRunId,
+      @QueryParam("userid") String userId,
+      @QueryParam("fields") String fields) {
+    return getFlowRun(req, res, null, flowId, flowRunId, userId, fields);
+  }
+
+  /**
+   * Return a single flow run for the given cluster, flow id and run id.
+   */
+  @GET
+  @Path("/flowrun/{clusterid}/{flowid}/{flowrunid}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public TimelineEntity getFlowRun(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("clusterid") String clusterId,
+      @PathParam("flowid") String flowId,
+      @PathParam("flowrunid") String flowRunId,
+      @QueryParam("userid") String userId,
+      @QueryParam("fields") String fields) {
+    init(res);
+    TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
+    UserGroupInformation callerUGI = getUser(req);
+    TimelineEntity entity = null;
+    try {
+      entity = timelineReaderManager.getEntity(
+          parseUser(callerUGI, userId), parseStr(clusterId),
+          parseStr(flowId), parseLongStr(flowRunId), null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null,
+          parseFieldsStr(fields, COMMA_DELIMITER));
+    } catch (NumberFormatException e) {
+      throw new BadRequestException("flowRunId is not a numeric value.");
+    } catch (Exception e) {
+      handleException(e);
+    }
+    if (entity == null) {
+      throw new NotFoundException("Flow run {flow id: " + parseStr(flowId) +
+          ", run id: " + parseLongStr(flowRunId) + " } is not found");
+    }
+    return entity;
+  }
+
+  /**
+   * Return a list of flows for the default cluster. Cluster ID is not
+   * provided by client so default cluster ID has to be taken.
+   */
+  @GET
+  @Path("/flows/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Set<TimelineEntity> getFlows(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @QueryParam("limit") String limit,
+      @QueryParam("fields") String fields) {
+    return getFlows(req, res, null, limit, fields);
+  }
+
+  /**
+   * Return a list of flows for a given cluster id.
+   */
+  @GET
+  @Path("/flows/{clusterid}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Set<TimelineEntity> getFlows(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("clusterid") String clusterId,
+      @QueryParam("limit") String limit,
+      @QueryParam("fields") String fields) {
+    init(res);
+    TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
+    Set<TimelineEntity> entities = null;
+    try {
+      entities = timelineReaderManager.getEntities(
+          null, parseStr(clusterId), null, null, null,
+          TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), parseLongStr(limit),
+          null, null, null, null, null, null, null, null, null, null,
+          parseFieldsStr(fields, COMMA_DELIMITER));
+    } catch (NumberFormatException e) {
+      throw new BadRequestException("limit is not a numeric value.");
+    } catch (Exception e) {
+      handleException(e);
+    }
+    if (entities == null) {
+      entities = Collections.emptySet();
+    }
+    return entities;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4552598d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/package-info.java
new file mode 100644
index 0000000..51247bd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4552598d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
index cbcff4c..46bc2e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This creates the schema for a hbase based backend for storing application
  * timeline information.
@@ -201,6 +203,7 @@ public class TimelineSchemaCreator {
     return commandLine;
   }
 
+  @VisibleForTesting
   public static void createAllTables(Configuration hbaseConf,
       boolean skipExisting) throws IOException {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4552598d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
index 0f7c22f..45bce2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
@@ -187,8 +187,8 @@ public class TestTimelineReaderWebServices {
     Client client = createClient();
     try {
       URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
-          "timeline/entity/cluster1/app1/app/id_1?userId=user1&" +
-          "flowId=flow1&flowRunId=1");
+          "timeline/entity/cluster1/app1/app/id_1?userid=user1&" +
+          "flowid=flow1&flowrunid=1");
       ClientResponse resp = getResponse(client, uri);
       TimelineEntity entity = resp.getEntity(TimelineEntity.class);
       assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@@ -259,6 +259,32 @@ public class TestTimelineReaderWebServices {
   }
 
   @Test
+  public void testQueryWithoutCluster() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entity/app1/app/id_1");
+      ClientResponse resp = getResponse(client, uri);
+      TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entity);
+      assertEquals("id_1", entity.getId());
+      assertEquals("app", entity.getType());
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entities/app1/app");
+      resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(4, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
   public void testGetEntities() throws Exception {
     Client client = createClient();
     try {
@@ -318,8 +344,8 @@ public class TestTimelineReaderWebServices {
     Client client = createClient();
     try {
       URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
-          "timeline/entities/cluster1/app1/app?createdTimeStart=1425016502030&"
-          + "createdTimeEnd=1425016502060");
+          "timeline/entities/cluster1/app1/app?createdtimestart=1425016502030&"
+          + "createdtimeend=1425016502060");
       ClientResponse resp = getResponse(client, uri);
       Set<TimelineEntity> entities =
           resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@@ -330,7 +356,7 @@ public class TestTimelineReaderWebServices {
           entities.contains(newEntity("app", "id_4")));
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
-          "entities/cluster1/app1/app?createdTimeEnd=1425016502010");
+          "entities/cluster1/app1/app?createdtimeend=1425016502010");
       resp = getResponse(client, uri);
       entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
       assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@@ -340,7 +366,7 @@ public class TestTimelineReaderWebServices {
           entities.contains(newEntity("app", "id_4")));
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
-          "entities/cluster1/app1/app?createdTimeStart=1425016502010");
+          "entities/cluster1/app1/app?createdtimestart=1425016502010");
       resp = getResponse(client, uri);
       entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
       assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@@ -358,8 +384,8 @@ public class TestTimelineReaderWebServices {
     Client client = createClient();
     try {
       URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
-          "timeline/entities/cluster1/app1/app?modifiedTimeStart=1425016502090"
-          + "&modifiedTimeEnd=1425016503020");
+          "timeline/entities/cluster1/app1/app?modifiedtimestart=1425016502090"
+          + "&modifiedtimeend=1425016503020");
       ClientResponse resp = getResponse(client, uri);
       Set<TimelineEntity> entities =
           resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@@ -372,7 +398,7 @@ public class TestTimelineReaderWebServices {
           entities.contains(newEntity("app", "id_4")));
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
-          "entities/cluster1/app1/app?modifiedTimeEnd=1425016502090");
+          "entities/cluster1/app1/app?modifiedtimeend=1425016502090");
       resp = getResponse(client, uri);
       entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
       assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@@ -384,7 +410,7 @@ public class TestTimelineReaderWebServices {
           entities.contains(newEntity("app", "id_3")));
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
-          "entities/cluster1/app1/app?modifiedTimeStart=1425016503005");
+          "entities/cluster1/app1/app?modifiedtimestart=1425016503005");
       resp = getResponse(client, uri);
       entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
       assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@@ -527,7 +553,7 @@ public class TestTimelineReaderWebServices {
           "timeline/entities/cluster1/app1/app?metricfilters=metric7&" +
           "isrelatedto=type1:tid1_1;tid1_2,type2:tid2_1%60&relatesto=" +
           "flow:flow1&eventfilters=event_2,event_4&infofilters=info2:3.5" +
-          "&createdTimeStart=1425016502030&createdTimeEnd=1425016502060");
+          "&createdtimestart=1425016502030&createdtimeend=1425016502060");
       ClientResponse resp = getResponse(client, uri);
       Set<TimelineEntity> entities =
           resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@@ -544,11 +570,11 @@ public class TestTimelineReaderWebServices {
     Client client = createClient();
     try {
       URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
-          "timeline/entities/cluster1/app1/app?flowRunId=a23b");
+          "timeline/entities/cluster1/app1/app?flowrunid=a23b");
       verifyHttpResponse(client, uri, Status.BAD_REQUEST);
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
-          "entity/cluster1/app1/app/id_1?flowRunId=2ab15");
+          "entity/cluster1/app1/app/id_1?flowrunid=2ab15");
       verifyHttpResponse(client, uri, Status.BAD_REQUEST);
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4552598d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesFlowRun.java
new file mode 100644
index 0000000..ae71e2c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesFlowRun.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.GenericType;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+
+public class TestTimelineReaderWebServicesFlowRun {
+  private int serverPort; // set in init() from server.getWebServerPort()
+  private TimelineReaderServer server; // created in init(), stopped in stop()
+  private static HBaseTestingUtility util; // created once in setup()
+  private static long ts = System.currentTimeMillis(); // metric time base
+
+  // Starts a mini HBase cluster, creates the timeline tables, loads test data.
+  @BeforeClass
+  public static void setup() throws Exception {
+    util = new HBaseTestingUtility();
+    util.getConfiguration().setInt("hfile.format.version", 3);
+    util.startMiniCluster();
+    TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
+    loadData();
+  }
+
+  // Writes the rows the tests query: flow_name gets two runs (the first one
+  // holding two applications) and flow_name2 gets one run with one application.
+  private static void loadData() throws Exception {
+    String cluster = "cluster1";
+    String user = "user1";
+    String flow = "flow_name";
+    String flowVersion = "CF7022C10F1354";
+    Long runid = 1002345678919L;
+    Long runid1 = 1002345678920L;
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunMetrics_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId("MAP_SLOT_MILLIS");
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    metricValues.put(ts - 100000, 2);
+    metricValues.put(ts - 80000, 40);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+
+    m1 = new TimelineMetric();
+    m1.setId("HDFS_BYTES_READ");
+    metricValues = new HashMap<Long, Number>();
+    metricValues.put(ts - 100000, 31);
+    metricValues.put(ts - 80000, 57);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    entity.addMetrics(metrics);
+
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    event.setTimestamp(1436512802000L);
+    event.addInfo("foo_event", "test");
+    entity.addEvent(event);
+    te.addEntity(entity);
+
+    // write another application with same metric to this flow
+    TimelineEntities te1 = new TimelineEntities();
+    TimelineEntity entity1 = new TimelineEntity();
+    entity1.setId(id);
+    entity1.setType(type);
+    entity1.setCreatedTime(cTime);
+    // add metrics
+    metrics.clear();
+    TimelineMetric m2 = new TimelineMetric();
+    m2.setId("MAP_SLOT_MILLIS");
+    metricValues = new HashMap<Long, Number>();
+    metricValues.put(ts - 100000, 5L);
+    metricValues.put(ts - 80000, 101L);
+    m2.setType(Type.TIME_SERIES);
+    m2.setValues(metricValues);
+    metrics.add(m2);
+    entity1.addMetrics(metrics);
+    te1.addEntity(entity1);
+
+    String flow2 = "flow_name2";
+    String flowVersion2 = "CF7022C10F1454";
+    Long runid2 = 2102356789046L;
+    TimelineEntities te3 = new TimelineEntities();
+    TimelineEntity entity3 = new TimelineEntity();
+    id = "flowRunMetrics_test1";
+    entity3.setId(id);
+    entity3.setType(type);
+    cTime = 1425016501030L;
+    entity3.setCreatedTime(cTime);
+    TimelineEvent event2 = new TimelineEvent();
+    event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    event2.setTimestamp(1436512802030L);
+    event2.addInfo("foo_event", "test");
+    entity3.addEvent(event2);
+    te3.addEntity(entity3);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String appName = "application_11111111111111_1111";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      appName = "application_11111111111111_2222";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+      hbi.write(cluster, user, flow, flowVersion, runid1, appName, te);
+      appName = "application_11111111111111_2223";
+      hbi.write(cluster, user, flow2, flowVersion2, runid2, appName, te3);
+      hbi.flush();
+    } finally {
+      // hbi is still null when the constructor threw; closing it blindly
+      // would raise an NPE that hides the original failure.
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    util.shutdownMiniCluster(); // stops the mini cluster started by setup()
+  }
+
+  @Before // starts a reader web server for the test about to run
+  public void init() throws Exception {
+    try {
+      Configuration config = util.getConfiguration();
+      config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+          "localhost:0"); // port 0: actual port is read back after start()
+      config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
+      config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
+          "org.apache.hadoop.yarn.server.timelineservice.storage." +
+              "HBaseTimelineReaderImpl");
+      config.setInt("hfile.format.version", 3);
+      server = new TimelineReaderServer();
+      server.init(config);
+      server.start();
+      serverPort = server.getWebServerPort();
+    } catch (Exception e) {
+      throw new AssertionError("Web server failed to start", e); // keep cause
+    }
+  }
+
+  private static Client createClient() { // JSON-capable Jersey client
+    ClientConfig jsonConfig = new DefaultClientConfig();
+    jsonConfig.getClasses().add(YarnJacksonJaxbJsonProvider.class);
+    return new Client(new URLConnectionClientHandler(
+        new DummyURLConnectionFactory()), jsonConfig);
+  }
+
+  // Issues a JSON GET for uri and returns the response only if its status is
+  // OK; a null or non-OK response is reported as an IOException.
+  private static ClientResponse getResponse(Client client, URI uri)
+      throws Exception {
+    ClientResponse resp =
+        client.resource(uri).accept(MediaType.APPLICATION_JSON)
+        .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    if (resp != null &&
+        resp.getClientResponseStatus() == ClientResponse.Status.OK) {
+      return resp;
+    }
+    String status =
+        (resp == null) ? "" : resp.getClientResponseStatus().toString();
+    throw new IOException("Incorrect response from timeline reader. " +
+        "Status=" + status);
+  }
+
+  // Connection factory that opens a plain HttpURLConnection for each URL.
+  private static class DummyURLConnectionFactory
+      implements HttpURLConnectionFactory {
+    @Override
+    public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
+      try {
+        return (HttpURLConnection) url.openConnection();
+      } catch (UndeclaredThrowableException ute) {
+        throw new IOException(ute.getCause());
+      }
+    }
+  }
+
+  private static TimelineMetric newMetric(String id, long ts, Number value) {
+    TimelineMetric expected = new TimelineMetric(); // one (ts, value) sample
+    expected.setId(id);
+    expected.addValue(ts, value); // ts is the parameter, not the static field
+    return expected;
+  }
+
+  // True iff every timestamp in m1 is also in m2 with the same value as a long.
+  private static boolean verifyMetricValues(Map<Long, Number> m1,
+      Map<Long, Number> m2) {
+    for (Map.Entry<Long, Number> entry : m1.entrySet()) {
+      Number other = m2.get(entry.getKey());
+      // Compare as long: boxed equals() is false across Integer and Long.
+      if (other == null || other.longValue() != entry.getValue().longValue()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // Returns true if at least one of the given metrics equals(m) and
+  // verifyMetricValues accepts its values against those of m.
+  private static boolean verifyMetrics(
+      TimelineMetric m, TimelineMetric... metrics) {
+    for (TimelineMetric candidate : metrics) {
+      boolean matches = candidate.equals(m) &&
+          verifyMetricValues(candidate.getValues(), m.getValues());
+      if (matches) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Test
+  public void testGetFlowRun() throws Exception {
+    // Expected metrics of run 1002345678919; 141 = 40 + 101, the ts - 80000
+    // MAP_SLOT_MILLIS values loadData() writes for the two apps of that run.
+    TimelineMetric hdfsRead = newMetric("HDFS_BYTES_READ", ts - 80000, 57L);
+    TimelineMetric mapSlot = newMetric("MAP_SLOT_MILLIS", ts - 80000, 141L);
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowrun/cluster1/flow_name/1002345678919?userid=user1");
+      ClientResponse response = getResponse(client, uri);
+      FlowRunEntity flowRun = response.getEntity(FlowRunEntity.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      assertNotNull(flowRun);
+      assertEquals("user1@flow_name/1002345678919", flowRun.getId());
+      assertEquals(2, flowRun.getMetrics().size());
+      for (TimelineMetric actual : flowRun.getMetrics()) {
+        assertTrue(verifyMetrics(actual, hdfsRead, mapSlot));
+      }
+
+      // Query without specifying cluster ID.
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowrun/flow_name/1002345678919?userid=user1");
+      response = getResponse(client, uri);
+      flowRun = response.getEntity(FlowRunEntity.class);
+      assertNotNull(flowRun);
+      assertEquals("user1@flow_name/1002345678919", flowRun.getId());
+      assertEquals(2, flowRun.getMetrics().size());
+      for (TimelineMetric actual : flowRun.getMetrics()) {
+        assertTrue(verifyMetrics(actual, hdfsRead, mapSlot));
+      }
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetFlows() throws Exception {
+    // loadData() writes two flows: flow_name with two runs and flow_name2
+    // with a single run.
+    String base = "http://localhost:" + serverPort + "/ws/v2/timeline/";
+    GenericType<Set<FlowActivityEntity>> flowSetType =
+        new GenericType<Set<FlowActivityEntity>>(){};
+    Client client = createClient();
+    try {
+      ClientResponse resp =
+          getResponse(client, URI.create(base + "flows/cluster1"));
+      Set<FlowActivityEntity> entities = resp.getEntity(flowSetType);
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+      for (FlowActivityEntity flow : entities) {
+        boolean isFlowName = flow.getId().endsWith("@flow_name") &&
+            flow.getFlowRuns().size() == 2;
+        assertTrue(isFlowName || (flow.getId().endsWith("@flow_name2") &&
+            flow.getFlowRuns().size() == 1));
+      }
+
+      // Query without specifying cluster ID.
+      resp = getResponse(client, URI.create(base + "flows/"));
+      entities = resp.getEntity(flowSetType);
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+
+      // Cap the result at a single flow.
+      resp = getResponse(client, URI.create(base + "flows/cluster1?limit=1"));
+      entities = resp.getEntity(flowSetType);
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @After
+  public void stop() throws Exception {
+    if (server != null) { // may be null if init() failed before creating it
+      server.stop();
+      server = null; // forget the stopped server
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4552598d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/resources/log4j.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/resources/log4j.properties
new file mode 100644
index 0000000..81a3f6a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/resources/log4j.properties
@@ -0,0 +1,19 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n