You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2015/09/22 22:47:31 UTC

[1/2] hadoop git commit: YARN-4074. [timeline reader] implement support for querying for flows and flow runs (sjlee via vrushali)

Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 4b37985e6 -> 2e7e0f0bb


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
index c514c20..889ae19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -20,13 +20,8 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
 
 import java.io.IOException;
 import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
 import java.util.Set;
-import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,47 +29,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-
-import com.google.common.base.Preconditions;
 
 public class HBaseTimelineReaderImpl
     extends AbstractService implements TimelineReader {
 
   private static final Log LOG = LogFactory
       .getLog(HBaseTimelineReaderImpl.class);
-  private static final long DEFAULT_BEGIN_TIME = 0L;
-  private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
 
   private Configuration hbaseConf = null;
   private Connection conn;
-  private EntityTable entityTable;
-  private AppToFlowTable appToFlowTable;
-  private ApplicationTable applicationTable;
 
   public HBaseTimelineReaderImpl() {
     super(HBaseTimelineReaderImpl.class.getName());
@@ -85,9 +50,6 @@ public class HBaseTimelineReaderImpl
     super.serviceInit(conf);
     hbaseConf = HBaseConfiguration.create(conf);
     conn = ConnectionFactory.createConnection(hbaseConf);
-    entityTable = new EntityTable();
-    appToFlowTable = new AppToFlowTable();
-    applicationTable = new ApplicationTable();
   }
 
   @Override
@@ -104,35 +66,10 @@ public class HBaseTimelineReaderImpl
       String flowId, Long flowRunId, String appId, String entityType,
       String entityId, EnumSet<Field> fieldsToRetrieve)
       throws IOException {
-    validateParams(userId, clusterId, appId, entityType, entityId, true);
-    // In reality both should be null or neither should be null
-    if (flowId == null || flowRunId == null) {
-      FlowContext context = lookupFlowContext(clusterId, appId);
-      flowId = context.flowId;
-      flowRunId = context.flowRunId;
-    }
-    if (fieldsToRetrieve == null) {
-      fieldsToRetrieve = EnumSet.noneOf(Field.class);
-    }
-
-    boolean isApplication = isApplicationEntity(entityType);
-    byte[] rowKey = isApplication ?
-        ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
-            appId) :
-        EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId,
-            entityType, entityId);
-    Get get = new Get(rowKey);
-    get.setMaxVersions(Integer.MAX_VALUE);
-    Result result = isApplication ?
-        applicationTable.getResult(hbaseConf, conn, get) :
-        entityTable.getResult(hbaseConf, conn, get);
-    return parseEntity(result, fieldsToRetrieve,
-        false, DEFAULT_BEGIN_TIME, DEFAULT_END_TIME, false, DEFAULT_BEGIN_TIME,
-        DEFAULT_END_TIME, null, null, null, null, null, null, isApplication);
-  }
-
-  private static boolean isApplicationEntity(String entityType) {
-    return TimelineEntityType.YARN_APPLICATION.toString().equals(entityType);
+    TimelineEntityReader reader =
+        TimelineEntityReaderFactory.createSingleEntityReader(userId, clusterId,
+            flowId, flowRunId, appId, entityType, entityId, fieldsToRetrieve);
+    return reader.readEntity(hbaseConf, conn);
   }
 
   @Override
@@ -144,361 +81,12 @@ public class HBaseTimelineReaderImpl
       Map<String, Object> infoFilters, Map<String, String> configFilters,
       Set<String> metricFilters, Set<String> eventFilters,
       EnumSet<Field> fieldsToRetrieve) throws IOException {
-    validateParams(userId, clusterId, appId, entityType, null, false);
-    // In reality both should be null or neither should be null
-    if (flowId == null || flowRunId == null) {
-      FlowContext context = lookupFlowContext(clusterId, appId);
-      flowId = context.flowId;
-      flowRunId = context.flowRunId;
-    }
-    if (limit == null) {
-      limit = TimelineReader.DEFAULT_LIMIT;
-    }
-    if (createdTimeBegin == null) {
-      createdTimeBegin = DEFAULT_BEGIN_TIME;
-    }
-    if (createdTimeEnd == null) {
-      createdTimeEnd = DEFAULT_END_TIME;
-    }
-    if (modifiedTimeBegin == null) {
-      modifiedTimeBegin = DEFAULT_BEGIN_TIME;
-    }
-    if (modifiedTimeEnd == null) {
-      modifiedTimeEnd = DEFAULT_END_TIME;
-    }
-    if (fieldsToRetrieve == null) {
-      fieldsToRetrieve = EnumSet.noneOf(Field.class);
-    }
-
-    NavigableSet<TimelineEntity> entities = new TreeSet<>();
-    boolean isApplication = isApplicationEntity(entityType);
-    if (isApplication) {
-      // If getEntities() is called for an application, there can be at most
-      // one entity. If the entity passes the filter, it is returned. Otherwise,
-      // an empty set is returned.
-      byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId,
-          flowRunId, appId);
-      Get get = new Get(rowKey);
-      get.setMaxVersions(Integer.MAX_VALUE);
-      Result result = applicationTable.getResult(hbaseConf, conn, get);
-      TimelineEntity entity = parseEntity(result, fieldsToRetrieve,
-          true, createdTimeBegin, createdTimeEnd, true, modifiedTimeBegin,
-          modifiedTimeEnd, isRelatedTo, relatesTo, infoFilters, configFilters,
-          eventFilters, metricFilters, isApplication);
-      if (entity != null) {
-        entities.add(entity);
-      }
-    } else {
-      // Scan through part of the table to find the entities belong to one app
-      // and one type
-      Scan scan = new Scan();
-      scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
-          clusterId, userId, flowId, flowRunId, appId, entityType));
-      scan.setMaxVersions(Integer.MAX_VALUE);
-      ResultScanner scanner =
-          entityTable.getResultScanner(hbaseConf, conn, scan);
-      for (Result result : scanner) {
-        TimelineEntity entity = parseEntity(result, fieldsToRetrieve,
-            true, createdTimeBegin, createdTimeEnd,
-            true, modifiedTimeBegin, modifiedTimeEnd,
-            isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters,
-            metricFilters, isApplication);
-        if (entity == null) {
-          continue;
-        }
-        if (entities.size() > limit) {
-          entities.pollLast();
-        }
-        entities.add(entity);
-      }
-    }
-    return entities;
-  }
-
-  private FlowContext lookupFlowContext(String clusterId, String appId)
-      throws IOException {
-    byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
-    Get get = new Get(rowKey);
-    Result result = appToFlowTable.getResult(hbaseConf, conn, get);
-    if (result != null && !result.isEmpty()) {
-      return new FlowContext(
-          AppToFlowColumn.FLOW_ID.readResult(result).toString(),
-          ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
-    } else {
-       throw new IOException(
-           "Unable to find the context flow ID and flow run ID for clusterId=" +
-           clusterId + ", appId=" + appId);
-    }
-  }
-
-  private static class FlowContext {
-    private String flowId;
-    private Long flowRunId;
-    public FlowContext(String flowId, Long flowRunId) {
-      this.flowId = flowId;
-      this.flowRunId = flowRunId;
-    }
-  }
-
-  private static void validateParams(String userId, String clusterId,
-      String appId, String entityType, String entityId, boolean checkEntityId) {
-    Preconditions.checkNotNull(userId, "userId shouldn't be null");
-    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
-    Preconditions.checkNotNull(appId, "appId shouldn't be null");
-    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
-    if (checkEntityId) {
-      Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
-    }
-  }
-
-  private static TimelineEntity parseEntity(
-      Result result, EnumSet<Field> fieldsToRetrieve,
-      boolean checkCreatedTime, long createdTimeBegin, long createdTimeEnd,
-      boolean checkModifiedTime, long modifiedTimeBegin, long modifiedTimeEnd,
-      Map<String, Set<String>> isRelatedTo, Map<String, Set<String>> relatesTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> eventFilters, Set<String> metricFilters,
-      boolean isApplication)
-          throws IOException {
-    if (result == null || result.isEmpty()) {
-      return null;
-    }
-    TimelineEntity entity = new TimelineEntity();
-    String entityType = isApplication ?
-        TimelineEntityType.YARN_APPLICATION.toString() :
-        EntityColumn.TYPE.readResult(result).toString();
-    entity.setType(entityType);
-    String entityId = isApplication ?
-        ApplicationColumn.ID.readResult(result).toString() :
-        EntityColumn.ID.readResult(result).toString();
-    entity.setId(entityId);
-
-    // fetch created time
-    Number createdTime = isApplication ?
-        (Number)ApplicationColumn.CREATED_TIME.readResult(result) :
-        (Number)EntityColumn.CREATED_TIME.readResult(result);
-    entity.setCreatedTime(createdTime.longValue());
-    if (checkCreatedTime && (entity.getCreatedTime() < createdTimeBegin ||
-        entity.getCreatedTime() > createdTimeEnd)) {
-      return null;
-    }
-
-    // fetch modified time
-    Number modifiedTime = isApplication ?
-        (Number)ApplicationColumn.MODIFIED_TIME.readResult(result) :
-        (Number)EntityColumn.MODIFIED_TIME.readResult(result);
-    entity.setModifiedTime(modifiedTime.longValue());
-    if (checkModifiedTime && (entity.getModifiedTime() < modifiedTimeBegin ||
-        entity.getModifiedTime() > modifiedTimeEnd)) {
-      return null;
-    }
-
-    // fetch is related to entities
-    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
-      if (isApplication) {
-        readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
-            true);
-      } else {
-        readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO,
-            true);
-      }
-      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
-          entity.getIsRelatedToEntities(), isRelatedTo)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
-        entity.getIsRelatedToEntities().clear();
-      }
-    }
-
-    // fetch relates to entities
-    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
-      if (isApplication) {
-        readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
-            false);
-      } else {
-        readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
-      }
-      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
-          entity.getRelatesToEntities(), relatesTo)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
-        entity.getRelatesToEntities().clear();
-      }
-    }
-
-    // fetch info
-    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
-      if (isApplication) {
-        readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
-      } else {
-        readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
-      }
-      if (checkInfo &&
-          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.INFO)) {
-        entity.getInfo().clear();
-      }
-    }
-
-    // fetch configs
-    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
-      if (isApplication) {
-        readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
-      } else {
-        readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
-      }
-      if (checkConfigs && !TimelineReaderUtils.matchFilters(
-          entity.getConfigs(), configFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.CONFIGS)) {
-        entity.getConfigs().clear();
-      }
-    }
-
-    // fetch events
-    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
-      readEvents(entity, result, isApplication);
-      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
-          entity.getEvents(), eventFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.EVENTS)) {
-        entity.getEvents().clear();
-      }
-    }
-
-    // fetch metrics
-    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
-      readMetrics(entity, result, isApplication);
-      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
-          entity.getMetrics(), metricFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.METRICS)) {
-        entity.getMetrics().clear();
-      }
-    }
-    return entity;
-  }
-
-  private static <T> void readRelationship(
-      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
-      boolean isRelatedTo) throws IOException {
-    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
-    Map<String, Object> columns = prefix.readResults(result);
-    for (Map.Entry<String, Object> column : columns.entrySet()) {
-      for (String id : Separator.VALUES.splitEncoded(
-          column.getValue().toString())) {
-        if (isRelatedTo) {
-          entity.addIsRelatedToEntity(column.getKey(), id);
-        } else {
-          entity.addRelatesToEntity(column.getKey(), id);
-        }
-      }
-    }
-  }
-
-  private static <T> void readKeyValuePairs(
-      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
-      boolean isConfig) throws IOException {
-    // info and configuration are of type Map<String, Object or String>
-    Map<String, Object> columns = prefix.readResults(result);
-    if (isConfig) {
-      for (Map.Entry<String, Object> column : columns.entrySet()) {
-        entity.addConfig(column.getKey(), column.getValue().toString());
-      }
-    } else {
-      entity.addInfo(columns);
-    }
-  }
-
-  /**
-   * Read events from the entity table or the application table. The column name
-   * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
-   * if there is no info associated with the event.
-   *
-   * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
-   * schema description.
-   */
-  private static void readEvents(TimelineEntity entity, Result result,
-      boolean isApplication) throws IOException {
-    Map<String, TimelineEvent> eventsMap = new HashMap<>();
-    Map<?, Object> eventsResult = isApplication ?
-        ApplicationColumnPrefix.EVENT.
-            readResultsHavingCompoundColumnQualifiers(result) :
-        EntityColumnPrefix.EVENT.
-            readResultsHavingCompoundColumnQualifiers(result);
-    for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
-      byte[][] karr = (byte[][])eventResult.getKey();
-      // the column name is of the form "eventId=timestamp=infoKey"
-      if (karr.length == 3) {
-        String id = Bytes.toString(karr[0]);
-        long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1]));
-        String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
-        TimelineEvent event = eventsMap.get(key);
-        if (event == null) {
-          event = new TimelineEvent();
-          event.setId(id);
-          event.setTimestamp(ts);
-          eventsMap.put(key, event);
-        }
-        // handle empty info
-        String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
-        if (infoKey != null) {
-          event.addInfo(infoKey, eventResult.getValue());
-        }
-      } else {
-        LOG.warn("incorrectly formatted column name: it will be discarded");
-        continue;
-      }
-    }
-    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
-    entity.addEvents(eventsSet);
-  }
-
-  private static void readMetrics(TimelineEntity entity, Result result,
-      boolean isApplication) throws IOException {
-    NavigableMap<String, NavigableMap<Long, Number>> metricsResult;
-    if (isApplication) {
-      metricsResult =
-          ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
-    } else {
-      metricsResult =
-          EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
-    }
-    for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
-        metricsResult.entrySet()) {
-      TimelineMetric metric = new TimelineMetric();
-      metric.setId(metricResult.getKey());
-      // Simply assume that if the value set contains more than 1 elements, the
-      // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric
-      metric.setType(metricResult.getValue().size() > 1 ?
-          TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
-      metric.addValues(metricResult.getValue());
-      entity.addMetric(metric);
-    }
+    TimelineEntityReader reader =
+        TimelineEntityReaderFactory.createMultipleEntitiesReader(userId,
+            clusterId, flowId, flowRunId, appId, entityType, limit,
+            createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
+            modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
+            metricFilters, eventFilters, fieldsToRetrieve);
+    return reader.readEntities(hbaseConf, conn);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
new file mode 100644
index 0000000..0d1134c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+
+/**
+ * The base class for reading and deserializing timeline entities from the
+ * HBase storage. Different subclasses can be defined for different types of
+ * entities that are being requested.
+ */
+abstract class TimelineEntityReader {
+  protected final boolean singleEntityRead;
+
+  protected String userId;
+  protected String clusterId;
+  protected String flowId;
+  protected Long flowRunId;
+  protected String appId;
+  protected String entityType;
+  protected EnumSet<Field> fieldsToRetrieve;
+  // used only in the single-entity read mode
+  protected String entityId;
+  // used only in the multiple-entity read mode
+  protected Long limit;
+  protected Long createdTimeBegin;
+  protected Long createdTimeEnd;
+  protected Long modifiedTimeBegin;
+  protected Long modifiedTimeEnd;
+  protected Map<String, Set<String>> relatesTo;
+  protected Map<String, Set<String>> isRelatedTo;
+  protected Map<String, Object> infoFilters;
+  protected Map<String, String> configFilters;
+  protected Set<String> metricFilters;
+  protected Set<String> eventFilters;
+
+  /**
+   * Main table the entity reader uses.
+   */
+  protected BaseTable<?> table;
+
+  /**
+   * Instantiates a reader for multiple-entity reads.
+   */
+  protected TimelineEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    this.singleEntityRead = false;
+    this.userId = userId;
+    this.clusterId = clusterId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+    this.entityType = entityType;
+    this.fieldsToRetrieve = fieldsToRetrieve;
+    this.limit = limit;
+    this.createdTimeBegin = createdTimeBegin;
+    this.createdTimeEnd = createdTimeEnd;
+    this.modifiedTimeBegin = modifiedTimeBegin;
+    this.modifiedTimeEnd = modifiedTimeEnd;
+    this.relatesTo = relatesTo;
+    this.isRelatedTo = isRelatedTo;
+    this.infoFilters = infoFilters;
+    this.configFilters = configFilters;
+    this.metricFilters = metricFilters;
+    this.eventFilters = eventFilters;
+
+    this.table = getTable();
+  }
+
+  /**
+   * Instantiates a reader for single-entity reads.
+   */
+  protected TimelineEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve) {
+    this.singleEntityRead = true;
+    this.userId = userId;
+    this.clusterId = clusterId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+    this.entityType = entityType;
+    this.fieldsToRetrieve = fieldsToRetrieve;
+    this.entityId = entityId;
+
+    this.table = getTable();
+  }
+
+  /**
+   * Reads and deserializes a single timeline entity from the HBase storage.
+   */
+  public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    validateParams();
+    augmentParams(hbaseConf, conn);
+
+    Result result = getResult(hbaseConf, conn);
+    return parseEntity(result);
+  }
+
+  /**
+   * Reads and deserializes a set of timeline entities from the HBase storage.
+   * It goes through all the results available, and returns at most the number
+   * of entities specified in the limit, in the entities' natural sort order.
+   */
+  public Set<TimelineEntity> readEntities(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    validateParams();
+    augmentParams(hbaseConf, conn);
+
+    NavigableSet<TimelineEntity> entities = new TreeSet<>();
+    Iterable<Result> results = getResults(hbaseConf, conn);
+    for (Result result : results) {
+      TimelineEntity entity = parseEntity(result);
+      if (entity == null) {
+        continue;
+      }
+      entities.add(entity);
+      if (entities.size() > limit) {
+        entities.pollLast();
+      }
+    }
+    return entities;
+  }
+
+  /**
+   * Returns the main table to be used by the entity reader.
+   */
+  protected abstract BaseTable<?> getTable();
+
+  /**
+   * Validates the required parameters to read the entities.
+   */
+  protected abstract void validateParams();
+
+  /**
+   * Sets certain parameters to defaults if the values are not provided.
+   */
+  protected abstract void augmentParams(Configuration hbaseConf,
+      Connection conn) throws IOException;
+
+  /**
+   * Fetches a {@link Result} instance for a single-entity read.
+   *
+   * @return the {@link Result} instance or null if no such record is found.
+   */
+  protected abstract Result getResult(Configuration hbaseConf, Connection conn)
+      throws IOException;
+
+  /**
+   * Fetches an iterable of {@link Result} instances for a multi-entity read.
+   */
+  protected abstract Iterable<Result> getResults(Configuration hbaseConf,
+      Connection conn) throws IOException;
+
+  /**
+   * Given a {@link Result} instance, deserializes and creates a
+   * {@link TimelineEntity}.
+   *
+   * @return the {@link TimelineEntity} instance, or null if the {@link Result}
+   * is null or empty.
+   */
+  protected abstract TimelineEntity parseEntity(Result result)
+      throws IOException;
+
+  /**
+   * Helper method for reading and deserializing {@link TimelineMetric} objects
+   * using the specified column prefix. The timeline metrics then are added to
+   * the given timeline entity.
+   */
+  protected void readMetrics(TimelineEntity entity, Result result,
+      ColumnPrefix<?> columnPrefix) throws IOException {
+    NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+        columnPrefix.readResultsWithTimestamps(result);
+    for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
+        metricsResult.entrySet()) {
+      TimelineMetric metric = new TimelineMetric();
+      metric.setId(metricResult.getKey());
+      // Simply assume that if the value set contains more than one element, the
+      // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric
+      metric.setType(metricResult.getValue().size() > 1 ?
+          TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
+      metric.addValues(metricResult.getValue());
+      entity.addMetric(metric);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
new file mode 100644
index 0000000..4fdef40
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+
+/**
+ * Static factory for the {@link TimelineEntityReader} that serves a query.
+ */
+class TimelineEntityReaderFactory {
+  /**
+   * Creates a reader that fetches the single entity identified by the given
+   * context, entity type and entity id.
+   */
+  public static TimelineEntityReader createSingleEntityReader(String userId,
+      String clusterId, String flowId, Long flowRunId, String appId,
+      String entityType, String entityId, EnumSet<Field> fieldsToRetrieve) {
+    // application, flow run and flow activity entities each have their own
+    // reader; any other type is served by the generic entity reader
+    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
+      return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
+      return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
+      return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    } else {
+      // anything else is read as a generic entity
+      return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    }
+  }
+
+  /**
+   * Creates a reader that fetches a set of entities of the given type,
+   * narrowed by the supplied limit, time windows and filters.
+   */
+  public static TimelineEntityReader createMultipleEntitiesReader(String userId,
+      String clusterId, String flowId, Long flowRunId, String appId,
+      String entityType, Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    // application, flow run and flow activity entities each have their own
+    // reader; any other type is served by the generic entity reader
+    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
+      return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
+      return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
+      return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    } else {
+      // anything else is read as a generic entity
+      return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
index 5f3868b..e3b5a87 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
@@ -19,14 +19,46 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.application;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 
 /**
  * Represents a rowkey for the application table.
  */
 public class ApplicationRowKey {
-  // TODO: more methods are needed for this class.
+  private final String clusterId;
+  private final String userId;
+  private final String flowId;
+  private final long flowRunId;
+  private final String appId;
 
-  // TODO: API needs to be cleaned up.
+  public ApplicationRowKey(String clusterId, String userId, String flowId,
+      long flowRunId, String appId) {
+    this.clusterId = clusterId;
+    this.userId = userId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getFlowId() {
+    return flowId;
+  }
+
+  public long getFlowRunId() {
+    return flowRunId;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
 
   /**
    * Constructs a row key for the application table as follows:
@@ -46,22 +78,32 @@ public class ApplicationRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(ApplicationRowKey.invert(flowRunId));
+    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
     byte[] third = Bytes.toBytes(appId);
     return Separator.QUALIFIERS.join(first, second, third);
   }
 
   /**
-   * Converts a timestamp into its inverse timestamp to be used in (row) keys
-   * where we want to have the most recent timestamp in the top of the table
-   * (scans start at the most recent timestamp first).
-   *
-   * @param key value to be inverted so that the latest version will be first in
-   *          a scan.
-   * @return inverted long
+   * Given the raw row key as bytes, returns the row key as an object.
    */
-  public static long invert(Long key) {
-    return Long.MAX_VALUE - key;
-  }
+  public static ApplicationRowKey parseRowKey(byte[] rowKey) {
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+    if (rowKeyComponents.length < 5) {
+      throw new IllegalArgumentException("the row key is not valid for " +
+          "an application");
+    }
 
+    String clusterId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
+    String userId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
+    String flowId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
+    long flowRunId =
+        TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
+    String appId =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4]));
+    return new ApplicationRowKey(clusterId, userId, flowId, flowRunId, appId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
index ad4fec6..ca88056 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
@@ -24,6 +24,22 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
  * Represents a rowkey for the app_flow table.
  */
 public class AppToFlowRowKey {
+  private final String clusterId;
+  private final String appId;
+
+  public AppToFlowRowKey(String clusterId, String appId) {
+    this.clusterId = clusterId;
+    this.appId = appId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
   /**
    * Constructs a row key prefix for the app_flow table as follows:
    * {@code clusterId!AppId}
@@ -36,4 +52,19 @@ public class AppToFlowRowKey {
     return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, appId));
   }
 
+  /**
+   * Parses a raw app_flow row key into its cluster id and app id. Both parts
+   * are decoded, mirroring the joinEncoded call that builds the key.
+   * @throws IllegalArgumentException if the key has fewer than 2 components
+   */
+  public static AppToFlowRowKey parseRowKey(byte[] rowKey) {
+    byte[][] parts = Separator.QUALIFIERS.split(rowKey);
+    if (parts.length < 2) {
+      throw new IllegalArgumentException("the row key is not valid for " +
+          "the app-to-flow table");
+    }
+    String clusterId = Separator.QUALIFIERS.decode(Bytes.toString(parts[0]));
+    String appId = Separator.QUALIFIERS.decode(Bytes.toString(parts[1]));
+    return new AppToFlowRowKey(clusterId, appId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
index abba79a..9545438 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
@@ -31,7 +31,8 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 
 /**
- * Implements behavior common to tables used in the timeline service storage.
+ * Implements behavior common to tables used in the timeline service storage. It
+ * is thread-safe, and can be used by multiple threads concurrently.
  *
  * @param <T> reference to the table instance class itself for type safety.
  */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 9a72be0..6a534ed73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
 
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 
@@ -26,9 +25,52 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWrit
  * Represents a rowkey for the entity table.
  */
 public class EntityRowKey {
-  // TODO: more methods are needed for this class.
+  private final String clusterId;
+  private final String userId;
+  private final String flowId;
+  private final long flowRunId;
+  private final String appId;
+  private final String entityType;
+  private final String entityId;
 
-  // TODO: API needs to be cleaned up.
+  public EntityRowKey(String clusterId, String userId, String flowId,
+      long flowRunId, String appId, String entityType, String entityId) {
+    this.clusterId = clusterId;
+    this.userId = userId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+    this.entityType = entityType;
+    this.entityId = entityId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getFlowId() {
+    return flowId;
+  }
+
+  public long getFlowRunId() {
+    return flowRunId;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public String getEntityType() {
+    return entityType;
+  }
+
+  public String getEntityId() {
+    return entityId;
+  }
 
   /**
    * Constructs a row key prefix for the entity table as follows:
@@ -106,4 +148,32 @@ public class EntityRowKey {
     return Separator.QUALIFIERS.join(first, second, third);
   }
 
+  /**
+   * Given the raw row key as bytes, returns the row key as an object. The
+   * components are read in the order user id, cluster id, flow id, flow run
+   * id, app id, entity type and entity id.
+   *
+   * @param rowKey raw row key bytes of an entity table row
+   * @return the parsed row key
+   * @throws IllegalArgumentException if fewer than 7 components are found
+   */
+  public static EntityRowKey parseRowKey(byte[] rowKey) {
+    byte[][] parts = Separator.QUALIFIERS.split(rowKey);
+    if (parts.length < 7) {
+      throw new IllegalArgumentException("the row key is not valid for " +
+          "an entity");
+    }
+
+    // note: the user id is the first component here, ahead of the cluster id
+    String userId = Separator.QUALIFIERS.decode(Bytes.toString(parts[0]));
+    String clusterId = Separator.QUALIFIERS.decode(Bytes.toString(parts[1]));
+    String flowId = Separator.QUALIFIERS.decode(Bytes.toString(parts[2]));
+    // the run id is stored inverted; inverting it again restores the value
+    long flowRunId = TimelineWriterUtils.invert(Bytes.toLong(parts[3]));
+    String appId = Separator.QUALIFIERS.decode(Bytes.toString(parts[4]));
+    String entityType = Separator.QUALIFIERS.decode(Bytes.toString(parts[5]));
+    String entityId = Separator.QUALIFIERS.decode(Bytes.toString(parts[6]));
+    return new EntityRowKey(clusterId, userId, flowId, flowRunId, appId,
+        entityType, entityId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index 19e4e83..18ca599 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -55,6 +55,10 @@ public class FlowActivityRowKey {
     return flowId;
   }
 
+  public static byte[] getRowKeyPrefix(String clusterId) {
+    return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, ""));
+  }
+
   /**
    * Constructs a row key for the flow activity table as follows:
    * {@code clusterId!dayTimestamp!user!flowId}
@@ -65,7 +69,8 @@ public class FlowActivityRowKey {
    * @param flowId
    * @return byte array with the row key prefix
    */
-  public static byte[] getRowKey(String clusterId, String userId, String flowId) {
+  public static byte[] getRowKey(String clusterId, String userId,
+      String flowId) {
     long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
         .currentTimeMillis());
     return getRowKey(clusterId, dayTs, userId, flowId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
index e133241..880d481 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -25,7 +25,34 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWrit
  * Represents a rowkey for the flow run table.
  */
 public class FlowRunRowKey {
-  // TODO: more methods are needed for this class like parse row key
+  private final String clusterId;
+  private final String userId;
+  private final String flowId;
+  private final long flowRunId;
+
+  public FlowRunRowKey(String clusterId, String userId, String flowId,
+      long flowRunId) {
+    this.clusterId = clusterId;
+    this.userId = userId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getFlowId() {
+    return flowId;
+  }
+
+  public long getFlowRunId() {
+    return flowRunId;
+  }
 
   /**
    * Constructs a row key for the entity table as follows: {
@@ -47,4 +74,25 @@ public class FlowRunRowKey {
     return Separator.QUALIFIERS.join(first, second);
   }
 
+  /**
+   * Given the raw row key as bytes, returns the row key as an object.
+   *
+   * @param rowKey raw row key bytes of a flow run table row
+   * @return the parsed row key
+   * @throws IllegalArgumentException if fewer than 4 components are found
+   */
+  public static FlowRunRowKey parseRowKey(byte[] rowKey) {
+    byte[][] parts = Separator.QUALIFIERS.split(rowKey);
+    if (parts.length < 4) {
+      throw new IllegalArgumentException("the row key is not valid for " +
+          "a flow run");
+    }
+
+    String clusterId = Separator.QUALIFIERS.decode(Bytes.toString(parts[0]));
+    String userId = Separator.QUALIFIERS.decode(Bytes.toString(parts[1]));
+    String flowId = Separator.QUALIFIERS.decode(Bytes.toString(parts[2]));
+    // presumably the run id was inverted when the key was built - undo that
+    long flowRunId = TimelineWriterUtils.invert(Bytes.toLong(parts[3]));
+    return new FlowRunRowKey(clusterId, userId, flowId, flowRunId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index a1948aa..651bb3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -18,6 +18,15 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
@@ -33,15 +42,6 @@ import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
 /**
  * Invoked via the coprocessor when a Get or a Scan is issued for flow run
  * table. Looks through the list of cells per row, checks their tags and does

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 3962341..01920b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -508,32 +508,28 @@ public class TestHBaseTimelineStorage {
   private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
       String flow, long runid, String appName, TimelineEntity te) {
 
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
-
-    assertTrue(rowKeyComponents.length == 7);
-    assertEquals(user, Bytes.toString(rowKeyComponents[0]));
-    assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
-    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(TimelineWriterUtils.invert(runid),
-        Bytes.toLong(rowKeyComponents[3]));
-    assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
-    assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
-    assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
+    EntityRowKey key = EntityRowKey.parseRowKey(rowKey);
+
+    assertEquals(user, key.getUserId());
+    assertEquals(cluster, key.getClusterId());
+    assertEquals(flow, key.getFlowId());
+    assertEquals(runid, key.getFlowRunId());
+    assertEquals(appName, key.getAppId());
+    assertEquals(te.getType(), key.getEntityType());
+    assertEquals(te.getId(), key.getEntityId());
     return true;
   }
 
   private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster,
       String user, String flow, long runid, String appName) {
 
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+    ApplicationRowKey key = ApplicationRowKey.parseRowKey(rowKey);
 
-    assertTrue(rowKeyComponents.length == 5);
-    assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
-    assertEquals(user, Bytes.toString(rowKeyComponents[1]));
-    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(TimelineWriterUtils.invert(runid),
-        Bytes.toLong(rowKeyComponents[3]));
-    assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
+    assertEquals(cluster, key.getClusterId());
+    assertEquals(user, key.getUserId());
+    assertEquals(flow, key.getFlowId());
+    assertEquals(runid, key.getFlowRunId());
+    assertEquals(appName, key.getAppId());
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
index f8331fa..d18613a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
@@ -45,7 +45,7 @@ class TestFlowDataGenerator {
     String type = TimelineEntityType.YARN_APPLICATION.toString();
     entity.setId(id);
     entity.setType(type);
-    Long cTime = 1425016501000L;
+    long cTime = 1425016501000L;
     entity.setCreatedTime(cTime);
 
     // add metrics
@@ -54,8 +54,8 @@ class TestFlowDataGenerator {
     m1.setId(metric1);
     Map<Long, Number> metricValues = new HashMap<Long, Number>();
     long ts = System.currentTimeMillis();
-    metricValues.put(ts - 100000, 2);
-    metricValues.put(ts - 80000, 40);
+    metricValues.put(ts - 100000, 2L);
+    metricValues.put(ts - 80000, 40L);
     m1.setType(Type.TIME_SERIES);
     m1.setValues(metricValues);
     metrics.add(m1);
@@ -64,8 +64,8 @@ class TestFlowDataGenerator {
     m2.setId(metric2);
     metricValues = new HashMap<Long, Number>();
     ts = System.currentTimeMillis();
-    metricValues.put(ts - 100000, 31);
-    metricValues.put(ts - 80000, 57);
+    metricValues.put(ts - 100000, 31L);
+    metricValues.put(ts - 80000, 57L);
     m2.setType(Type.TIME_SERIES);
     m2.setValues(metricValues);
     metrics.add(m2);
@@ -80,7 +80,7 @@ class TestFlowDataGenerator {
     String type = TimelineEntityType.YARN_APPLICATION.toString();
     entity.setId(id);
     entity.setType(type);
-    Long cTime = 1425016501000L;
+    long cTime = 1425016501000L;
     entity.setCreatedTime(cTime);
     // add metrics
     Set<TimelineMetric> metrics = new HashSet<>();
@@ -103,8 +103,8 @@ class TestFlowDataGenerator {
     String type = TimelineEntityType.YARN_APPLICATION.toString();
     entity.setId(id);
     entity.setType(type);
-    Long cTime = 20000000000000L;
-    Long mTime = 1425026901000L;
+    long cTime = 20000000000000L;
+    long mTime = 1425026901000L;
     entity.setCreatedTime(cTime);
     entity.setModifiedTime(mTime);
     // add metrics
@@ -113,10 +113,10 @@ class TestFlowDataGenerator {
     m1.setId(metric1);
     Map<Long, Number> metricValues = new HashMap<Long, Number>();
     long ts = System.currentTimeMillis();
-    metricValues.put(ts - 120000, 100000000);
-    metricValues.put(ts - 100000, 200000000);
-    metricValues.put(ts - 80000, 300000000);
-    metricValues.put(ts - 60000, 400000000);
+    metricValues.put(ts - 120000, 100000000L);
+    metricValues.put(ts - 100000, 200000000L);
+    metricValues.put(ts - 80000, 300000000L);
+    metricValues.put(ts - 60000, 400000000L);
     metricValues.put(ts - 40000, 50000000000L);
     metricValues.put(ts - 20000, 60000000000L);
     m1.setType(Type.TIME_SERIES);
@@ -126,7 +126,7 @@ class TestFlowDataGenerator {
 
     TimelineEvent event = new TimelineEvent();
     event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    Long expTs = 1436512802000L;
+    long expTs = 1436512802000L;
     event.setTimestamp(expTs);
     String expKey = "foo_event";
     Object expVal = "test";
@@ -142,9 +142,9 @@ class TestFlowDataGenerator {
     return entity;
   }
 
-  static TimelineEntity getEntityGreaterStartTime() {
+  static TimelineEntity getEntityGreaterStartTime(long startTs) {
     TimelineEntity entity = new TimelineEntity();
-    entity.setCreatedTime(30000000000000L);
+    entity.setCreatedTime(startTs);
     entity.setId("flowRunHello with greater start time");
     String type = TimelineEntityType.YARN_APPLICATION.toString();
     entity.setType(type);
@@ -173,14 +173,13 @@ class TestFlowDataGenerator {
     return entity;
   }
 
-  static TimelineEntity getEntityMinStartTime() {
+  static TimelineEntity getEntityMinStartTime(long startTs) {
     TimelineEntity entity = new TimelineEntity();
     String id = "flowRunHelloMInStartTime";
     String type = TimelineEntityType.YARN_APPLICATION.toString();
     entity.setId(id);
     entity.setType(type);
-    Long cTime = 10000000000000L;
-    entity.setCreatedTime(cTime);
+    entity.setCreatedTime(startTs);
     TimelineEvent event = new TimelineEvent();
     event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
     event.setTimestamp(System.currentTimeMillis());
@@ -195,12 +194,12 @@ class TestFlowDataGenerator {
     String type = TimelineEntityType.YARN_APPLICATION.toString();
     entity.setId(id);
     entity.setType(type);
-    Long cTime = 1425016501000L;
+    long cTime = 1425016501000L;
     entity.setCreatedTime(cTime);
 
     TimelineEvent event = new TimelineEvent();
     event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    Long expTs = 1436512802000L;
+    long expTs = 1436512802000L;
     event.setTimestamp(expTs);
     String expKey = "foo_event";
     Object expVal = "test";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
index b4a0c74..6bdec6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -21,19 +21,16 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
@@ -42,26 +39,17 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -119,11 +107,13 @@ public class TestHBaseStorageFlowActivity {
     String user = "testWriteFlowRunMinMaxToHBase_user1";
     String flow = "testing_flowRun_flow_name";
     String flowVersion = "CF7022C10F1354";
-    Long runid = 1002345678919L;
+    long runid = 1002345678919L;
     String appName = "application_100000000000_1111";
+    long minStartTs = 10000000000000L;
+    long greaterStartTs = 30000000000000L;
     long endTs = 1439750690000L;
     TimelineEntity entityMinStartTime = TestFlowDataGenerator
-        .getEntityMinStartTime();
+        .getEntityMinStartTime(minStartTs);
 
     try {
       hbi = new HBaseTimelineWriterImpl(c1);
@@ -146,7 +136,7 @@ public class TestHBaseStorageFlowActivity {
 
       // writer another entity with greater start time
       TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
-          .getEntityGreaterStartTime();
+          .getEntityGreaterStartTime(greaterStartTs);
       te = new TimelineEntities();
       te.addEntity(entityGreaterStartTime);
       appName = "application_1000000000000000_2222";
@@ -181,6 +171,31 @@ public class TestHBaseStorageFlowActivity {
     assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
     assertEquals(1, values.size());
     checkFlowActivityRunId(runid, flowVersion, values);
+
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      // get the flow activity entity
+      Set<TimelineEntity> entities =
+          hbr.getEntities(null, cluster, null, null, null,
+              TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
+              null, null, null, null, null, null, null, null, null);
+      assertEquals(1, entities.size());
+      for (TimelineEntity e : entities) {
+        FlowActivityEntity flowActivity = (FlowActivityEntity)e;
+        assertEquals(cluster, flowActivity.getCluster());
+        assertEquals(user, flowActivity.getUser());
+        assertEquals(flow, flowActivity.getFlowName());
+        assertEquals(dayTs, flowActivity.getDate().getTime());
+        Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
+        assertEquals(1, flowRuns.size());
+      }
+    } finally {
+      hbr.close();
+    }
   }
 
   /**
@@ -193,7 +208,7 @@ public class TestHBaseStorageFlowActivity {
     String user = "testWriteFlowActivityOneFlow_user1";
     String flow = "flow_activity_test_flow_name";
     String flowVersion = "A122110F135BC4";
-    Long runid = 1001111178919L;
+    long runid = 1001111178919L;
 
     TimelineEntities te = new TimelineEntities();
     TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
@@ -212,10 +227,35 @@ public class TestHBaseStorageFlowActivity {
     }
     // check flow activity
     checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1);
+
+    // use the reader to verify the data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+
+      Set<TimelineEntity> entities =
+          hbr.getEntities(user, cluster, flow, null, null,
+              TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
+              null, null, null, null, null, null, null, null, null);
+      assertEquals(1, entities.size());
+      for (TimelineEntity e : entities) {
+        FlowActivityEntity entity = (FlowActivityEntity)e;
+        NavigableSet<FlowRunEntity> flowRuns = entity.getFlowRuns();
+        assertEquals(1, flowRuns.size());
+        for (FlowRunEntity flowRun : flowRuns) {
+          assertEquals(runid, flowRun.getRunId());
+          assertEquals(flowVersion, flowRun.getVersion());
+        }
+      }
+    } finally {
+      hbr.close();
+    }
   }
 
   private void checkFlowActivityTable(String cluster, String user, String flow,
-      String flowVersion, Long runid, Configuration c1) throws IOException {
+      String flowVersion, long runid, Configuration c1) throws IOException {
     Scan s = new Scan();
     s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
     byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
@@ -263,7 +303,7 @@ public class TestHBaseStorageFlowActivity {
     String user = "testManyRunsFlowActivity_c_user1";
     String flow = "flow_activity_test_flow_name";
     String flowVersion1 = "A122110F135BC4";
-    Long runid1 = 11111111111L;
+    long runid1 = 11111111111L;
 
     String flowVersion2 = "A12222222222C4";
     long runid2 = 2222222222222L;
@@ -303,11 +343,50 @@ public class TestHBaseStorageFlowActivity {
     checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
         runid1, flowVersion2, runid2, flowVersion3, runid3);
 
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+
+      Set<TimelineEntity> entities =
+          hbr.getEntities(null, cluster, null, null, null,
+              TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
+              null, null, null, null, null, null, null, null, null);
+      assertEquals(1, entities.size());
+      for (TimelineEntity e : entities) {
+        FlowActivityEntity flowActivity = (FlowActivityEntity)e;
+        assertEquals(cluster, flowActivity.getCluster());
+        assertEquals(user, flowActivity.getUser());
+        assertEquals(flow, flowActivity.getFlowName());
+        long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+            .currentTimeMillis());
+        assertEquals(dayTs, flowActivity.getDate().getTime());
+        Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
+        assertEquals(3, flowRuns.size());
+        for (FlowRunEntity flowRun : flowRuns) {
+          long runId = flowRun.getRunId();
+          String version = flowRun.getVersion();
+          if (runId == runid1) {
+            assertEquals(flowVersion1, version);
+          } else if (runId == runid2) {
+            assertEquals(flowVersion2, version);
+          } else if (runId == runid3) {
+            assertEquals(flowVersion3, version);
+          } else {
+            fail("unknown run id: " + runId);
+          }
+        }
+      }
+    } finally {
+      hbr.close();
+    }
   }
 
   private void checkFlowActivityTableSeveralRuns(String cluster, String user,
-      String flow, Configuration c1, String flowVersion1, Long runid1,
-      String flowVersion2, Long runid2, String flowVersion3, Long runid3)
+      String flow, Configuration c1, String flowVersion1, long runid1,
+      String flowVersion2, long runid2, String flowVersion3, long runid3)
       throws IOException {
     Scan s = new Scan();
     s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
@@ -351,7 +430,7 @@ public class TestHBaseStorageFlowActivity {
     assertEquals(1, rowCount);
   }
 
-  private void checkFlowActivityRunId(Long runid, String flowVersion,
+  private void checkFlowActivityRunId(long runid, String flowVersion,
       Map<byte[], byte[]> values) throws IOException {
     byte[] rq = ColumnHelper.getColumnQualifier(
         FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index bf524ea..b0f83b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -21,20 +21,15 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
@@ -42,32 +37,16 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -125,11 +104,13 @@ public class TestHBaseStorageFlowRun {
     String user = "testWriteFlowRunMinMaxToHBase_user1";
     String flow = "testing_flowRun_flow_name";
     String flowVersion = "CF7022C10F1354";
-    Long runid = 1002345678919L;
+    long runid = 1002345678919L;
     String appName = "application_100000000000_1111";
+    long minStartTs = 10000000000000L;
+    long greaterStartTs = 30000000000000L;
     long endTs = 1439750690000L;
     TimelineEntity entityMinStartTime = TestFlowDataGenerator
-        .getEntityMinStartTime();
+        .getEntityMinStartTime(minStartTs);
 
     try {
       hbi = new HBaseTimelineWriterImpl(c1);
@@ -152,7 +133,7 @@ public class TestHBaseStorageFlowRun {
 
       // writer another entity with greater start time
       TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
-          .getEntityGreaterStartTime();
+          .getEntityGreaterStartTime(greaterStartTs);
       te = new TimelineEntities();
       te.addEntity(entityGreaterStartTime);
       appName = "application_1000000000000000_2222";
@@ -183,24 +164,29 @@ public class TestHBaseStorageFlowRun {
         .getBytes());
 
     assertEquals(2, r1.size());
-    Long starttime = (Long) GenericObjectMapper.read(values
+    long starttime = (Long) GenericObjectMapper.read(values
         .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
-    Long expmin = entityMinStartTime.getCreatedTime();
-    assertEquals(expmin, starttime);
+    assertEquals(minStartTs, starttime);
     assertEquals(endTs, GenericObjectMapper.read(values
         .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
-  }
 
-  boolean isFlowRunRowKeyCorrect(byte[] rowKey, String cluster, String user,
-      String flow, Long runid) {
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
-    assertTrue(rowKeyComponents.length == 4);
-    assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
-    assertEquals(user, Bytes.toString(rowKeyComponents[1]));
-    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(TimelineWriterUtils.invert(runid),
-        Bytes.toLong(rowKeyComponents[3]));
-    return true;
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      // get the flow run entity
+      TimelineEntity entity =
+          hbr.getEntity(user, cluster, flow, runid, null,
+              TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
+      assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
+      FlowRunEntity flowRun = (FlowRunEntity)entity;
+      assertEquals(minStartTs, flowRun.getStartTime());
+      assertEquals(endTs, flowRun.getMaxEndTime());
+    } finally {
+      hbr.close();
+    }
   }
 
   /**
@@ -218,7 +204,7 @@ public class TestHBaseStorageFlowRun {
     String user = "testWriteFlowRunMetricsOneFlow_user1";
     String flow = "testing_flowRun_metrics_flow_name";
     String flowVersion = "CF7022C10F1354";
-    Long runid = 1002345678919L;
+    long runid = 1002345678919L;
 
     TimelineEntities te = new TimelineEntities();
     TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
@@ -244,6 +230,41 @@ public class TestHBaseStorageFlowRun {
 
     // check flow run
     checkFlowRunTable(cluster, user, flow, runid, c1);
+
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      TimelineEntity entity =
+          hbr.getEntity(user, cluster, flow, runid, null,
+            TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
+      assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
+      Set<TimelineMetric> metrics = entity.getMetrics();
+      assertEquals(2, metrics.size());
+      for (TimelineMetric metric : metrics) {
+        String id = metric.getId();
+        Map<Long, Number> values = metric.getValues();
+        assertEquals(1, values.size());
+        Number value = null;
+        for (Number n : values.values()) {
+          value = n;
+        }
+        switch (id) {
+        case metric1:
+          assertEquals(141, value);
+          break;
+        case metric2:
+          assertEquals(57, value);
+          break;
+        default:
+          fail("unrecognized metric: " + id);
+        }
+      }
+    } finally {
+      hbr.close();
+    }
   }
 
   private void checkFlowRunTable(String cluster, String user, String flow,


[2/2] hadoop git commit: YARN-4074. [timeline reader] implement support for querying for flows and flow runs (sjlee via vrushali)

Posted by vr...@apache.org.
YARN-4074. [timeline reader] implement support for querying for flows and flow runs (sjlee via vrushali)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2e7e0f0b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2e7e0f0b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2e7e0f0b

Branch: refs/heads/YARN-2928
Commit: 2e7e0f0bbba4b3ab4280de06d595018bc5dec87b
Parents: 4b37985
Author: Vrushali <vr...@apache.org>
Authored: Tue Sep 22 13:42:30 2015 -0700
Committer: Vrushali <vr...@apache.org>
Committed: Tue Sep 22 13:42:30 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../timelineservice/FlowActivityEntity.java     | 183 ++++++++
 .../api/records/timelineservice/FlowEntity.java | 103 -----
 .../records/timelineservice/FlowRunEntity.java  | 121 ++++++
 .../timelineservice/TimelineEntityType.java     |  31 +-
 .../TestTimelineServiceRecords.java             |  14 +-
 .../TestTimelineServiceClientIntegration.java   |   2 +-
 .../collector/TimelineCollectorWebService.java  |   6 +-
 .../storage/ApplicationEntityReader.java        | 229 ++++++++++
 .../storage/FlowActivityEntityReader.java       | 168 +++++++
 .../storage/FlowRunEntityReader.java            | 136 ++++++
 .../storage/GenericEntityReader.java            | 389 +++++++++++++++++
 .../storage/HBaseTimelineReaderImpl.java        | 434 +------------------
 .../storage/TimelineEntityReader.java           | 223 ++++++++++
 .../storage/TimelineEntityReaderFactory.java    |  97 +++++
 .../storage/application/ApplicationRowKey.java  |  68 ++-
 .../storage/apptoflow/AppToFlowRowKey.java      |  31 ++
 .../storage/common/BaseTable.java               |   3 +-
 .../storage/entity/EntityRowKey.java            |  76 +++-
 .../storage/flow/FlowActivityRowKey.java        |   7 +-
 .../storage/flow/FlowRunRowKey.java             |  50 ++-
 .../storage/flow/FlowScanner.java               |  18 +-
 .../storage/TestHBaseTimelineStorage.java       |  34 +-
 .../storage/flow/TestFlowDataGenerator.java     |  39 +-
 .../flow/TestHBaseStorageFlowActivity.java      | 131 ++++--
 .../storage/flow/TestHBaseStorageFlowRun.java   | 105 +++--
 26 files changed, 2021 insertions(+), 680 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index cc0f014..114b2fe 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -106,6 +106,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3901. Populate flow run data in the flow_run & flow activity tables
     (Vrushali C via sjlee)
 
+    YARN-4074. [timeline reader] implement support for querying for flows
+    and flow runs (sjlee via vrushali)
+
   IMPROVEMENTS
 
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java
new file mode 100644
index 0000000..163bd5c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Entity that represents a record for flow activity. It's essentially a
+ * container entity for flow runs with limited information.
+ */
+@Public
+@Unstable
+public class FlowActivityEntity extends TimelineEntity {
+  public static final String CLUSTER_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX +  "CLUSTER";
+  public static final String DATE_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX +  "DATE";
+  public static final String USER_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER";
+  public static final String FLOW_NAME_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_NAME";
+
+  private final NavigableSet<FlowRunEntity> flowRuns = new TreeSet<>();
+
+  public FlowActivityEntity() {
+    super(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
+    // set config to null
+    setConfigs(null);
+  }
+
+  public FlowActivityEntity(String cluster, long time, String user,
+      String flowName) {
+    this();
+    setCluster(cluster);
+    setDate(time);
+    setUser(user);
+    setFlowName(flowName);
+  }
+
+  public FlowActivityEntity(TimelineEntity entity) {
+    super(entity);
+    if (!TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entity.getType())) {
+      throw new IllegalArgumentException("Incompatible entity type: " +
+          getId());
+    }
+    // set config to null
+    setConfigs(null);
+  }
+
+  @XmlElement(name = "id")
+  @Override
+  public String getId() {
+    // flow activity: cluster/day/user@flow_name
+    String id = super.getId();
+    if (id == null) {
+      StringBuilder sb = new StringBuilder();
+      sb.append(getCluster());
+      sb.append('/');
+      sb.append(getDate().getTime());
+      sb.append('/');
+      sb.append(getUser());
+      sb.append('@');
+      sb.append(getFlowName());
+      id = sb.toString();
+      setId(id);
+    }
+    return id;
+  }
+
+  /** Orders by type, cluster, date (descending), user, then flow name. */
+  @Override
+  public int compareTo(TimelineEntity entity) {
+    int comparison = getType().compareTo(entity.getType());
+    if (comparison != 0) {
+      return comparison;
+    }
+    FlowActivityEntity other = (FlowActivityEntity)entity;
+    int clusterComparison = getCluster().compareTo(other.getCluster());
+    if (clusterComparison != 0) {
+      return clusterComparison;
+    }
+    // compare the dates directly (other first, hence descending); casting the
+    // long millisecond difference to int can overflow and flip the sign
+    int dateComparisonDescending = other.getDate().compareTo(getDate());
+    if (dateComparisonDescending != 0) {
+      return dateComparisonDescending;
+    }
+    int userComparison = getUser().compareTo(other.getUser());
+    if (userComparison != 0) {
+      return userComparison;
+    }
+    return getFlowName().compareTo(other.getFlowName());
+  }
+
+  /**
+   * Reuse the base class equals method.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    return super.equals(obj);
+  }
+
+  /**
+   * Reuse the base class hashCode method.
+   */
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  public String getCluster() {
+    return (String)getInfo().get(CLUSTER_INFO_KEY);
+  }
+
+  public void setCluster(String cluster) {
+    addInfo(CLUSTER_INFO_KEY, cluster);
+  }
+
+  public Date getDate() {
+    return (Date)getInfo().get(DATE_INFO_KEY);
+  }
+
+  public void setDate(long time) {
+    Date date = new Date(time);
+    addInfo(DATE_INFO_KEY, date);
+  }
+
+  public String getUser() {
+    return (String)getInfo().get(USER_INFO_KEY);
+  }
+
+  public void setUser(String user) {
+    addInfo(USER_INFO_KEY, user);
+  }
+
+  public String getFlowName() {
+    return (String)getInfo().get(FLOW_NAME_INFO_KEY);
+  }
+
+  public void setFlowName(String flowName) {
+    addInfo(FLOW_NAME_INFO_KEY, flowName);
+  }
+
+  public void addFlowRun(FlowRunEntity run) {
+    flowRuns.add(run);
+  }
+
+  public void addFlowRuns(Collection<FlowRunEntity> runs) {
+    flowRuns.addAll(runs);
+  }
+
+  @XmlElement(name = "flowruns")
+  public NavigableSet<FlowRunEntity> getFlowRuns() {
+    return flowRuns;
+  }
+
+  public int getNumberOfRuns() {
+    return flowRuns.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java
deleted file mode 100644
index 4554778..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.api.records.timelineservice;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import javax.xml.bind.annotation.XmlElement;
-
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class FlowEntity extends HierarchicalTimelineEntity {
-  public static final String USER_INFO_KEY =
-      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER";
-  public static final String FLOW_NAME_INFO_KEY =
-      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_NAME";
-  public static final String FLOW_VERSION_INFO_KEY =
-      TimelineEntity.SYSTEM_INFO_KEY_PREFIX +  "FLOW_VERSION";
-  public static final String FLOW_RUN_ID_INFO_KEY =
-      TimelineEntity.SYSTEM_INFO_KEY_PREFIX +  "FLOW_RUN_ID";
-
-  public FlowEntity() {
-    super(TimelineEntityType.YARN_FLOW.toString());
-  }
-
-  public FlowEntity(TimelineEntity entity) {
-    super(entity);
-    if (!entity.getType().equals(TimelineEntityType.YARN_FLOW.toString())) {
-      throw new IllegalArgumentException("Incompatible entity type: " + getId());
-    }
-  }
-
-  @XmlElement(name = "id")
-  @Override
-  public String getId() {
-    //Flow id schema: user@flow_name(or id)/version/run_id
-    String id = super.getId();
-    if (id == null) {
-      StringBuilder sb = new StringBuilder();
-      sb.append(getInfo().get(USER_INFO_KEY).toString());
-      sb.append('@');
-      sb.append(getInfo().get(FLOW_NAME_INFO_KEY).toString());
-      sb.append('/');
-      sb.append(getInfo().get(FLOW_VERSION_INFO_KEY).toString());
-      sb.append('/');
-      sb.append(getInfo().get(FLOW_RUN_ID_INFO_KEY).toString());
-      id = sb.toString();
-      setId(id);
-    }
-    return id;
-  }
-
-  public String getUser() {
-    Object user = getInfo().get(USER_INFO_KEY);
-    return user == null ? null : user.toString();
-  }
-
-  public void setUser(String user) {
-    addInfo(USER_INFO_KEY, user);
-  }
-
-  public String getName() {
-    Object name = getInfo().get(FLOW_NAME_INFO_KEY);
-    return name == null ? null : name.toString();
-  }
-
-  public void setName(String name) {
-    addInfo(FLOW_NAME_INFO_KEY, name);
-  }
-
-  public String getVersion() {
-    Object version = getInfo().get(FLOW_VERSION_INFO_KEY);
-    return version == null ? null : version.toString();
-  }
-
-  public void setVersion(String version) {
-    addInfo(FLOW_VERSION_INFO_KEY, version);
-  }
-
-  public long getRunId() {
-    Object runId = getInfo().get(FLOW_RUN_ID_INFO_KEY);
-    return runId == null ? 0L : (Long) runId;
-  }
-
-  public void setRunId(long runId) {
-    addInfo(FLOW_RUN_ID_INFO_KEY, runId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java
new file mode 100644
index 0000000..3c3ffb4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Timeline entity of type YARN_FLOW_RUN. User, flow name, flow version, run id
+ * and end time are stored through addInfo under the info keys declared here
+ * and read back from getInfo(); the start time is the entity's created time.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class FlowRunEntity extends HierarchicalTimelineEntity {
+  public static final String USER_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER";
+  public static final String FLOW_NAME_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_NAME";
+  public static final String FLOW_VERSION_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX +  "FLOW_VERSION";
+  public static final String FLOW_RUN_ID_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX +  "FLOW_RUN_ID";
+  public static final String FLOW_RUN_END_TIME =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_RUN_END_TIME";
+
+  /**
+   * Creates a flow run entity of type YARN_FLOW_RUN with configs set to null.
+   */
+  public FlowRunEntity() {
+    super(TimelineEntityType.YARN_FLOW_RUN.toString());
+    // set config to null
+    setConfigs(null);
+  }
+
+  /**
+   * Creates a flow run entity from an existing entity and sets its configs to
+   * null.
+   *
+   * @param entity source entity; its type must be YARN_FLOW_RUN
+   * @throws IllegalArgumentException if the entity has any other type
+   */
+  public FlowRunEntity(TimelineEntity entity) {
+    super(entity);
+    if (!TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())) {
+      throw new IllegalArgumentException("Incompatible entity type: " + getId());
+    }
+    // set config to null
+    setConfigs(null);
+  }
+
+  /**
+   * Returns the id of this flow run. When no id is set yet, one is built from
+   * the info map as user@flow_name/run_id and stored with setId.
+   *
+   * @return the id that was set, or the generated one
+   * @throws NullPointerException if an id must be generated but the user, flow
+   *     name or run id info is absent
+   */
+  @XmlElement(name = "id")
+  @Override
+  public String getId() {
+    String id = super.getId();
+    if (id == null) {
+      //Flow id schema: user@flow_name(or id)/run_id
+      id = getInfo().get(USER_INFO_KEY).toString() + '@'
+          + getInfo().get(FLOW_NAME_INFO_KEY).toString() + '/'
+          + getInfo().get(FLOW_RUN_ID_INFO_KEY).toString();
+      setId(id);
+    }
+    return id;
+  }
+
+  /**
+   * Returns the user stored under USER_INFO_KEY, or null if none is stored.
+   */
+  public String getUser() {
+    return (String)getInfo().get(USER_INFO_KEY);
+  }
+
+  /**
+   * Stores the given user under USER_INFO_KEY.
+   */
+  public void setUser(String user) {
+    addInfo(USER_INFO_KEY, user);
+  }
+
+  /**
+   * Returns the flow name stored under FLOW_NAME_INFO_KEY, or null if none is
+   * stored.
+   */
+  public String getName() {
+    return (String)getInfo().get(FLOW_NAME_INFO_KEY);
+  }
+
+  /**
+   * Stores the given flow name under FLOW_NAME_INFO_KEY.
+   */
+  public void setName(String name) {
+    addInfo(FLOW_NAME_INFO_KEY, name);
+  }
+
+  /**
+   * Returns the flow version stored under FLOW_VERSION_INFO_KEY, or null if
+   * none is stored.
+   */
+  public String getVersion() {
+    return (String)getInfo().get(FLOW_VERSION_INFO_KEY);
+  }
+
+  /**
+   * Stores the given flow version under FLOW_VERSION_INFO_KEY.
+   */
+  public void setVersion(String version) {
+    addInfo(FLOW_VERSION_INFO_KEY, version);
+  }
+
+  /**
+   * Returns the run id stored under FLOW_RUN_ID_INFO_KEY, or 0 if none is
+   * stored. Any Number is accepted, so a value held as an Integer rather than
+   * a Long does not cause a ClassCastException.
+   */
+  public long getRunId() {
+    Object runId = getInfo().get(FLOW_RUN_ID_INFO_KEY);
+    return runId == null ? 0L : ((Number)runId).longValue();
+  }
+
+  /**
+   * Stores the given run id under FLOW_RUN_ID_INFO_KEY.
+   */
+  public void setRunId(long runId) {
+    addInfo(FLOW_RUN_ID_INFO_KEY, runId);
+  }
+
+  /**
+   * Returns the start time of the run, which is the entity's created time.
+   */
+  public long getStartTime() {
+    return getCreatedTime();
+  }
+
+  /**
+   * Sets the start time of the run by setting the entity's created time.
+   */
+  public void setStartTime(long startTime) {
+    setCreatedTime(startTime);
+  }
+
+  /**
+   * Returns the end time stored under FLOW_RUN_END_TIME, or 0 if none is
+   * stored. Any Number is accepted, as in getRunId.
+   */
+  public long getMaxEndTime() {
+    Object time = getInfo().get(FLOW_RUN_END_TIME);
+    return time == null ? 0L : ((Number)time).longValue();
+  }
+
+  /**
+   * Stores the given end time under FLOW_RUN_END_TIME.
+   */
+  public void setMaxEndTime(long endTime) {
+    addInfo(FLOW_RUN_END_TIME, endTime);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
index 6062fe1..ba32e20 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
@@ -24,21 +24,25 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Unstable
 public enum TimelineEntityType {
   YARN_CLUSTER,
-  YARN_FLOW,
+  YARN_FLOW_RUN,
   YARN_APPLICATION,
   YARN_APPLICATION_ATTEMPT,
   YARN_CONTAINER,
   YARN_USER,
-  YARN_QUEUE;
+  YARN_QUEUE,
+  YARN_FLOW_ACTIVITY;
 
+  /**
+   * Whether the input type can be a parent of this entity.
+   */
   public boolean isParent(TimelineEntityType type) {
     switch (this) {
       case YARN_CLUSTER:
         return false;
-      case YARN_FLOW:
-        return YARN_FLOW == type || YARN_CLUSTER == type;
+      case YARN_FLOW_RUN:
+        return YARN_FLOW_RUN == type || YARN_CLUSTER == type;
       case YARN_APPLICATION:
-        return YARN_FLOW == type || YARN_CLUSTER == type;
+        return YARN_FLOW_RUN == type || YARN_CLUSTER == type;
       case YARN_APPLICATION_ATTEMPT:
         return YARN_APPLICATION == type;
       case YARN_CONTAINER:
@@ -50,12 +54,15 @@ public enum TimelineEntityType {
     }
   }
 
+  /**
+   * Whether the input type can be a child of this entity.
+   */
   public boolean isChild(TimelineEntityType type) {
     switch (this) {
       case YARN_CLUSTER:
-        return YARN_FLOW == type || YARN_APPLICATION == type;
-      case YARN_FLOW:
-        return YARN_FLOW == type || YARN_APPLICATION == type;
+        return YARN_FLOW_RUN == type || YARN_APPLICATION == type;
+      case YARN_FLOW_RUN:
+        return YARN_FLOW_RUN == type || YARN_APPLICATION == type;
       case YARN_APPLICATION:
         return YARN_APPLICATION_ATTEMPT == type;
       case YARN_APPLICATION_ATTEMPT:
@@ -68,4 +75,12 @@ public enum TimelineEntityType {
         return false;
     }
   }
+
+  /**
+   * Tests whether the given type string names this entity type, that is,
+   * whether it equals this constant's toString() value. Null never matches.
+   */
+  public boolean matches(String typeString) {
+    return typeString != null && typeString.equals(toString());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
index 78943e0..7c9acf2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
@@ -182,14 +182,14 @@ public class TestTimelineServiceRecords {
     ClusterEntity cluster = new ClusterEntity();
     cluster.setId("test cluster id");
 
-    FlowEntity flow1 = new FlowEntity();
+    FlowRunEntity flow1 = new FlowRunEntity();
     //flow1.setId("test flow id 1");
     flow1.setUser(user.getId());
     flow1.setName("test flow name 1");
     flow1.setVersion("test flow version 1");
     flow1.setRunId(1L);
 
-    FlowEntity flow2 = new FlowEntity();
+    FlowRunEntity flow2 = new FlowRunEntity();
     //flow2.setId("test flow run id 2");
     flow2.setUser(user.getId());
     flow2.setName("test flow name 2");
@@ -213,19 +213,19 @@ public class TestTimelineServiceRecords {
         ApplicationAttemptId.newInstance(
             ApplicationId.newInstance(0, 1), 1), 1).toString());
 
-    cluster.addChild(TimelineEntityType.YARN_FLOW.toString(), flow1.getId());
+    cluster.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(), flow1.getId());
     flow1
         .setParent(TimelineEntityType.YARN_CLUSTER.toString(), cluster.getId());
-    flow1.addChild(TimelineEntityType.YARN_FLOW.toString(), flow2.getId());
-    flow2.setParent(TimelineEntityType.YARN_FLOW.toString(), flow1.getId());
+    flow1.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId());
+    flow2.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow1.getId());
     flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(), app1.getId());
     flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(), app2.getId());
-    app1.setParent(TimelineEntityType.YARN_FLOW.toString(), flow2.getId());
+    app1.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId());
     app1.addChild(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
         appAttempt.getId());
     appAttempt
         .setParent(TimelineEntityType.YARN_APPLICATION.toString(), app1.getId());
-    app2.setParent(TimelineEntityType.YARN_FLOW.toString(), flow2.getId());
+    app2.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId());
     appAttempt.addChild(TimelineEntityType.YARN_CONTAINER.toString(),
         container.getId());
     container.setParent(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index 69031a2..5672759 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -107,7 +107,7 @@ public class TestTimelineServiceClientIntegration {
       client.start();
       ClusterEntity cluster = new ClusterEntity();
       cluster.setId(YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
-      FlowEntity flow = new FlowEntity();
+      FlowRunEntity flow = new FlowRunEntity();
       flow.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
       flow.setName("test_flow_name");
       flow.setVersion("test_flow_version");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
index 42fa365..8f595e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEnti
 import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.ClusterEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -205,8 +205,8 @@ public class TimelineCollectorWebService {
           case YARN_CLUSTER:
             entitiesToReturn.addEntity(new ClusterEntity(entity));
             break;
-          case YARN_FLOW:
-            entitiesToReturn.addEntity(new FlowEntity(entity));
+          case YARN_FLOW_RUN:
+            entitiesToReturn.addEntity(new FlowRunEntity(entity));
             break;
           case YARN_APPLICATION:
             entitiesToReturn.addEntity(new ApplicationEntity(entity));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
new file mode 100644
index 0000000..dfbc31d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+
+/**
+ * Timeline entity reader for application entities that are stored in the
+ * application table.
+ */
+class ApplicationEntityReader extends GenericEntityReader {
+  private static final ApplicationTable APPLICATION_TABLE =
+      new ApplicationTable();
+
+  /**
+   * Creates a reader for a filtered query; every argument is passed to the
+   * superclass constructor unchanged.
+   */
+  public ApplicationEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
+        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+        eventFilters, fieldsToRetrieve);
+  }
+
+  /**
+   * Creates a reader for a single entity id; every argument is passed to the
+   * superclass constructor unchanged.
+   */
+  public ApplicationEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
+        fieldsToRetrieve);
+  }
+
+  /**
+   * Uses the {@link ApplicationTable}.
+   */
+  @Override
+  protected BaseTable<?> getTable() {
+    return APPLICATION_TABLE;
+  }
+
+  /**
+   * Gets the row whose key is built from the cluster, user, flow, flow run
+   * and application ids, requesting all versions of its cells.
+   */
+  @Override
+  protected Result getResult(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    byte[] rowKey =
+        ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
+            appId);
+    Get get = new Get(rowKey);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    return table.getResult(hbaseConf, conn, get);
+  }
+
+  /**
+   * Gets the same single row as getResult and returns it only if parseEntity
+   * accepts it.
+   */
+  @Override
+  protected Iterable<Result> getResults(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    // If getEntities() is called for an application, there can be at most
+    // one entity. If the entity passes the filter, it is returned. Otherwise,
+    // an empty set is returned.
+    Result result = getResult(hbaseConf, conn);
+    if (parseEntity(result) == null) {
+      return Collections.<Result>emptySet();
+    }
+    return Collections.singleton(result);
+  }
+
+  /**
+   * Builds an application entity from a row, applying the time range (for
+   * multi-entity reads) and the relation, info, config, event and metric
+   * filters. A field read only for filtering is cleared again.
+   *
+   * @return the entity, or null if the row is null or empty or fails a filter
+   */
+  @Override
+  protected TimelineEntity parseEntity(Result result) throws IOException {
+    if (result == null || result.isEmpty()) {
+      return null;
+    }
+    TimelineEntity entity = new TimelineEntity();
+    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
+    String entityId = ApplicationColumn.ID.readResult(result).toString();
+    entity.setId(entityId);
+
+    // fetch created time
+    Number createdTime =
+        (Number)ApplicationColumn.CREATED_TIME.readResult(result);
+    entity.setCreatedTime(createdTime.longValue());
+    if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
+        entity.getCreatedTime() > createdTimeEnd)) {
+      return null;
+    }
+
+    // fetch modified time
+    Number modifiedTime =
+        (Number)ApplicationColumn.MODIFIED_TIME.readResult(result);
+    entity.setModifiedTime(modifiedTime.longValue());
+    if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
+        entity.getModifiedTime() > modifiedTimeEnd)) {
+      return null;
+    }
+
+    // fetch is related to entities
+    boolean wantIsRelatedTo = shouldRetrieve(Field.IS_RELATED_TO);
+    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
+    if (wantIsRelatedTo || checkIsRelatedTo) {
+      readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
+          true);
+      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+          entity.getIsRelatedToEntities(), isRelatedTo)) {
+        return null;
+      }
+      if (!wantIsRelatedTo) {
+        entity.getIsRelatedToEntities().clear();
+      }
+    }
+
+    // fetch relates to entities
+    boolean wantRelatesTo = shouldRetrieve(Field.RELATES_TO);
+    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
+    if (wantRelatesTo || checkRelatesTo) {
+      readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
+          false);
+      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+          entity.getRelatesToEntities(), relatesTo)) {
+        return null;
+      }
+      if (!wantRelatesTo) {
+        entity.getRelatesToEntities().clear();
+      }
+    }
+
+    // fetch info
+    boolean wantInfo = shouldRetrieve(Field.INFO);
+    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
+    if (wantInfo || checkInfo) {
+      readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
+      if (checkInfo &&
+          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+        return null;
+      }
+      if (!wantInfo) {
+        entity.getInfo().clear();
+      }
+    }
+
+    // fetch configs
+    boolean wantConfigs = shouldRetrieve(Field.CONFIGS);
+    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
+    if (wantConfigs || checkConfigs) {
+      readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
+      if (checkConfigs && !TimelineReaderUtils.matchFilters(
+          entity.getConfigs(), configFilters)) {
+        return null;
+      }
+      if (!wantConfigs) {
+        entity.getConfigs().clear();
+      }
+    }
+
+    // fetch events
+    boolean wantEvents = shouldRetrieve(Field.EVENTS);
+    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
+    if (wantEvents || checkEvents) {
+      readEvents(entity, result, true);
+      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+          entity.getEvents(), eventFilters)) {
+        return null;
+      }
+      if (!wantEvents) {
+        entity.getEvents().clear();
+      }
+    }
+
+    // fetch metrics
+    boolean wantMetrics = shouldRetrieve(Field.METRICS);
+    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
+    if (wantMetrics || checkMetrics) {
+      readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
+      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+          entity.getMetrics(), metricFilters)) {
+        return null;
+      }
+      if (!wantMetrics) {
+        entity.getMetrics().clear();
+      }
+    }
+    return entity;
+  }
+
+  /**
+   * Whether the given field was requested, either by name or through
+   * {@link Field#ALL}.
+   */
+  private boolean shouldRetrieve(Field field) {
+    return fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(field);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
new file mode 100644
index 0000000..d5ece2e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Timeline entity reader for flow activity entities that are stored in the
+ * flow activity table. Only scans over multiple rows are implemented;
+ * {@link #getResult(Configuration, Connection)} always throws
+ * {@link UnsupportedOperationException}.
+ */
+class FlowActivityEntityReader extends TimelineEntityReader {
+  private static final FlowActivityTable FLOW_ACTIVITY_TABLE =
+      new FlowActivityTable();
+
+  /**
+   * Creates a reader from a limit, time windows and filters. Every argument
+   * is handed to the superclass constructor unchanged.
+   */
+  public FlowActivityEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
+        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+        eventFilters, fieldsToRetrieve);
+  }
+
+  /**
+   * Creates a reader from an entity id. Every argument is handed to the
+   * superclass constructor unchanged.
+   */
+  public FlowActivityEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
+        fieldsToRetrieve);
+  }
+
+  /**
+   * Uses the {@link FlowActivityTable}.
+   */
+  @Override
+  protected BaseTable<?> getTable() {
+    return FLOW_ACTIVITY_TABLE;
+  }
+
+  /**
+   * Reads flow activity entities up to the limit. Since this is strictly
+   * sorted by the row key, it is sufficient to collect the first results as
+   * specified by the limit.
+   *
+   * @param hbaseConf configuration passed on to the table access
+   * @param conn connection passed on to the table access
+   * @return the parsed entities, held in a sorted set
+   * @throws IOException if reading, parsing or closing the results fails
+   */
+  @Override
+  public Set<TimelineEntity> readEntities(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    validateParams();
+    augmentParams(hbaseConf, conn);
+
+    NavigableSet<TimelineEntity> entities = new TreeSet<>();
+    Iterable<Result> results = getResults(hbaseConf, conn);
+    try {
+      for (Result result : results) {
+        TimelineEntity entity = parseEntity(result);
+        if (entity == null) {
+          continue;
+        }
+        entities.add(entity);
+        if (entities.size() == limit) {
+          break;
+        }
+      }
+    } finally {
+      // the results come from a result scanner; release it even when we stop
+      // early at the limit or parsing fails
+      if (results instanceof Closeable) {
+        ((Closeable) results).close();
+      }
+    }
+    return entities;
+  }
+
+  /**
+   * Requires only the cluster id to be set.
+   */
+  @Override
+  protected void validateParams() {
+    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+  }
+
+  /**
+   * Falls back to {@link TimelineReader#DEFAULT_LIMIT} when no limit or a
+   * negative limit was supplied.
+   */
+  @Override
+  protected void augmentParams(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    if (limit == null || limit < 0) {
+      limit = TimelineReader.DEFAULT_LIMIT;
+    }
+  }
+
+  /**
+   * Not supported for flow activity entities.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  protected Result getResult(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "we don't support a single entity query");
+  }
+
+  /**
+   * Scans the rows whose key starts with the prefix built from the cluster
+   * id, using a page filter sized by the limit.
+   */
+  @Override
+  protected Iterable<Result> getResults(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    Scan scan = new Scan();
+    scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId));
+    // use the page filter to limit the result to the page size
+    // the scanner may still return more than the limit; therefore we need to
+    // read the right number as we iterate
+    scan.setFilter(new PageFilter(limit));
+    return table.getResultScanner(hbaseConf, conn, scan);
+  }
+
+  /**
+   * Builds a {@link FlowActivityEntity} from one row: the day timestamp, user
+   * and flow name come from the row key, and one {@link FlowRunEntity} is
+   * added per run id column found in the row.
+   */
+  @Override
+  protected TimelineEntity parseEntity(Result result) throws IOException {
+    FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow());
+
+    long time = rowKey.getDayTimestamp();
+    String user = rowKey.getUserId();
+    String flowName = rowKey.getFlowId();
+
+    FlowActivityEntity flowActivity =
+        new FlowActivityEntity(clusterId, time, user, flowName);
+    // set the id
+    // NOTE(review): presumably getId() derives the id from the values set
+    // above - confirm against FlowActivityEntity
+    flowActivity.setId(flowActivity.getId());
+    // get the list of run ids along with the version that are associated with
+    // this flow on this day
+    Map<String, Object> runIdsMap =
+        FlowActivityColumnPrefix.RUN_ID.readResults(result);
+    for (Map.Entry<String, Object> e : runIdsMap.entrySet()) {
+      Long runId = Long.valueOf(e.getKey());
+      String version = (String)e.getValue();
+      FlowRunEntity flowRun = new FlowRunEntity();
+      flowRun.setUser(user);
+      flowRun.setName(flowName);
+      flowRun.setRunId(runId);
+      flowRun.setVersion(version);
+      // set the id
+      flowRun.setId(flowRun.getId());
+      flowActivity.addFlowRun(flowRun);
+    }
+
+    return flowActivity;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
new file mode 100644
index 0000000..ced795d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Timeline entity reader for flow run entities that are stored in the flow run
+ * table. Only the lookup of a single row is implemented;
+ * {@link #getResults(Configuration, Connection)} always throws
+ * {@link UnsupportedOperationException}.
+ */
+class FlowRunEntityReader extends TimelineEntityReader {
+  private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable();
+
+  /**
+   * Creates a reader from a limit, time windows and filters. Every argument
+   * is handed to the superclass constructor unchanged.
+   */
+  public FlowRunEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
+        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+        eventFilters, fieldsToRetrieve);
+  }
+
+  /**
+   * Creates a reader from an entity id. Every argument is handed to the
+   * superclass constructor unchanged.
+   */
+  public FlowRunEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
+        fieldsToRetrieve);
+  }
+
+  /**
+   * Uses the {@link FlowRunTable}.
+   */
+  @Override
+  protected BaseTable<?> getTable() {
+    return FLOW_RUN_TABLE;
+  }
+
+  /**
+   * Requires the cluster id, user id, flow id and flow run id, which together
+   * form the row key used by
+   * {@link #getResult(Configuration, Connection)}.
+   */
+  @Override
+  protected void validateParams() {
+    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+    Preconditions.checkNotNull(userId, "userId shouldn't be null");
+    Preconditions.checkNotNull(flowId, "flowId shouldn't be null");
+    Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null");
+  }
+
+  /**
+   * Nothing to fill in for flow run queries.
+   */
+  @Override
+  protected void augmentParams(Configuration hbaseConf, Connection conn) {
+  }
+
+  /**
+   * Fetches the row for this flow run, asking for all cell versions.
+   */
+  @Override
+  protected Result getResult(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    byte[] rowKey =
+        FlowRunRowKey.getRowKey(clusterId, userId, flowId, flowRunId);
+    Get get = new Get(rowKey);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    return table.getResult(hbaseConf, conn, get);
+  }
+
+  /**
+   * Not supported for flow run entities.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  protected Iterable<Result> getResults(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    throw new UnsupportedOperationException(
+        "multiple entity query is not supported");
+  }
+
+  /**
+   * Builds a {@link FlowRunEntity} from the row. User, flow name and run id
+   * come from the query parameters; start time, end time, version and metrics
+   * are read from the row when present.
+   *
+   * @return the flow run entity, or {@code null} if the result is null or
+   *         empty (i.e. no such row was found)
+   */
+  @Override
+  protected TimelineEntity parseEntity(Result result) throws IOException {
+    // a missing row must not turn into an entity that carries only the query
+    // parameters; report "not found" the same way GenericEntityReader does
+    if (result == null || result.isEmpty()) {
+      return null;
+    }
+    FlowRunEntity flowRun = new FlowRunEntity();
+    flowRun.setUser(userId);
+    flowRun.setName(flowId);
+    flowRun.setRunId(flowRunId);
+
+    // read the start time
+    Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result);
+    if (startTime != null) {
+      flowRun.setStartTime(startTime);
+    }
+    // read the end time if available
+    Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result);
+    if (endTime != null) {
+      flowRun.setMaxEndTime(endTime);
+    }
+
+    // read the flow version
+    String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result);
+    if (version != null) {
+      flowRun.setVersion(version);
+    }
+
+    // read metrics
+    readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
+
+    // set the id
+    flowRun.setId(flowRun.getId());
+    return flowRun;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
new file mode 100644
index 0000000..466914b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Timeline entity reader for generic entities that are stored in the entity
+ * table. When the flow id or flow run id is not supplied, it is looked up in
+ * the app-to-flow table before the entity table is read.
+ */
+class GenericEntityReader extends TimelineEntityReader {
+  private static final EntityTable ENTITY_TABLE = new EntityTable();
+  private static final Log LOG = LogFactory.getLog(GenericEntityReader.class);
+
+  private static final long DEFAULT_BEGIN_TIME = 0L;
+  private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
+
+  /**
+   * Used to look up the flow context.
+   */
+  private final AppToFlowTable appToFlowTable = new AppToFlowTable();
+
+  /**
+   * Creates a reader from a limit, time windows and filters. Every argument
+   * is handed to the superclass constructor unchanged.
+   */
+  public GenericEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
+        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+        eventFilters, fieldsToRetrieve);
+  }
+
+  /**
+   * Creates a reader from an entity id. Every argument is handed to the
+   * superclass constructor unchanged.
+   */
+  public GenericEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
+        fieldsToRetrieve);
+  }
+
+  /**
+   * Uses the {@link EntityTable}.
+   */
+  @Override
+  protected BaseTable<?> getTable() {
+    return ENTITY_TABLE;
+  }
+
+  /**
+   * Looks up the flow id and flow run id recorded for an application in the
+   * app-to-flow table.
+   *
+   * @throws IOException if no row exists for the cluster id and app id, or if
+   *         the read itself fails
+   */
+  private FlowContext lookupFlowContext(String clusterId, String appId,
+      Configuration hbaseConf, Connection conn) throws IOException {
+    byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
+    Get get = new Get(rowKey);
+    Result result = appToFlowTable.getResult(hbaseConf, conn, get);
+    if (result == null || result.isEmpty()) {
+      throw new IOException(
+          "Unable to find the context flow ID and flow run ID for clusterId=" +
+          clusterId + ", appId=" + appId);
+    }
+    return new FlowContext(
+        AppToFlowColumn.FLOW_ID.readResult(result).toString(),
+        ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
+  }
+
+  /**
+   * Holder for the flow id and flow run id of an application.
+   */
+  private static class FlowContext {
+    private final String flowId;
+    private final Long flowRunId;
+    public FlowContext(String flowId, Long flowRunId) {
+      this.flowId = flowId;
+      this.flowRunId = flowRunId;
+    }
+  }
+
+  /**
+   * Requires the user id, cluster id, app id and entity type; a single entity
+   * read additionally requires the entity id.
+   */
+  @Override
+  protected void validateParams() {
+    Preconditions.checkNotNull(userId, "userId shouldn't be null");
+    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+    Preconditions.checkNotNull(appId, "appId shouldn't be null");
+    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
+    if (singleEntityRead) {
+      Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
+    }
+  }
+
+  /**
+   * Fills in the parameters that were not supplied: the flow context is looked
+   * up from the app-to-flow table, the fields to retrieve default to the empty
+   * set, and for multiple entity reads the limit and the created/modified time
+   * windows get their defaults.
+   */
+  @Override
+  protected void augmentParams(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    // In reality both should be null or neither should be null
+    if (flowId == null || flowRunId == null) {
+      FlowContext context =
+          lookupFlowContext(clusterId, appId, hbaseConf, conn);
+      flowId = context.flowId;
+      flowRunId = context.flowRunId;
+    }
+    if (fieldsToRetrieve == null) {
+      fieldsToRetrieve = EnumSet.noneOf(Field.class);
+    }
+    if (!singleEntityRead) {
+      if (limit == null || limit < 0) {
+        limit = TimelineReader.DEFAULT_LIMIT;
+      }
+      if (createdTimeBegin == null) {
+        createdTimeBegin = DEFAULT_BEGIN_TIME;
+      }
+      if (createdTimeEnd == null) {
+        createdTimeEnd = DEFAULT_END_TIME;
+      }
+      if (modifiedTimeBegin == null) {
+        modifiedTimeBegin = DEFAULT_BEGIN_TIME;
+      }
+      if (modifiedTimeEnd == null) {
+        modifiedTimeEnd = DEFAULT_END_TIME;
+      }
+    }
+  }
+
+  /**
+   * Fetches the row of one entity, asking for all cell versions.
+   */
+  @Override
+  protected Result getResult(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    byte[] rowKey =
+        EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId,
+            entityType, entityId);
+    Get get = new Get(rowKey);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    return table.getResult(hbaseConf, conn, get);
+  }
+
+  /**
+   * Scans the rows that share the row key prefix of one application and one
+   * entity type, asking for all cell versions.
+   */
+  @Override
+  protected Iterable<Result> getResults(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    // Scan through part of the table to find the entities belong to one app
+    // and one type
+    Scan scan = new Scan();
+    scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
+        clusterId, userId, flowId, flowRunId, appId, entityType));
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    return table.getResultScanner(hbaseConf, conn, scan);
+  }
+
+  /**
+   * Tells whether the caller asked for the given field, either explicitly or
+   * through {@link Field#ALL}.
+   */
+  private boolean isFieldRequested(Field field) {
+    return fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(field);
+  }
+
+  /**
+   * Builds a {@link TimelineEntity} from one row and applies the filters.
+   * A field is read when it was requested or when a filter on it is set; if it
+   * was read only for filtering, it is cleared again before returning.
+   *
+   * @return the entity, or {@code null} if the result is null or empty, or if
+   *         the entity falls outside a time window (multiple entity reads
+   *         only) or fails one of the filters
+   */
+  @Override
+  protected TimelineEntity parseEntity(Result result) throws IOException {
+    if (result == null || result.isEmpty()) {
+      return null;
+    }
+    TimelineEntity entity = new TimelineEntity();
+    entity.setType(EntityColumn.TYPE.readResult(result).toString());
+    entity.setId(EntityColumn.ID.readResult(result).toString());
+
+    // fetch created time
+    Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result);
+    entity.setCreatedTime(createdTime.longValue());
+    if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
+        entity.getCreatedTime() > createdTimeEnd)) {
+      return null;
+    }
+
+    // fetch modified time
+    Number modifiedTime = (Number)EntityColumn.MODIFIED_TIME.readResult(result);
+    entity.setModifiedTime(modifiedTime.longValue());
+    if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
+        entity.getModifiedTime() > modifiedTimeEnd)) {
+      return null;
+    }
+
+    // fetch is related to entities
+    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
+    boolean wantIsRelatedTo = isFieldRequested(Field.IS_RELATED_TO);
+    if (wantIsRelatedTo || checkIsRelatedTo) {
+      readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
+      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+          entity.getIsRelatedToEntities(), isRelatedTo)) {
+        return null;
+      }
+      if (!wantIsRelatedTo) {
+        entity.getIsRelatedToEntities().clear();
+      }
+    }
+
+    // fetch relates to entities
+    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
+    boolean wantRelatesTo = isFieldRequested(Field.RELATES_TO);
+    if (wantRelatesTo || checkRelatesTo) {
+      readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
+      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+          entity.getRelatesToEntities(), relatesTo)) {
+        return null;
+      }
+      if (!wantRelatesTo) {
+        entity.getRelatesToEntities().clear();
+      }
+    }
+
+    // fetch info
+    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
+    boolean wantInfo = isFieldRequested(Field.INFO);
+    if (wantInfo || checkInfo) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
+      if (checkInfo &&
+          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+        return null;
+      }
+      if (!wantInfo) {
+        entity.getInfo().clear();
+      }
+    }
+
+    // fetch configs
+    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
+    boolean wantConfigs = isFieldRequested(Field.CONFIGS);
+    if (wantConfigs || checkConfigs) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
+      if (checkConfigs && !TimelineReaderUtils.matchFilters(
+          entity.getConfigs(), configFilters)) {
+        return null;
+      }
+      if (!wantConfigs) {
+        entity.getConfigs().clear();
+      }
+    }
+
+    // fetch events
+    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
+    boolean wantEvents = isFieldRequested(Field.EVENTS);
+    if (wantEvents || checkEvents) {
+      readEvents(entity, result, false);
+      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+          entity.getEvents(), eventFilters)) {
+        return null;
+      }
+      if (!wantEvents) {
+        entity.getEvents().clear();
+      }
+    }
+
+    // fetch metrics
+    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
+    boolean wantMetrics = isFieldRequested(Field.METRICS);
+    if (wantMetrics || checkMetrics) {
+      readMetrics(entity, result, EntityColumnPrefix.METRIC);
+      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+          entity.getMetrics(), metricFilters)) {
+        return null;
+      }
+      if (!wantMetrics) {
+        entity.getMetrics().clear();
+      }
+    }
+    return entity;
+  }
+
+  /**
+   * Helper method for reading relationship. Each column value is split into
+   * ids, and every id is added to the entity under the column name.
+   *
+   * @param isRelatedTo {@code true} to fill the "is related to" entities,
+   *        {@code false} to fill the "relates to" entities
+   */
+  protected <T> void readRelationship(
+      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
+      boolean isRelatedTo) throws IOException {
+    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
+    Map<String, Object> columns = prefix.readResults(result);
+    for (Map.Entry<String, Object> column : columns.entrySet()) {
+      for (String id : Separator.VALUES.splitEncoded(
+          column.getValue().toString())) {
+        if (isRelatedTo) {
+          entity.addIsRelatedToEntity(column.getKey(), id);
+        } else {
+          entity.addRelatesToEntity(column.getKey(), id);
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper method for reading key-value pairs for either info or config.
+   *
+   * @param isConfig {@code true} to add the pairs as configs (values are
+   *        converted with {@code toString()}), {@code false} to add them as
+   *        info
+   */
+  protected <T> void readKeyValuePairs(
+      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
+      boolean isConfig) throws IOException {
+    // info and configuration are of type Map<String, Object or String>
+    Map<String, Object> columns = prefix.readResults(result);
+    if (isConfig) {
+      for (Map.Entry<String, Object> column : columns.entrySet()) {
+        entity.addConfig(column.getKey(), column.getValue().toString());
+      }
+    } else {
+      entity.addInfo(columns);
+    }
+  }
+
+  /**
+   * Read events from the entity table or the application table. The column name
+   * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
+   * if there is no info associated with the event.
+   *
+   * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
+   * schema description.
+   *
+   * @param isApplication {@code true} to read with the application column
+   *        prefix, {@code false} to read with the entity column prefix
+   */
+  protected void readEvents(TimelineEntity entity, Result result,
+      boolean isApplication) throws IOException {
+    Map<String, TimelineEvent> eventsMap = new HashMap<>();
+    Map<?, Object> eventsResult = isApplication ?
+        ApplicationColumnPrefix.EVENT.
+            readResultsHavingCompoundColumnQualifiers(result) :
+        EntityColumnPrefix.EVENT.
+            readResultsHavingCompoundColumnQualifiers(result);
+    for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
+      byte[][] karr = (byte[][])eventResult.getKey();
+      // the column name is of the form "eventId=timestamp=infoKey"
+      if (karr.length != 3) {
+        LOG.warn("incorrectly formatted column name: it will be discarded");
+        continue;
+      }
+      String id = Bytes.toString(karr[0]);
+      long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1]));
+      // columns with the same event id and timestamp belong to one event
+      String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
+      TimelineEvent event = eventsMap.get(key);
+      if (event == null) {
+        event = new TimelineEvent();
+        event.setId(id);
+        event.setTimestamp(ts);
+        eventsMap.put(key, event);
+      }
+      // handle empty info
+      if (karr[2].length > 0) {
+        event.addInfo(Bytes.toString(karr[2]), eventResult.getValue());
+      }
+    }
+    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
+    entity.addEvents(eventsSet);
+  }
+}