Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/01/12 03:06:01 UTC

[1/2] hadoop git commit: YARN-4200. Refactor reader classes in storage to nest under hbase specific package name. Contributed by Li Lu.

Repository: hadoop
Updated Branches:
  refs/heads/feature-YARN-2928 36d74ec41 -> 2dce9faa7


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2dce9faa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
new file mode 100644
index 0000000..181ec81
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Timeline entity reader for application entities that are stored in the
+ * application table.
+ */
+class ApplicationEntityReader extends GenericEntityReader {
+  private static final ApplicationTable APPLICATION_TABLE =
+      new ApplicationTable();
+
+  public ApplicationEntityReader(String userId, String clusterId,
+      String flowName, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
+      EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
+        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+        eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
+        true);
+  }
+
+  public ApplicationEntityReader(String userId, String clusterId,
+      String flowName, Long flowRunId, String appId, String entityType,
+      String entityId, TimelineFilterList confsToRetrieve,
+      TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
+        confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
+  }
+
+  /**
+   * Uses the {@link ApplicationTable}.
+   */
+  protected BaseTable<?> getTable() {
+    return APPLICATION_TABLE;
+  }
+
+  @Override
+  protected FilterList constructFilterListBasedOnFields() {
+    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
+    // Fetch all the columns.
+    if (fieldsToRetrieve.contains(Field.ALL) &&
+        (confsToRetrieve == null ||
+        confsToRetrieve.getFilterList().isEmpty()) &&
+        (metricsToRetrieve == null ||
+        metricsToRetrieve.getFilterList().isEmpty())) {
+      return list;
+    }
+    FilterList infoColFamilyList = new FilterList();
+    // By default fetch everything in the INFO column family.
+    FamilyFilter infoColumnFamily =
+        new FamilyFilter(CompareOp.EQUAL,
+           new BinaryComparator(ApplicationColumnFamily.INFO.getBytes()));
+    infoColFamilyList.addFilter(infoColumnFamily);
+    // Events not required.
+    if (!fieldsToRetrieve.contains(Field.EVENTS) &&
+        !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) {
+      infoColFamilyList.addFilter(
+          new QualifierFilter(CompareOp.NOT_EQUAL,
+          new BinaryPrefixComparator(
+          ApplicationColumnPrefix.EVENT.getColumnPrefixBytes(""))));
+    }
+    // info not required.
+    if (!fieldsToRetrieve.contains(Field.INFO) &&
+        !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) {
+      infoColFamilyList.addFilter(
+          new QualifierFilter(CompareOp.NOT_EQUAL,
+          new BinaryPrefixComparator(
+          ApplicationColumnPrefix.INFO.getColumnPrefixBytes(""))));
+    }
+    // is related to not required.
+    if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) &&
+        !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) {
+      infoColFamilyList.addFilter(
+          new QualifierFilter(CompareOp.NOT_EQUAL,
+          new BinaryPrefixComparator(
+          ApplicationColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
+    }
+    // relates to not required.
+    if (!fieldsToRetrieve.contains(Field.RELATES_TO) &&
+        !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) {
+      infoColFamilyList.addFilter(
+          new QualifierFilter(CompareOp.NOT_EQUAL,
+          new BinaryPrefixComparator(
+          ApplicationColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
+    }
+    list.addFilter(infoColFamilyList);
+    if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) ||
+        (confsToRetrieve != null &&
+        !confsToRetrieve.getFilterList().isEmpty())) {
+      FilterList filterCfg =
+          new FilterList(new FamilyFilter(CompareOp.EQUAL,
+          new BinaryComparator(ApplicationColumnFamily.CONFIGS.getBytes())));
+      if (confsToRetrieve != null &&
+          !confsToRetrieve.getFilterList().isEmpty()) {
+        filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
+            ApplicationColumnPrefix.CONFIG, confsToRetrieve));
+      }
+      list.addFilter(filterCfg);
+    }
+    if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) ||
+        (metricsToRetrieve != null &&
+        !metricsToRetrieve.getFilterList().isEmpty())) {
+      FilterList filterMetrics =
+          new FilterList(new FamilyFilter(CompareOp.EQUAL,
+          new BinaryComparator(ApplicationColumnFamily.METRICS.getBytes())));
+      if (metricsToRetrieve != null &&
+          !metricsToRetrieve.getFilterList().isEmpty()) {
+        filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
+            ApplicationColumnPrefix.METRIC, metricsToRetrieve));
+      }
+      list.addFilter(filterMetrics);
+    }
+    return list;
+  }
+
+  @Override
+  protected Result getResult(Configuration hbaseConf, Connection conn,
+      FilterList filterList) throws IOException {
+    byte[] rowKey =
+        ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
+            appId);
+    Get get = new Get(rowKey);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    if (filterList != null && !filterList.getFilters().isEmpty()) {
+      get.setFilter(filterList);
+    }
+    return table.getResult(hbaseConf, conn, get);
+  }
+
+  @Override
+  protected void validateParams() {
+    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
+    if (singleEntityRead) {
+      Preconditions.checkNotNull(appId, "appId shouldn't be null");
+    } else {
+      Preconditions.checkNotNull(userId, "userId shouldn't be null");
+      Preconditions.checkNotNull(flowName, "flowName shouldn't be null");
+    }
+  }
+
+  @Override
+  protected void augmentParams(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    if (singleEntityRead) {
+      if (flowName == null || flowRunId == null || userId == null) {
+        FlowContext context =
+            lookupFlowContext(clusterId, appId, hbaseConf, conn);
+        flowName = context.flowName;
+        flowRunId = context.flowRunId;
+        userId = context.userId;
+      }
+    }
+    if (fieldsToRetrieve == null) {
+      fieldsToRetrieve = EnumSet.noneOf(Field.class);
+    }
+    if (!fieldsToRetrieve.contains(Field.CONFIGS) &&
+        confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) {
+      fieldsToRetrieve.add(Field.CONFIGS);
+    }
+    if (!fieldsToRetrieve.contains(Field.METRICS) &&
+        metricsToRetrieve != null &&
+        !metricsToRetrieve.getFilterList().isEmpty()) {
+      fieldsToRetrieve.add(Field.METRICS);
+    }
+    if (!singleEntityRead) {
+      if (limit == null || limit < 0) {
+        limit = TimelineReader.DEFAULT_LIMIT;
+      }
+      if (createdTimeBegin == null) {
+        createdTimeBegin = DEFAULT_BEGIN_TIME;
+      }
+      if (createdTimeEnd == null) {
+        createdTimeEnd = DEFAULT_END_TIME;
+      }
+      if (modifiedTimeBegin == null) {
+        modifiedTimeBegin = DEFAULT_BEGIN_TIME;
+      }
+      if (modifiedTimeEnd == null) {
+        modifiedTimeEnd = DEFAULT_END_TIME;
+      }
+    }
+  }
+
+  @Override
+  protected ResultScanner getResults(Configuration hbaseConf,
+      Connection conn, FilterList filterList) throws IOException {
+    Scan scan = new Scan();
+    if (flowRunId != null) {
+      scan.setRowPrefixFilter(ApplicationRowKey.
+          getRowKeyPrefix(clusterId, userId, flowName, flowRunId));
+    } else {
+      scan.setRowPrefixFilter(ApplicationRowKey.
+          getRowKeyPrefix(clusterId, userId, flowName));
+    }
+    FilterList newList = new FilterList();
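+    // Limit the number of rows returned by the scan to the requested page
+    // size; the scanner may still return more than the limit, so callers
+    // read only the needed number of rows while iterating.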
+    newList.addFilter(new PageFilter(limit));
+    if (filterList != null && !filterList.getFilters().isEmpty()) {
+      newList.addFilter(filterList);
+    }
+    scan.setFilter(newList);
+    return table.getResultScanner(hbaseConf, conn, scan);
+  }
+
+  @Override
+  protected TimelineEntity parseEntity(Result result) throws IOException {
+    if (result == null || result.isEmpty()) {
+      return null;
+    }
+    TimelineEntity entity = new TimelineEntity();
+    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
+    String entityId = ApplicationColumn.ID.readResult(result).toString();
+    entity.setId(entityId);
+
+    // fetch created time
+    Number createdTime =
+        (Number)ApplicationColumn.CREATED_TIME.readResult(result);
+    entity.setCreatedTime(createdTime.longValue());
+    if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
+        entity.getCreatedTime() > createdTimeEnd)) {
+      return null;
+    }
+
+    // fetch modified time
+    Number modifiedTime =
+        (Number)ApplicationColumn.MODIFIED_TIME.readResult(result);
+    entity.setModifiedTime(modifiedTime.longValue());
+    if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
+        entity.getModifiedTime() > modifiedTimeEnd)) {
+      return null;
+    }
+
+    // fetch is related to entities
+    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
+      readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
+          true);
+      if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
+          entity.getIsRelatedToEntities(), isRelatedTo)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
+        entity.getIsRelatedToEntities().clear();
+      }
+    }
+
+    // fetch relates to entities
+    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
+      readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
+          false);
+      if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
+          entity.getRelatesToEntities(), relatesTo)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
+        entity.getRelatesToEntities().clear();
+      }
+    }
+
+    // fetch info
+    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
+      readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
+      if (checkInfo &&
+          !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.INFO)) {
+        entity.getInfo().clear();
+      }
+    }
+
+    // fetch configs
+    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
+      readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
+      if (checkConfigs && !TimelineStorageUtils.matchFilters(
+          entity.getConfigs(), configFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.CONFIGS)) {
+        entity.getConfigs().clear();
+      }
+    }
+
+    // fetch events
+    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
+      readEvents(entity, result, true);
+      if (checkEvents && !TimelineStorageUtils.matchEventFilters(
+          entity.getEvents(), eventFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.EVENTS)) {
+        entity.getEvents().clear();
+      }
+    }
+
+    // fetch metrics
+    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
+      readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
+      if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
+          entity.getMetrics(), metricFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.METRICS)) {
+        entity.getMetrics().clear();
+      }
+    }
+    return entity;
+  }
+}
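
A note on the filter composition in constructFilterListBasedOnFields() above:
the outer FilterList ORs together one branch per column family
(Operator.MUST_PASS_ONE), and the INFO branch ANDs in qualifier exclusions
for every field the caller did not request. A minimal stand-alone sketch of
the same HBase pattern, with assumed byte constants standing in for the real
ApplicationColumnFamily/ApplicationColumnPrefix helpers:

    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.filter.BinaryComparator;
    import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.FamilyFilter;
    import org.apache.hadoop.hbase.filter.FilterList;
    import org.apache.hadoop.hbase.filter.FilterList.Operator;
    import org.apache.hadoop.hbase.filter.QualifierFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    byte[] infoFamily = Bytes.toBytes("i");    // assumed INFO family name
    byte[] eventPrefix = Bytes.toBytes("e!");  // assumed event column prefix

    // Outer OR: a cell survives if any family branch accepts it.
    FilterList list = new FilterList(Operator.MUST_PASS_ONE);

    // INFO branch (default operator MUST_PASS_ALL, i.e. AND): keep the INFO
    // family but drop qualifiers starting with the event prefix, mirroring
    // the "Events not required" case above.
    FilterList infoBranch = new FilterList();
    infoBranch.addFilter(new FamilyFilter(CompareOp.EQUAL,
        new BinaryComparator(infoFamily)));
    infoBranch.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL,
        new BinaryPrefixComparator(eventPrefix)));
    list.addFilter(infoBranch);

    // Attach to a point read that keeps every cell version, as getResult() does.
    Get get = new Get(Bytes.toBytes("row"));
    get.setMaxVersions(Integer.MAX_VALUE);
    get.setFilter(list);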

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2dce9faa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
new file mode 100644
index 0000000..52ceef8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Timeline entity reader for flow activity entities that are stored in the
+ * flow activity table.
+ */
+class FlowActivityEntityReader extends TimelineEntityReader {
+  private static final FlowActivityTable FLOW_ACTIVITY_TABLE =
+      new FlowActivityTable();
+
+  public FlowActivityEntityReader(String userId, String clusterId,
+      String flowName, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
+        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+        eventFilters, null, null, fieldsToRetrieve, true);
+  }
+
+  public FlowActivityEntityReader(String userId, String clusterId,
+      String flowName, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
+        null, null, fieldsToRetrieve);
+  }
+
+  /**
+   * Uses the {@link FlowActivityTable}.
+   */
+  @Override
+  protected BaseTable<?> getTable() {
+    return FLOW_ACTIVITY_TABLE;
+  }
+
+  @Override
+  protected void validateParams() {
+    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+  }
+
+  @Override
+  protected void augmentParams(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    if (limit == null || limit < 0) {
+      limit = TimelineReader.DEFAULT_LIMIT;
+    }
+    if (createdTimeBegin == null) {
+      createdTimeBegin = DEFAULT_BEGIN_TIME;
+    }
+    if (createdTimeEnd == null) {
+      createdTimeEnd = DEFAULT_END_TIME;
+    }
+  }
+
+  @Override
+  protected FilterList constructFilterListBasedOnFields() {
+    return null;
+  }
+
+  @Override
+  protected Result getResult(Configuration hbaseConf, Connection conn,
+      FilterList filterList) throws IOException {
+    throw new UnsupportedOperationException(
+        "we don't support a single entity query");
+  }
+
+  @Override
+  protected ResultScanner getResults(Configuration hbaseConf,
+      Connection conn, FilterList filterList) throws IOException {
+    Scan scan = new Scan();
+    if (createdTimeBegin == DEFAULT_BEGIN_TIME &&
+        createdTimeEnd == DEFAULT_END_TIME) {
+      scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId));
+    } else {
+      scan.setStartRow(
+          FlowActivityRowKey.getRowKeyPrefix(clusterId, createdTimeEnd));
+      scan.setStopRow(
+          FlowActivityRowKey.getRowKeyPrefix(clusterId,
+              (createdTimeBegin <= 0 ? 0 : (createdTimeBegin - 1))));
+    }
+    // use the page filter to limit the result to the page size
+    // the scanner may still return more than the limit; therefore we need to
+    // read the right number as we iterate
+    scan.setFilter(new PageFilter(limit));
+    return table.getResultScanner(hbaseConf, conn, scan);
+  }
+
+  @Override
+  protected TimelineEntity parseEntity(Result result) throws IOException {
+    FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow());
+
+    long time = rowKey.getDayTimestamp();
+    String user = rowKey.getUserId();
+    String flowName = rowKey.getFlowName();
+
+    FlowActivityEntity flowActivity =
+        new FlowActivityEntity(clusterId, time, user, flowName);
+    // set the id
+    flowActivity.setId(flowActivity.getId());
+    // get the list of run ids along with the version that are associated with
+    // this flow on this day
+    Map<String, Object> runIdsMap =
+        FlowActivityColumnPrefix.RUN_ID.readResults(result);
+    for (Map.Entry<String, Object> e : runIdsMap.entrySet()) {
+      Long runId = Long.valueOf(e.getKey());
+      String version = (String)e.getValue();
+      FlowRunEntity flowRun = new FlowRunEntity();
+      flowRun.setUser(user);
+      flowRun.setName(flowName);
+      flowRun.setRunId(runId);
+      flowRun.setVersion(version);
+      // set the id
+      flowRun.setId(flowRun.getId());
+      flowActivity.addFlowRun(flowRun);
+    }
+
+    return flowActivity;
+  }
+}
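
The start/stop rows in getResults() above only make sense because the day
timestamp in the flow activity row key is stored inverted (in the style of
TimelineStorageUtils.invertLong, i.e. Long.MAX_VALUE - timestamp), so later
days sort before earlier ones and the end of the requested time range becomes
the start of the scan. A small sketch of that bounded scan, assuming a
hypothetical rowKeyPrefix() helper in place of FlowActivityRowKey:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.PageFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    // Assumed layout: cluster id, a separator, then the inverted day timestamp.
    static byte[] rowKeyPrefix(String clusterId, long dayTs) {
      return Bytes.add(Bytes.toBytes(clusterId + "!"),
          Bytes.toBytes(Long.MAX_VALUE - dayTs));
    }

    static Scan boundedScan(String clusterId, long begin, long end, long limit) {
      Scan scan = new Scan();
      // Inverted timestamps sort newest first: the range end becomes the
      // scan start, and the day just before the range begin is the stop row.
      scan.setStartRow(rowKeyPrefix(clusterId, end));
      scan.setStopRow(rowKeyPrefix(clusterId, begin <= 0 ? 0 : begin - 1));
      // PageFilter is only a coarse cap; the caller still reads exactly
      // `limit` rows while iterating the ResultScanner.
      scan.setFilter(new PageFilter(limit));
      return scan;
    }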

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2dce9faa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
new file mode 100644
index 0000000..6286ee1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Timeline entity reader for flow run entities that are stored in the flow run
+ * table.
+ */
+class FlowRunEntityReader extends TimelineEntityReader {
+  private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable();
+
+  public FlowRunEntityReader(String userId, String clusterId,
+      String flowName, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
+      EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
+        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+        eventFilters, null, metricsToRetrieve, fieldsToRetrieve, true);
+  }
+
+  public FlowRunEntityReader(String userId, String clusterId,
+      String flowName, Long flowRunId, String appId, String entityType,
+      String entityId, TimelineFilterList confsToRetrieve,
+      TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
+        null, metricsToRetrieve, fieldsToRetrieve);
+  }
+
+  /**
+   * Uses the {@link FlowRunTable}.
+   */
+  @Override
+  protected BaseTable<?> getTable() {
+    return FLOW_RUN_TABLE;
+  }
+
+  @Override
+  protected void validateParams() {
+    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+    Preconditions.checkNotNull(userId, "userId shouldn't be null");
+    Preconditions.checkNotNull(flowName, "flowName shouldn't be null");
+    if (singleEntityRead) {
+      Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null");
+    }
+  }
+
+  @Override
+  protected void augmentParams(Configuration hbaseConf, Connection conn) {
+    if (!singleEntityRead) {
+      if (fieldsToRetrieve == null) {
+        fieldsToRetrieve = EnumSet.noneOf(Field.class);
+      }
+      if (limit == null || limit < 0) {
+        limit = TimelineReader.DEFAULT_LIMIT;
+      }
+      if (createdTimeBegin == null) {
+        createdTimeBegin = DEFAULT_BEGIN_TIME;
+      }
+      if (createdTimeEnd == null) {
+        createdTimeEnd = DEFAULT_END_TIME;
+      }
+      if (!fieldsToRetrieve.contains(Field.METRICS) &&
+          metricsToRetrieve != null &&
+          !metricsToRetrieve.getFilterList().isEmpty()) {
+        fieldsToRetrieve.add(Field.METRICS);
+      }
+    }
+  }
+
+  @Override
+  protected FilterList constructFilterListBasedOnFields() {
+    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
+
+    // By default fetch everything in the INFO column family.
+    FamilyFilter infoColumnFamily =
+        new FamilyFilter(CompareOp.EQUAL,
+           new BinaryComparator(FlowRunColumnFamily.INFO.getBytes()));
+    // Metrics not required.
+    if (!singleEntityRead && !fieldsToRetrieve.contains(Field.METRICS) &&
+        !fieldsToRetrieve.contains(Field.ALL)) {
+      FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE);
+      infoColFamilyList.addFilter(infoColumnFamily);
+      infoColFamilyList.addFilter(
+          new QualifierFilter(CompareOp.NOT_EQUAL,
+          new BinaryPrefixComparator(
+          FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(""))));
+      list.addFilter(infoColFamilyList);
+    }
+    if (metricsToRetrieve != null &&
+        !metricsToRetrieve.getFilterList().isEmpty()) {
+      FilterList infoColFamilyList = new FilterList();
+      infoColFamilyList.addFilter(infoColumnFamily);
+      infoColFamilyList.addFilter(TimelineFilterUtils.createHBaseFilterList(
+          FlowRunColumnPrefix.METRIC, metricsToRetrieve));
+      list.addFilter(infoColFamilyList);
+    }
+    return list;
+  }
+
+  @Override
+  protected Result getResult(Configuration hbaseConf, Connection conn,
+      FilterList filterList) throws IOException {
+    byte[] rowKey =
+        FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId);
+    Get get = new Get(rowKey);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    if (filterList != null && !filterList.getFilters().isEmpty()) {
+      get.setFilter(filterList);
+    }
+    return table.getResult(hbaseConf, conn, get);
+  }
+
+  @Override
+  protected ResultScanner getResults(Configuration hbaseConf,
+      Connection conn, FilterList filterList) throws IOException {
+    Scan scan = new Scan();
+    scan.setRowPrefixFilter(
+        FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowName));
+    FilterList newList = new FilterList();
+    newList.addFilter(new PageFilter(limit));
+    if (filterList != null && !filterList.getFilters().isEmpty()) {
+      newList.addFilter(filterList);
+    }
+    scan.setFilter(newList);
+    return table.getResultScanner(hbaseConf, conn, scan);
+  }
+
+  @Override
+  protected TimelineEntity parseEntity(Result result) throws IOException {
+    FlowRunEntity flowRun = new FlowRunEntity();
+    flowRun.setUser(userId);
+    flowRun.setName(flowName);
+    if (singleEntityRead) {
+      flowRun.setRunId(flowRunId);
+    } else {
+      FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow());
+      flowRun.setRunId(rowKey.getFlowRunId());
+    }
+
+    // read the start time
+    Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result);
+    if (startTime != null) {
+      flowRun.setStartTime(startTime.longValue());
+    }
+    if (!singleEntityRead && (flowRun.getStartTime() < createdTimeBegin ||
+        flowRun.getStartTime() > createdTimeEnd)) {
+      return null;
+    }
+
+    // read the end time if available
+    Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result);
+    if (endTime != null) {
+      flowRun.setMaxEndTime(endTime.longValue());
+    }
+
+    // read the flow version
+    String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result);
+    if (version != null) {
+      flowRun.setVersion(version);
+    }
+
+    // read metrics
+    if (singleEntityRead || fieldsToRetrieve.contains(Field.METRICS)) {
+      readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
+    }
+
+    // set the id
+    flowRun.setId(flowRun.getId());
+    return flowRun;
+  }
+}
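
Purely as an illustration of the parameter conventions (the readers are
package-private and are normally obtained through a factory in this package),
a hedged sketch of a multi-entity query with the first constructor above;
every null falls back to a default in augmentParams():

    import java.util.EnumSet;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
    import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;

    // Latest 10 runs of flow "flow1" for user1 on cluster1, metrics only.
    FlowRunEntityReader reader = new FlowRunEntityReader(
        "user1", "cluster1", "flow1",
        null,                      // flowRunId: not fixed, scan all runs
        null,                      // appId: unused for flow run queries
        TimelineEntityType.YARN_FLOW_RUN.toString(),
        10L,                       // limit
        null, null, null, null,    // created/modified time bounds
        null, null,                // relatesTo, isRelatedTo
        null, null,                // infoFilters, configFilters
        null, null,                // metricFilters, eventFilters
        null, null,                // confsToRetrieve, metricsToRetrieve
        EnumSet.of(Field.METRICS));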

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2dce9faa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
new file mode 100644
index 0000000..f3f380c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
@@ -0,0 +1,497 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Timeline entity reader for generic entities that are stored in the entity
+ * table.
+ */
+class GenericEntityReader extends TimelineEntityReader {
+  private static final EntityTable ENTITY_TABLE = new EntityTable();
+  private static final Log LOG = LogFactory.getLog(GenericEntityReader.class);
+
+  /**
+   * Used to look up the flow context.
+   */
+  private final AppToFlowTable appToFlowTable = new AppToFlowTable();
+
+  public GenericEntityReader(String userId, String clusterId,
+      String flowName, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
+      EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
+    super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
+        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+        eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
+        sortedKeys);
+  }
+
+  public GenericEntityReader(String userId, String clusterId,
+      String flowName, Long flowRunId, String appId, String entityType,
+      String entityId, TimelineFilterList confsToRetrieve,
+      TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
+        confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
+  }
+
+  /**
+   * Uses the {@link EntityTable}.
+   */
+  protected BaseTable<?> getTable() {
+    return ENTITY_TABLE;
+  }
+
+  @Override
+  protected FilterList constructFilterListBasedOnFields() {
+    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
+    // Fetch all the columns.
+    if (fieldsToRetrieve.contains(Field.ALL) &&
+        (confsToRetrieve == null ||
+        confsToRetrieve.getFilterList().isEmpty()) &&
+        (metricsToRetrieve == null ||
+        metricsToRetrieve.getFilterList().isEmpty())) {
+      return list;
+    }
+    FilterList infoColFamilyList = new FilterList();
+    // By default fetch everything in the INFO column family.
+    FamilyFilter infoColumnFamily =
+        new FamilyFilter(CompareOp.EQUAL,
+           new BinaryComparator(EntityColumnFamily.INFO.getBytes()));
+    infoColFamilyList.addFilter(infoColumnFamily);
+    // Events not required.
+    if (!fieldsToRetrieve.contains(Field.EVENTS) &&
+        !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) {
+      infoColFamilyList.addFilter(
+          new QualifierFilter(CompareOp.NOT_EQUAL,
+          new BinaryPrefixComparator(
+          EntityColumnPrefix.EVENT.getColumnPrefixBytes(""))));
+    }
+    // info not required.
+    if (!fieldsToRetrieve.contains(Field.INFO) &&
+        !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) {
+      infoColFamilyList.addFilter(
+          new QualifierFilter(CompareOp.NOT_EQUAL,
+          new BinaryPrefixComparator(
+              EntityColumnPrefix.INFO.getColumnPrefixBytes(""))));
+    }
+    // is related to not required.
+    if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) &&
+        !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) {
+      infoColFamilyList.addFilter(
+          new QualifierFilter(CompareOp.NOT_EQUAL,
+          new BinaryPrefixComparator(
+              EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
+    }
+    // relates to not required.
+    if (!fieldsToRetrieve.contains(Field.RELATES_TO) &&
+        !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) {
+      infoColFamilyList.addFilter(
+          new QualifierFilter(CompareOp.NOT_EQUAL,
+          new BinaryPrefixComparator(
+              EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
+    }
+    list.addFilter(infoColFamilyList);
+    if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) ||
+        (confsToRetrieve != null &&
+        !confsToRetrieve.getFilterList().isEmpty())) {
+      FilterList filterCfg =
+          new FilterList(new FamilyFilter(CompareOp.EQUAL,
+              new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes())));
+      if (confsToRetrieve != null &&
+          !confsToRetrieve.getFilterList().isEmpty()) {
+        filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
+            EntityColumnPrefix.CONFIG, confsToRetrieve));
+      }
+      list.addFilter(filterCfg);
+    }
+    if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) ||
+        (metricsToRetrieve != null &&
+        !metricsToRetrieve.getFilterList().isEmpty())) {
+      FilterList filterMetrics =
+          new FilterList(new FamilyFilter(CompareOp.EQUAL,
+              new BinaryComparator(EntityColumnFamily.METRICS.getBytes())));
+      if (metricsToRetrieve != null &&
+          !metricsToRetrieve.getFilterList().isEmpty()) {
+        filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
+            EntityColumnPrefix.METRIC, metricsToRetrieve));
+      }
+      list.addFilter(filterMetrics);
+    }
+    return list;
+  }
+
+  protected FlowContext lookupFlowContext(String clusterId, String appId,
+      Configuration hbaseConf, Connection conn) throws IOException {
+    byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
+    Get get = new Get(rowKey);
+    Result result = appToFlowTable.getResult(hbaseConf, conn, get);
+    if (result != null && !result.isEmpty()) {
+      return new FlowContext(
+          AppToFlowColumn.USER_ID.readResult(result).toString(),
+          AppToFlowColumn.FLOW_ID.readResult(result).toString(),
+          ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
+    } else {
+       throw new IOException(
+           "Unable to find the context flow ID and flow run ID for clusterId=" +
+           clusterId + ", appId=" + appId);
+    }
+  }
+
+  protected static class FlowContext {
+    protected final String userId;
+    protected final String flowName;
+    protected final Long flowRunId;
+    public FlowContext(String user, String flowName, Long flowRunId) {
+      this.userId = user;
+      this.flowName = flowName;
+      this.flowRunId = flowRunId;
+    }
+  }
+
+  @Override
+  protected void validateParams() {
+    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+    Preconditions.checkNotNull(appId, "appId shouldn't be null");
+    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
+    if (singleEntityRead) {
+      Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
+    }
+  }
+
+  @Override
+  protected void augmentParams(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    // In reality, either all three are null or none of them is.
+    if (flowName == null || flowRunId == null || userId == null) {
+      FlowContext context =
+          lookupFlowContext(clusterId, appId, hbaseConf, conn);
+      flowName = context.flowName;
+      flowRunId = context.flowRunId;
+      userId = context.userId;
+    }
+    if (fieldsToRetrieve == null) {
+      fieldsToRetrieve = EnumSet.noneOf(Field.class);
+    }
+    if (!fieldsToRetrieve.contains(Field.CONFIGS) &&
+        confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) {
+      fieldsToRetrieve.add(Field.CONFIGS);
+    }
+    if (!fieldsToRetrieve.contains(Field.METRICS) &&
+        metricsToRetrieve != null &&
+        !metricsToRetrieve.getFilterList().isEmpty()) {
+      fieldsToRetrieve.add(Field.METRICS);
+    }
+    if (!singleEntityRead) {
+      if (limit == null || limit < 0) {
+        limit = TimelineReader.DEFAULT_LIMIT;
+      }
+      if (createdTimeBegin == null) {
+        createdTimeBegin = DEFAULT_BEGIN_TIME;
+      }
+      if (createdTimeEnd == null) {
+        createdTimeEnd = DEFAULT_END_TIME;
+      }
+      if (modifiedTimeBegin == null) {
+        modifiedTimeBegin = DEFAULT_BEGIN_TIME;
+      }
+      if (modifiedTimeEnd == null) {
+        modifiedTimeEnd = DEFAULT_END_TIME;
+      }
+    }
+  }
+
+  @Override
+  protected Result getResult(Configuration hbaseConf, Connection conn,
+      FilterList filterList) throws IOException {
+    byte[] rowKey =
+        EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
+            entityType, entityId);
+    Get get = new Get(rowKey);
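+    // Request all cell versions; metric time series values are stored as
+    // multiple versions of the same column.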
+    get.setMaxVersions(Integer.MAX_VALUE);
+    if (filterList != null && !filterList.getFilters().isEmpty()) {
+      get.setFilter(filterList);
+    }
+    return table.getResult(hbaseConf, conn, get);
+  }
+
+  @Override
+  protected ResultScanner getResults(Configuration hbaseConf,
+      Connection conn, FilterList filterList) throws IOException {
+    // Scan through part of the table to find the entities that belong to
+    // one app and one type
+    Scan scan = new Scan();
+    scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
+        clusterId, userId, flowName, flowRunId, appId, entityType));
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    if (filterList != null && !filterList.getFilters().isEmpty()) {
+      scan.setFilter(filterList);
+    }
+    return table.getResultScanner(hbaseConf, conn, scan);
+  }
+
+  @Override
+  protected TimelineEntity parseEntity(Result result) throws IOException {
+    if (result == null || result.isEmpty()) {
+      return null;
+    }
+    TimelineEntity entity = new TimelineEntity();
+    String entityType = EntityColumn.TYPE.readResult(result).toString();
+    entity.setType(entityType);
+    String entityId = EntityColumn.ID.readResult(result).toString();
+    entity.setId(entityId);
+
+    // fetch created time
+    Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result);
+    entity.setCreatedTime(createdTime.longValue());
+    if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
+        entity.getCreatedTime() > createdTimeEnd)) {
+      return null;
+    }
+
+    // fetch modified time
+    Number modifiedTime = (Number)EntityColumn.MODIFIED_TIME.readResult(result);
+    entity.setModifiedTime(modifiedTime.longValue());
+    if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
+        entity.getModifiedTime() > modifiedTimeEnd)) {
+      return null;
+    }
+
+    // fetch is related to entities
+    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
+      readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
+      if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
+          entity.getIsRelatedToEntities(), isRelatedTo)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
+        entity.getIsRelatedToEntities().clear();
+      }
+    }
+
+    // fetch relates to entities
+    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
+      readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
+      if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
+          entity.getRelatesToEntities(), relatesTo)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
+        entity.getRelatesToEntities().clear();
+      }
+    }
+
+    // fetch info
+    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
+      if (checkInfo &&
+          !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.INFO)) {
+        entity.getInfo().clear();
+      }
+    }
+
+    // fetch configs
+    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
+      if (checkConfigs && !TimelineStorageUtils.matchFilters(
+          entity.getConfigs(), configFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.CONFIGS)) {
+        entity.getConfigs().clear();
+      }
+    }
+
+    // fetch events
+    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
+      readEvents(entity, result, false);
+      if (checkEvents && !TimelineStorageUtils.matchEventFilters(
+          entity.getEvents(), eventFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.EVENTS)) {
+        entity.getEvents().clear();
+      }
+    }
+
+    // fetch metrics
+    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
+      readMetrics(entity, result, EntityColumnPrefix.METRIC);
+      if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
+          entity.getMetrics(), metricFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.METRICS)) {
+        entity.getMetrics().clear();
+      }
+    }
+    return entity;
+  }
+
+  /**
+   * Helper method for reading relationships.
+   */
+  protected <T> void readRelationship(
+      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
+      boolean isRelatedTo) throws IOException {
+    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
+    Map<String, Object> columns = prefix.readResults(result);
+    for (Map.Entry<String, Object> column : columns.entrySet()) {
+      for (String id : Separator.VALUES.splitEncoded(
+          column.getValue().toString())) {
+        if (isRelatedTo) {
+          entity.addIsRelatedToEntity(column.getKey(), id);
+        } else {
+          entity.addRelatesToEntity(column.getKey(), id);
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper method for reading key-value pairs for either info or config.
+   */
+  protected <T> void readKeyValuePairs(
+      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
+      boolean isConfig) throws IOException {
+    // info and configuration are of type Map<String, Object or String>
+    Map<String, Object> columns = prefix.readResults(result);
+    if (isConfig) {
+      for (Map.Entry<String, Object> column : columns.entrySet()) {
+        entity.addConfig(column.getKey(), column.getValue().toString());
+      }
+    } else {
+      entity.addInfo(columns);
+    }
+  }
+
+  /**
+   * Read events from the entity table or the application table. The column name
+   * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
+   * if there is no info associated with the event.
+   *
+   * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
+   * schema description.
+   */
+  protected void readEvents(TimelineEntity entity, Result result,
+      boolean isApplication) throws IOException {
+    Map<String, TimelineEvent> eventsMap = new HashMap<>();
+    Map<?, Object> eventsResult = isApplication ?
+        ApplicationColumnPrefix.EVENT.
+            readResultsHavingCompoundColumnQualifiers(result) :
+        EntityColumnPrefix.EVENT.
+            readResultsHavingCompoundColumnQualifiers(result);
+    for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
+      byte[][] karr = (byte[][])eventResult.getKey();
+      // the column name is of the form "eventId=timestamp=infoKey"
+      if (karr.length == 3) {
+        String id = Bytes.toString(karr[0]);
+        long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1]));
+        String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
+        TimelineEvent event = eventsMap.get(key);
+        if (event == null) {
+          event = new TimelineEvent();
+          event.setId(id);
+          event.setTimestamp(ts);
+          eventsMap.put(key, event);
+        }
+        // handle empty info
+        String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
+        if (infoKey != null) {
+          event.addInfo(infoKey, eventResult.getValue());
+        }
+      } else {
+        LOG.warn("incorrectly formatted column name: it will be discarded");
+        continue;
+      }
+    }
+    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
+    entity.addEvents(eventsSet);
+  }
+}
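
A minimal, self-contained sketch of the timestamp round trip performed by
readEvents above, assuming TimelineStorageUtils.invertLong is the usual
Long.MAX_VALUE - v involution (an assumption; only the call site is visible
in this diff). Storing inverted timestamps makes newer events sort first
under HBase's ascending key order, and the reader inverts them back:

  public final class InvertedTimestampSketch {
    // Assumed to mirror TimelineStorageUtils.invertLong: applying it twice
    // returns the original value.
    static long invertLong(long v) {
      return Long.MAX_VALUE - v;
    }

    public static void main(String[] args) {
      long eventTs = 1452564336000L;           // what the writer was given
      long stored = invertLong(eventTs);       // what lands in the column name
      long readBack = invertLong(stored);      // what readEvents reconstructs
      System.out.println(eventTs == readBack); // true
    }
  }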

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2dce9faa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
new file mode 100644
index 0000000..e801466
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+
+/**
+ * The base class for reading and deserializing timeline entities from the
+ * HBase storage. Subclasses can be defined for the different types of
+ * entities that may be requested.
+ */
+public abstract class TimelineEntityReader {
+  private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class);
+  protected static final long DEFAULT_BEGIN_TIME = 0L;
+  protected static final long DEFAULT_END_TIME = Long.MAX_VALUE;
+
+  protected final boolean singleEntityRead;
+
+  protected String userId;
+  protected String clusterId;
+  protected String flowName;
+  protected Long flowRunId;
+  protected String appId;
+  protected String entityType;
+  protected EnumSet<Field> fieldsToRetrieve;
+  // used only for a single entity read mode
+  protected String entityId;
+  // used only for multiple entity read mode
+  protected Long limit;
+  protected Long createdTimeBegin;
+  protected Long createdTimeEnd;
+  protected Long modifiedTimeBegin;
+  protected Long modifiedTimeEnd;
+  protected Map<String, Set<String>> relatesTo;
+  protected Map<String, Set<String>> isRelatedTo;
+  protected Map<String, Object> infoFilters;
+  protected Map<String, String> configFilters;
+  protected Set<String> metricFilters;
+  protected Set<String> eventFilters;
+  protected TimelineFilterList confsToRetrieve;
+  protected TimelineFilterList metricsToRetrieve;
+
+  /**
+   * Main table the entity reader uses.
+   */
+  protected BaseTable<?> table;
+
+  /**
+   * Specifies whether the keys for this table are sorted in a manner that
+   * lets entities be retrieved by created time. If true, it is sufficient to
+   * collect the first results up to the specified limit. Otherwise, all
+   * matching entities are fetched and the limit is applied afterwards.
+   */
+  private boolean sortedKeys = false;
+
+  /**
+   * Instantiates a reader for multiple-entity reads.
+   */
+  protected TimelineEntityReader(String userId, String clusterId,
+      String flowName, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
+      EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
+    this.singleEntityRead = false;
+    this.sortedKeys = sortedKeys;
+    this.userId = userId;
+    this.clusterId = clusterId;
+    this.flowName = flowName;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+    this.entityType = entityType;
+    this.fieldsToRetrieve = fieldsToRetrieve;
+    this.limit = limit;
+    this.createdTimeBegin = createdTimeBegin;
+    this.createdTimeEnd = createdTimeEnd;
+    this.modifiedTimeBegin = modifiedTimeBegin;
+    this.modifiedTimeEnd = modifiedTimeEnd;
+    this.relatesTo = relatesTo;
+    this.isRelatedTo = isRelatedTo;
+    this.infoFilters = infoFilters;
+    this.configFilters = configFilters;
+    this.metricFilters = metricFilters;
+    this.eventFilters = eventFilters;
+    this.confsToRetrieve = confsToRetrieve;
+    this.metricsToRetrieve = metricsToRetrieve;
+
+    this.table = getTable();
+  }
+
+  /**
+   * Instantiates a reader for single-entity reads.
+   */
+  protected TimelineEntityReader(String userId, String clusterId,
+      String flowName, Long flowRunId, String appId, String entityType,
+      String entityId, TimelineFilterList confsToRetrieve,
+      TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
+    this.singleEntityRead = true;
+    this.userId = userId;
+    this.clusterId = clusterId;
+    this.flowName = flowName;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+    this.entityType = entityType;
+    this.fieldsToRetrieve = fieldsToRetrieve;
+    this.entityId = entityId;
+    this.confsToRetrieve = confsToRetrieve;
+    this.metricsToRetrieve = metricsToRetrieve;
+
+    this.table = getTable();
+  }
+
+  /**
+   * Creates a {@link FilterList} based on fields, confs and metrics to
+   * retrieve. This filter list will be set in Scan/Get objects to trim down
+   * results fetched from HBase back-end storage.
+   * @return a {@link FilterList} object.
+   */
+  protected abstract FilterList constructFilterListBasedOnFields();
+
+  /**
+   * Reads and deserializes a single timeline entity from the HBase storage.
+   */
+  public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    validateParams();
+    augmentParams(hbaseConf, conn);
+
+    FilterList filterList = constructFilterListBasedOnFields();
+    Result result = getResult(hbaseConf, conn, filterList);
+    if (result == null || result.isEmpty()) {
+      // Could not find a matching row.
+      LOG.info("Cannot find matching entity of type " + entityType);
+      return null;
+    }
+    return parseEntity(result);
+  }
+
+  /**
+   * Reads and deserializes a set of timeline entities from the HBase storage.
+   * It iterates over all available results and returns up to the number of
+   * entries specified by the limit, in the entities' natural sort order.
+   */
+  public Set<TimelineEntity> readEntities(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    validateParams();
+    augmentParams(hbaseConf, conn);
+
+    NavigableSet<TimelineEntity> entities = new TreeSet<>();
+    FilterList filterList = constructFilterListBasedOnFields();
+    ResultScanner results = getResults(hbaseConf, conn, filterList);
+    try {
+      for (Result result : results) {
+        TimelineEntity entity = parseEntity(result);
+        if (entity == null) {
+          continue;
+        }
+        entities.add(entity);
+        if (!sortedKeys) {
+          if (entities.size() > limit) {
+            entities.pollLast();
+          }
+        } else {
+          if (entities.size() == limit) {
+            break;
+          }
+        }
+      }
+      return entities;
+    } finally {
+      results.close();
+    }
+  }
+
+  /**
+   * Returns the main table to be used by the entity reader.
+   */
+  protected abstract BaseTable<?> getTable();
+
+  /**
+   * Validates the required parameters to read the entities.
+   */
+  protected abstract void validateParams();
+
+  /**
+   * Sets certain parameters to defaults if the values are not provided.
+   */
+  protected abstract void augmentParams(Configuration hbaseConf,
+      Connection conn) throws IOException;
+
+  /**
+   * Fetches a {@link Result} instance for a single-entity read.
+   *
+   * @return the {@link Result} instance or null if no such record is found.
+   */
+  protected abstract Result getResult(Configuration hbaseConf, Connection conn,
+      FilterList filterList) throws IOException;
+
+  /**
+   * Fetches a {@link ResultScanner} for a multi-entity read.
+   */
+  protected abstract ResultScanner getResults(Configuration hbaseConf,
+      Connection conn, FilterList filterList) throws IOException;
+
+  /**
+   * Given a {@link Result} instance, deserializes and creates a
+   * {@link TimelineEntity}.
+   *
+   * @return the {@link TimelineEntity} instance, or null if the {@link Result}
+   * is null or empty.
+   */
+  protected abstract TimelineEntity parseEntity(Result result)
+      throws IOException;
+
+  /**
+   * Helper method for reading and deserializing {@link TimelineMetric} objects
+   * using the specified column prefix. The timeline metrics are then added to
+   * the given timeline entity.
+   */
+  protected void readMetrics(TimelineEntity entity, Result result,
+      ColumnPrefix<?> columnPrefix) throws IOException {
+    NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+        columnPrefix.readResultsWithTimestamps(result);
+    for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
+        metricsResult.entrySet()) {
+      TimelineMetric metric = new TimelineMetric();
+      metric.setId(metricResult.getKey());
+      // Simply assume that if the value set contains more than one element,
+      // the metric is a TIME_SERIES metric; otherwise it is a SINGLE_VALUE one.
+      metric.setType(metricResult.getValue().size() > 1 ?
+          TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
+      metric.addValues(metricResult.getValue());
+      entity.addMetric(metric);
+    }
+  }
+}
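
For readers skimming readEntities above: when the row keys are not sorted by
created time, the reader accumulates results in a TreeSet and evicts the
largest element whenever the set grows past the limit, so only the first
'limit' entities in natural order survive the scan. A standalone sketch of
that pattern with a placeholder element type (the real code holds
TimelineEntity objects, which are used here as Comparable):

  import java.util.NavigableSet;
  import java.util.TreeSet;

  public final class KeepFirstNSketch {
    // Mirrors the unsorted-keys branch of readEntities: consume everything
    // the scanner returns, but never retain more than 'limit' elements.
    static NavigableSet<Long> keepFirstN(Iterable<Long> scanned, long limit) {
      NavigableSet<Long> kept = new TreeSet<>();
      for (Long value : scanned) {
        kept.add(value);
        if (kept.size() > limit) {
          kept.pollLast(); // evict the largest; size stays at 'limit'
        }
      }
      return kept;
    }
  }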

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2dce9faa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java
new file mode 100644
index 0000000..c77897a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+
+/**
+ * Factory methods for instantiating a timeline entity reader.
+ */
+public class TimelineEntityReaderFactory {
+  /**
+   * Creates a timeline entity reader instance for reading a single entity with
+   * the specified input.
+   */
+  public static TimelineEntityReader createSingleEntityReader(String userId,
+      String clusterId, String flowName, Long flowRunId, String appId,
+      String entityType, String entityId, TimelineFilterList confs,
+      TimelineFilterList metrics, EnumSet<Field> fieldsToRetrieve) {
+    // currently the types that are handled separately from the generic entity
+    // table are application, flow run, and flow activity entities
+    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
+      return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId,
+          appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
+      return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId,
+          appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
+      return new FlowActivityEntityReader(userId, clusterId, flowName, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    } else {
+      // assume we're dealing with a generic entity read
+      return new GenericEntityReader(userId, clusterId, flowName, flowRunId,
+        appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
+    }
+  }
+
+  /**
+   * Creates a timeline entity reader instance for reading a set of entities
+   * with the specified input and predicates.
+   */
+  public static TimelineEntityReader createMultipleEntitiesReader(String userId,
+      String clusterId, String flowName, Long flowRunId, String appId,
+      String entityType, Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      TimelineFilterList confs, TimelineFilterList metrics,
+      EnumSet<Field> fieldsToRetrieve) {
+    // currently the types that are handled separately from the generic entity
+    // table are application, flow run, and flow activity entities
+    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
+      return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters, confs,
+          metrics, fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
+      return new FlowActivityEntityReader(userId, clusterId, flowName, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
+      return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters, confs,
+          metrics, fieldsToRetrieve);
+    } else {
+      // assume we're dealing with a generic entity read
+      return new GenericEntityReader(userId, clusterId, flowName, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters, confs,
+          metrics, fieldsToRetrieve, false);
+    }
+  }
+}
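
A hypothetical caller-side sketch of the factory above; the IDs and the flow
run number are placeholders, not values from this commit, and the connection
setup stands in for whatever the surrounding reader implementation provides:

  Configuration hbaseConf = HBaseConfiguration.create();
  try (Connection conn = ConnectionFactory.createConnection(hbaseConf)) {
    TimelineEntityReader reader =
        TimelineEntityReaderFactory.createSingleEntityReader(
            "user1", "cluster1", "flow1", 1002345678919L,
            "application_1111111111_2222",
            TimelineEntityType.YARN_APPLICATION.toString(),
            "application_1111111111_2222",
            null /* confs */, null /* metrics */, EnumSet.of(Field.ALL));
    TimelineEntity entity = reader.readEntity(hbaseConf, conn);
  }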

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2dce9faa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java
new file mode 100644
index 0000000..0b3fa38
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file


[2/2] hadoop git commit: YARN-4200. Refactor reader classes in storage to nest under hbase specific package name. Contributed by Li Lu.

Posted by gt...@apache.org.
YARN-4200. Refactor reader classes in storage to nest under hbase
specific package name. Contributed by Li Lu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2dce9faa
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2dce9faa
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2dce9faa

Branch: refs/heads/feature-YARN-2928
Commit: 2dce9faa74c3eda13a036ebff14f74fba39b7fc3
Parents: 36d74ec
Author: Li Lu <gt...@apache.org>
Authored: Mon Jan 11 18:05:36 2016 -0800
Committer: Li Lu <gt...@apache.org>
Committed: Mon Jan 11 18:05:36 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../storage/ApplicationEntityReader.java        | 382 --------------
 .../storage/FlowActivityEntityReader.java       | 163 ------
 .../storage/FlowRunEntityReader.java            | 225 ---------
 .../storage/GenericEntityReader.java            | 496 ------------------
 .../storage/HBaseTimelineReaderImpl.java        |   2 +
 .../storage/TimelineEntityReader.java           | 274 ----------
 .../storage/TimelineEntityReaderFactory.java    | 100 ----
 .../storage/reader/ApplicationEntityReader.java | 383 ++++++++++++++
 .../reader/FlowActivityEntityReader.java        | 164 ++++++
 .../storage/reader/FlowRunEntityReader.java     | 226 +++++++++
 .../storage/reader/GenericEntityReader.java     | 497 +++++++++++++++++++
 .../storage/reader/TimelineEntityReader.java    | 274 ++++++++++
 .../reader/TimelineEntityReaderFactory.java     | 100 ++++
 .../storage/reader/package-info.java            |  23 +
 15 files changed, 1672 insertions(+), 1640 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2dce9faa/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index cf4522a..62f847e 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -129,6 +129,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
 
   IMPROVEMENTS
 
+    YARN-4200. Refactor reader classes in storage to nest under hbase specific 
+    package name. Contributed by Li Lu. 
+
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via
     zjshen)
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2dce9faa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
deleted file mode 100644
index d812a6c..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for application entities that are stored in the
- * application table.
- */
-class ApplicationEntityReader extends GenericEntityReader {
-  private static final ApplicationTable APPLICATION_TABLE =
-      new ApplicationTable();
-
-  public ApplicationEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      Long limit, Long createdTimeBegin, Long createdTimeEnd,
-      Long modifiedTimeBegin, Long modifiedTimeEnd,
-      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> metricFilters, Set<String> eventFilters,
-      TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
-      EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
-        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
-        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
-        true);
-  }
-
-  public ApplicationEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      String entityId, TimelineFilterList confsToRetrieve,
-      TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
-        confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
-  }
-
-  /**
-   * Uses the {@link ApplicationTable}.
-   */
-  protected BaseTable<?> getTable() {
-    return APPLICATION_TABLE;
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFields() {
-    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
-    // Fetch all the columns.
-    if (fieldsToRetrieve.contains(Field.ALL) &&
-        (confsToRetrieve == null ||
-        confsToRetrieve.getFilterList().isEmpty()) &&
-        (metricsToRetrieve == null ||
-        metricsToRetrieve.getFilterList().isEmpty())) {
-      return list;
-    }
-    FilterList infoColFamilyList = new FilterList();
-    // By default fetch everything in INFO column family.
-    FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL,
-           new BinaryComparator(ApplicationColumnFamily.INFO.getBytes()));
-    infoColFamilyList.addFilter(infoColumnFamily);
-    // Events not required.
-    if (!fieldsToRetrieve.contains(Field.EVENTS) &&
-        !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-          ApplicationColumnPrefix.EVENT.getColumnPrefixBytes(""))));
-    }
-    // info not required.
-    if (!fieldsToRetrieve.contains(Field.INFO) &&
-        !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-          ApplicationColumnPrefix.INFO.getColumnPrefixBytes(""))));
-    }
-    // is related to not required.
-    if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) &&
-        !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-          ApplicationColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
-    }
-    // relates to not required.
-    if (!fieldsToRetrieve.contains(Field.RELATES_TO) &&
-        !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-          ApplicationColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
-    }
-    list.addFilter(infoColFamilyList);
-    if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) ||
-        (confsToRetrieve != null &&
-        !confsToRetrieve.getFilterList().isEmpty())) {
-      FilterList filterCfg =
-          new FilterList(new FamilyFilter(CompareOp.EQUAL,
-          new BinaryComparator(ApplicationColumnFamily.CONFIGS.getBytes())));
-      if (confsToRetrieve != null &&
-          !confsToRetrieve.getFilterList().isEmpty()) {
-        filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
-            ApplicationColumnPrefix.CONFIG, confsToRetrieve));
-      }
-      list.addFilter(filterCfg);
-    }
-    if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) ||
-        (metricsToRetrieve != null &&
-        !metricsToRetrieve.getFilterList().isEmpty())) {
-      FilterList filterMetrics =
-          new FilterList(new FamilyFilter(CompareOp.EQUAL,
-          new BinaryComparator(ApplicationColumnFamily.METRICS.getBytes())));
-      if (metricsToRetrieve != null &&
-          !metricsToRetrieve.getFilterList().isEmpty()) {
-        filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
-            ApplicationColumnPrefix.METRIC, metricsToRetrieve));
-      }
-      list.addFilter(filterMetrics);
-    }
-    return list;
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    byte[] rowKey =
-        ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
-            appId);
-    Get get = new Get(rowKey);
-    get.setMaxVersions(Integer.MAX_VALUE);
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      get.setFilter(filterList);
-    }
-    return table.getResult(hbaseConf, conn, get);
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
-    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
-    if (singleEntityRead) {
-      Preconditions.checkNotNull(appId, "appId shouldn't be null");
-    } else {
-      Preconditions.checkNotNull(userId, "userId shouldn't be null");
-      Preconditions.checkNotNull(flowName, "flowName shouldn't be null");
-    }
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    if (singleEntityRead) {
-      if (flowName == null || flowRunId == null || userId == null) {
-        FlowContext context =
-            lookupFlowContext(clusterId, appId, hbaseConf, conn);
-        flowName = context.flowName;
-        flowRunId = context.flowRunId;
-        userId = context.userId;
-      }
-    }
-    if (fieldsToRetrieve == null) {
-      fieldsToRetrieve = EnumSet.noneOf(Field.class);
-    }
-    if (!fieldsToRetrieve.contains(Field.CONFIGS) &&
-        confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) {
-      fieldsToRetrieve.add(Field.CONFIGS);
-    }
-    if (!fieldsToRetrieve.contains(Field.METRICS) &&
-        metricsToRetrieve != null &&
-        !metricsToRetrieve.getFilterList().isEmpty()) {
-      fieldsToRetrieve.add(Field.METRICS);
-    }
-    if (!singleEntityRead) {
-      if (limit == null || limit < 0) {
-        limit = TimelineReader.DEFAULT_LIMIT;
-      }
-      if (createdTimeBegin == null) {
-        createdTimeBegin = DEFAULT_BEGIN_TIME;
-      }
-      if (createdTimeEnd == null) {
-        createdTimeEnd = DEFAULT_END_TIME;
-      }
-      if (modifiedTimeBegin == null) {
-        modifiedTimeBegin = DEFAULT_BEGIN_TIME;
-      }
-      if (modifiedTimeEnd == null) {
-        modifiedTimeEnd = DEFAULT_END_TIME;
-      }
-    }
-  }
-
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
-    Scan scan = new Scan();
-    if (flowRunId != null) {
-      scan.setRowPrefixFilter(ApplicationRowKey.
-          getRowKeyPrefix(clusterId, userId, flowName, flowRunId));
-    } else {
-      scan.setRowPrefixFilter(ApplicationRowKey.
-          getRowKeyPrefix(clusterId, userId, flowName));
-    }
-    FilterList newList = new FilterList();
-    newList.addFilter(new PageFilter(limit));
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      newList.addFilter(filterList);
-    }
-    scan.setFilter(newList);
-    return table.getResultScanner(hbaseConf, conn, scan);
-  }
-
-  @Override
-  protected TimelineEntity parseEntity(Result result) throws IOException {
-    if (result == null || result.isEmpty()) {
-      return null;
-    }
-    TimelineEntity entity = new TimelineEntity();
-    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
-    String entityId = ApplicationColumn.ID.readResult(result).toString();
-    entity.setId(entityId);
-
-    // fetch created time
-    Number createdTime =
-        (Number)ApplicationColumn.CREATED_TIME.readResult(result);
-    entity.setCreatedTime(createdTime.longValue());
-    if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
-        entity.getCreatedTime() > createdTimeEnd)) {
-      return null;
-    }
-
-    // fetch modified time
-    Number modifiedTime =
-        (Number)ApplicationColumn.MODIFIED_TIME.readResult(result);
-    entity.setModifiedTime(modifiedTime.longValue());
-    if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
-        entity.getModifiedTime() > modifiedTimeEnd)) {
-      return null;
-    }
-
-    // fetch is related to entities
-    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
-      readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
-          true);
-      if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
-          entity.getIsRelatedToEntities(), isRelatedTo)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
-        entity.getIsRelatedToEntities().clear();
-      }
-    }
-
-    // fetch relates to entities
-    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
-      readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
-          false);
-      if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
-          entity.getRelatesToEntities(), relatesTo)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
-        entity.getRelatesToEntities().clear();
-      }
-    }
-
-    // fetch info
-    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
-      readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
-      if (checkInfo &&
-          !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.INFO)) {
-        entity.getInfo().clear();
-      }
-    }
-
-    // fetch configs
-    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
-      readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
-      if (checkConfigs && !TimelineStorageUtils.matchFilters(
-          entity.getConfigs(), configFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.CONFIGS)) {
-        entity.getConfigs().clear();
-      }
-    }
-
-    // fetch events
-    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
-      readEvents(entity, result, true);
-      if (checkEvents && !TimelineStorageUtils.matchEventFilters(
-          entity.getEvents(), eventFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.EVENTS)) {
-        entity.getEvents().clear();
-      }
-    }
-
-    // fetch metrics
-    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
-      readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
-      if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
-          entity.getMetrics(), metricFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.METRICS)) {
-        entity.getMetrics().clear();
-      }
-    }
-    return entity;
-  }
-}
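
The constructFilterListBasedOnFields logic removed above (and re-added under
the new package) leans on one HBase idiom worth spelling out: keep a column
family but drop a single qualifier prefix inside it. A compressed sketch,
with invented family and prefix bytes purely for illustration:

  FilterList list = new FilterList(Operator.MUST_PASS_ONE);
  FilterList infoFamily = new FilterList(); // MUST_PASS_ALL by default
  infoFamily.addFilter(new FamilyFilter(CompareOp.EQUAL,
      new BinaryComparator(Bytes.toBytes("i"))));        // hypothetical family
  infoFamily.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL,
      new BinaryPrefixComparator(Bytes.toBytes("e!")))); // hypothetical prefix
  list.addFilter(infoFamily);
  // A Scan or Get carrying 'list' returns the family minus that prefix.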

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2dce9faa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
deleted file mode 100644
index 7e8d4ba..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for flow activity entities that are stored in the
- * flow activity table.
- */
-class FlowActivityEntityReader extends TimelineEntityReader {
-  private static final FlowActivityTable FLOW_ACTIVITY_TABLE =
-      new FlowActivityTable();
-
-  public FlowActivityEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      Long limit, Long createdTimeBegin, Long createdTimeEnd,
-      Long modifiedTimeBegin, Long modifiedTimeEnd,
-      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> metricFilters, Set<String> eventFilters,
-      EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
-        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
-        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, null, null, fieldsToRetrieve, true);
-  }
-
-  public FlowActivityEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      String entityId, EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
-        null, null, fieldsToRetrieve);
-  }
-
-  /**
-   * Uses the {@link FlowActivityTable}.
-   */
-  @Override
-  protected BaseTable<?> getTable() {
-    return FLOW_ACTIVITY_TABLE;
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    if (limit == null || limit < 0) {
-      limit = TimelineReader.DEFAULT_LIMIT;
-    }
-    if (createdTimeBegin == null) {
-      createdTimeBegin = DEFAULT_BEGIN_TIME;
-    }
-    if (createdTimeEnd == null) {
-      createdTimeEnd = DEFAULT_END_TIME;
-    }
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFields() {
-    return null;
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    throw new UnsupportedOperationException(
-        "we don't support a single entity query");
-  }
-
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
-    Scan scan = new Scan();
-    if (createdTimeBegin == DEFAULT_BEGIN_TIME &&
-        createdTimeEnd == DEFAULT_END_TIME) {
-      scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId));
-    } else {
-      scan.setStartRow(
-          FlowActivityRowKey.getRowKeyPrefix(clusterId, createdTimeEnd));
-      scan.setStopRow(
-          FlowActivityRowKey.getRowKeyPrefix(clusterId,
-              (createdTimeBegin <= 0 ? 0: (createdTimeBegin - 1))));
-    }
-    // use the page filter to limit the result to the page size
-    // the scanner may still return more than the limit; therefore we need to
-    // read the right number as we iterate
-    scan.setFilter(new PageFilter(limit));
-    return table.getResultScanner(hbaseConf, conn, scan);
-  }
-
-  @Override
-  protected TimelineEntity parseEntity(Result result) throws IOException {
-    FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow());
-
-    long time = rowKey.getDayTimestamp();
-    String user = rowKey.getUserId();
-    String flowName = rowKey.getFlowName();
-
-    FlowActivityEntity flowActivity =
-        new FlowActivityEntity(clusterId, time, user, flowName);
-    // set the id
-    flowActivity.setId(flowActivity.getId());
-    // get the list of run ids along with the version that are associated with
-    // this flow on this day
-    Map<String, Object> runIdsMap =
-        FlowActivityColumnPrefix.RUN_ID.readResults(result);
-    for (Map.Entry<String, Object> e : runIdsMap.entrySet()) {
-      Long runId = Long.valueOf(e.getKey());
-      String version = (String)e.getValue();
-      FlowRunEntity flowRun = new FlowRunEntity();
-      flowRun.setUser(user);
-      flowRun.setName(flowName);
-      flowRun.setRunId(runId);
-      flowRun.setVersion(version);
-      // set the id
-      flowRun.setId(flowRun.getId());
-      flowActivity.addFlowRun(flowRun);
-    }
-
-    return flowActivity;
-  }
-}
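
The getResults method removed above scans from the end of the created-time
range to its beginning because the day timestamp in the row key is stored
inverted. A sketch of that range computation, dropping the cluster prefix for
brevity and assuming the key suffix is the inverted timestamp (both are
simplifications of the real FlowActivityRowKey layout):

  long createdTimeBegin = 1451606400000L; // hypothetical range bounds
  long createdTimeEnd = 1452211200000L;
  long limit = 100L;
  Scan scan = new Scan();
  scan.setStartRow(Bytes.toBytes(Long.MAX_VALUE - createdTimeEnd)); // newest day
  scan.setStopRow(Bytes.toBytes(Long.MAX_VALUE - (createdTimeBegin - 1)));
  scan.setFilter(new PageFilter(limit)); // may over-return; count while iterating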

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2dce9faa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
deleted file mode 100644
index c9076ee..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
-import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for flow run entities that are stored in the flow run
- * table.
- */
-class FlowRunEntityReader extends TimelineEntityReader {
-  private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable();
-
-  public FlowRunEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      Long limit, Long createdTimeBegin, Long createdTimeEnd,
-      Long modifiedTimeBegin, Long modifiedTimeEnd,
-      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> metricFilters, Set<String> eventFilters,
-      TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
-      EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
-        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
-        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, null, metricsToRetrieve, fieldsToRetrieve, true);
-  }
-
-  public FlowRunEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      String entityId, TimelineFilterList confsToRetrieve,
-      TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
-        null, metricsToRetrieve, fieldsToRetrieve);
-  }
-
-  /**
-   * Uses the {@link FlowRunTable}.
-   */
-  @Override
-  protected BaseTable<?> getTable() {
-    return FLOW_RUN_TABLE;
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
-    Preconditions.checkNotNull(userId, "userId shouldn't be null");
-    Preconditions.checkNotNull(flowName, "flowName shouldn't be null");
-    if (singleEntityRead) {
-      Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null");
-    }
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn) {
-    if (!singleEntityRead) {
-      if (fieldsToRetrieve == null) {
-        fieldsToRetrieve = EnumSet.noneOf(Field.class);
-      }
-      if (limit == null || limit < 0) {
-        limit = TimelineReader.DEFAULT_LIMIT;
-      }
-      if (createdTimeBegin == null) {
-        createdTimeBegin = DEFAULT_BEGIN_TIME;
-      }
-      if (createdTimeEnd == null) {
-        createdTimeEnd = DEFAULT_END_TIME;
-      }
-      if (!fieldsToRetrieve.contains(Field.METRICS) &&
-          metricsToRetrieve != null &&
-          !metricsToRetrieve.getFilterList().isEmpty()) {
-        fieldsToRetrieve.add(Field.METRICS);
-      }
-    }
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFields() {
-    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
-
-    // By default fetch everything in INFO column family.
-    FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL,
-           new BinaryComparator(FlowRunColumnFamily.INFO.getBytes()));
-    // Metrics not required.
-    if (!singleEntityRead && !fieldsToRetrieve.contains(Field.METRICS) &&
-        !fieldsToRetrieve.contains(Field.ALL)) {
-      FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE);
-      infoColFamilyList.addFilter(infoColumnFamily);
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-          FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(""))));
-      list.addFilter(infoColFamilyList);
-    }
-    if (metricsToRetrieve != null &&
-        !metricsToRetrieve.getFilterList().isEmpty()) {
-      FilterList infoColFamilyList = new FilterList();
-      infoColFamilyList.addFilter(infoColumnFamily);
-      infoColFamilyList.addFilter(TimelineFilterUtils.createHBaseFilterList(
-          FlowRunColumnPrefix.METRIC, metricsToRetrieve));
-      list.addFilter(infoColFamilyList);
-    }
-    return list;
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    byte[] rowKey =
-        FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId);
-    Get get = new Get(rowKey);
-    get.setMaxVersions(Integer.MAX_VALUE);
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      get.setFilter(filterList);
-    }
-    return table.getResult(hbaseConf, conn, get);
-  }
-
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
-    Scan scan = new Scan();
-    scan.setRowPrefixFilter(
-        FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowName));
-    FilterList newList = new FilterList();
-    newList.addFilter(new PageFilter(limit));
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      newList.addFilter(filterList);
-    }
-    scan.setFilter(newList);
-    return table.getResultScanner(hbaseConf, conn, scan);
-  }
-
-  @Override
-  protected TimelineEntity parseEntity(Result result) throws IOException {
-    FlowRunEntity flowRun = new FlowRunEntity();
-    flowRun.setUser(userId);
-    flowRun.setName(flowName);
-    if (singleEntityRead) {
-      flowRun.setRunId(flowRunId);
-    } else {
-      FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow());
-      flowRun.setRunId(rowKey.getFlowRunId());
-    }
-
-    // read the start time
-    Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result);
-    if (startTime != null) {
-      flowRun.setStartTime(startTime.longValue());
-    }
-    if (!singleEntityRead && (flowRun.getStartTime() < createdTimeBegin ||
-        flowRun.getStartTime() > createdTimeEnd)) {
-      return null;
-    }
-
-    // read the end time if available
-    Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result);
-    if (endTime != null) {
-      flowRun.setMaxEndTime(endTime.longValue());
-    }
-
-    // read the flow version
-    String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result);
-    if (version != null) {
-      flowRun.setVersion(version);
-    }
-
-    // read metrics
-    if (singleEntityRead || fieldsToRetrieve.contains(Field.METRICS)) {
-      readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
-    }
-
-    // set the id (FlowRunEntity derives its id from user, flow name and run id)
-    flowRun.setId(flowRun.getId());
-    return flowRun;
-  }
-}
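
The constructFilterListBasedOnFields() implementation above leans on stock
HBase filters: a FamilyFilter keeps cells in the INFO column family, and a
QualifierFilter wrapping a BinaryPrefixComparator drops metric columns when
metrics were not requested. Below is a minimal standalone sketch of that
combination using only the HBase client API; the family name "i" and the
metric prefix "m!" are made-up stand-ins for the real schema constants, and
the two filters are joined with MUST_PASS_ALL so a cell must satisfy both.

import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class InfoFamilyFilterSketch {
  // Keep everything in the given column family except cells whose
  // qualifier starts with the given prefix.
  static FilterList familyWithoutPrefix(byte[] family, byte[] prefix) {
    FilterList list = new FilterList(Operator.MUST_PASS_ALL);
    list.addFilter(new FamilyFilter(CompareOp.EQUAL,
        new BinaryComparator(family)));
    list.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL,
        new BinaryPrefixComparator(prefix)));
    return list;
  }

  public static void main(String[] args) {
    FilterList filters =
        familyWithoutPrefix(Bytes.toBytes("i"), Bytes.toBytes("m!"));
    System.out.println(filters);  // would be handed to Scan.setFilter(...)
  }
}

The reader itself nests such branches under an outer MUST_PASS_ONE list, so
a cell matching either the trimmed INFO branch or the metrics-to-retrieve
branch is returned.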

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2dce9faa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
deleted file mode 100644
index 784dfd5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
+++ /dev/null
@@ -1,496 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
-import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for generic entities that are stored in the entity
- * table.
- */
-class GenericEntityReader extends TimelineEntityReader {
-  private static final EntityTable ENTITY_TABLE = new EntityTable();
-  private static final Log LOG = LogFactory.getLog(GenericEntityReader.class);
-
-  /**
-   * Used to look up the flow context.
-   */
-  private final AppToFlowTable appToFlowTable = new AppToFlowTable();
-
-  public GenericEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      Long limit, Long createdTimeBegin, Long createdTimeEnd,
-      Long modifiedTimeBegin, Long modifiedTimeEnd,
-      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> metricFilters, Set<String> eventFilters,
-      TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
-      EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
-        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
-        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
-        sortedKeys);
-  }
-
-  public GenericEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      String entityId, TimelineFilterList confsToRetrieve,
-      TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
-        confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
-  }
-
-  /**
-   * Uses the {@link EntityTable}.
-   */
-  protected BaseTable<?> getTable() {
-    return ENTITY_TABLE;
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFields() {
-    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
-    // Fetch all the columns.
-    if (fieldsToRetrieve.contains(Field.ALL) &&
-        (confsToRetrieve == null ||
-        confsToRetrieve.getFilterList().isEmpty()) &&
-        (metricsToRetrieve == null ||
-        metricsToRetrieve.getFilterList().isEmpty())) {
-      return list;
-    }
-    FilterList infoColFamilyList = new FilterList();
-    // By default, fetch everything in the INFO column family.
-    FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL,
-           new BinaryComparator(EntityColumnFamily.INFO.getBytes()));
-    infoColFamilyList.addFilter(infoColumnFamily);
-    // Events not required.
-    if (!fieldsToRetrieve.contains(Field.EVENTS) &&
-        !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-          EntityColumnPrefix.EVENT.getColumnPrefixBytes(""))));
-    }
-    // Info not required.
-    if (!fieldsToRetrieve.contains(Field.INFO) &&
-        !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-              EntityColumnPrefix.INFO.getColumnPrefixBytes(""))));
-    }
-    // Is related to not required.
-    if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) &&
-        !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-              EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
-    }
-    // Relates to not required.
-    if (!fieldsToRetrieve.contains(Field.RELATES_TO) &&
-        !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-              EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
-    }
-    list.addFilter(infoColFamilyList);
-    if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) ||
-        (confsToRetrieve != null &&
-        !confsToRetrieve.getFilterList().isEmpty())) {
-      FilterList filterCfg =
-          new FilterList(new FamilyFilter(CompareOp.EQUAL,
-              new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes())));
-      if (confsToRetrieve != null &&
-          !confsToRetrieve.getFilterList().isEmpty()) {
-        filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
-            EntityColumnPrefix.CONFIG, confsToRetrieve));
-      }
-      list.addFilter(filterCfg);
-    }
-    if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) ||
-        (metricsToRetrieve != null &&
-        !metricsToRetrieve.getFilterList().isEmpty())) {
-      FilterList filterMetrics =
-          new FilterList(new FamilyFilter(CompareOp.EQUAL,
-              new BinaryComparator(EntityColumnFamily.METRICS.getBytes())));
-      if (metricsToRetrieve != null &&
-          !metricsToRetrieve.getFilterList().isEmpty()) {
-        filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
-            EntityColumnPrefix.METRIC, metricsToRetrieve));
-      }
-      list.addFilter(filterMetrics);
-    }
-    return list;
-  }
-
-  protected FlowContext lookupFlowContext(String clusterId, String appId,
-      Configuration hbaseConf, Connection conn) throws IOException {
-    byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
-    Get get = new Get(rowKey);
-    Result result = appToFlowTable.getResult(hbaseConf, conn, get);
-    if (result != null && !result.isEmpty()) {
-      return new FlowContext(
-          AppToFlowColumn.USER_ID.readResult(result).toString(),
-          AppToFlowColumn.FLOW_ID.readResult(result).toString(),
-          ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
-    } else {
-       throw new IOException(
-           "Unable to find the context flow ID and flow run ID for clusterId=" +
-           clusterId + ", appId=" + appId);
-    }
-  }
-
-  protected static class FlowContext {
-    protected final String userId;
-    protected final String flowName;
-    protected final Long flowRunId;
-    public FlowContext(String user, String flowName, Long flowRunId) {
-      this.userId = user;
-      this.flowName = flowName;
-      this.flowRunId = flowRunId;
-    }
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
-    Preconditions.checkNotNull(appId, "appId shouldn't be null");
-    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
-    if (singleEntityRead) {
-      Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
-    }
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    // In practice, either all three are null or none of them is.
-    if (flowName == null || flowRunId == null || userId == null) {
-      FlowContext context =
-          lookupFlowContext(clusterId, appId, hbaseConf, conn);
-      flowName = context.flowName;
-      flowRunId = context.flowRunId;
-      userId = context.userId;
-    }
-    if (fieldsToRetrieve == null) {
-      fieldsToRetrieve = EnumSet.noneOf(Field.class);
-    }
-    if (!fieldsToRetrieve.contains(Field.CONFIGS) &&
-        confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) {
-      fieldsToRetrieve.add(Field.CONFIGS);
-    }
-    if (!fieldsToRetrieve.contains(Field.METRICS) &&
-        metricsToRetrieve != null &&
-        !metricsToRetrieve.getFilterList().isEmpty()) {
-      fieldsToRetrieve.add(Field.METRICS);
-    }
-    if (!singleEntityRead) {
-      if (limit == null || limit < 0) {
-        limit = TimelineReader.DEFAULT_LIMIT;
-      }
-      if (createdTimeBegin == null) {
-        createdTimeBegin = DEFAULT_BEGIN_TIME;
-      }
-      if (createdTimeEnd == null) {
-        createdTimeEnd = DEFAULT_END_TIME;
-      }
-      if (modifiedTimeBegin == null) {
-        modifiedTimeBegin = DEFAULT_BEGIN_TIME;
-      }
-      if (modifiedTimeEnd == null) {
-        modifiedTimeEnd = DEFAULT_END_TIME;
-      }
-    }
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    byte[] rowKey =
-        EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
-            entityType, entityId);
-    Get get = new Get(rowKey);
-    get.setMaxVersions(Integer.MAX_VALUE);
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      get.setFilter(filterList);
-    }
-    return table.getResult(hbaseConf, conn, get);
-  }
-
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
-    // Scan through part of the table to find the entities belonging to one
-    // app and one type.
-    Scan scan = new Scan();
-    scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
-        clusterId, userId, flowName, flowRunId, appId, entityType));
-    scan.setMaxVersions(Integer.MAX_VALUE);
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      scan.setFilter(filterList);
-    }
-    return table.getResultScanner(hbaseConf, conn, scan);
-  }
-
-  @Override
-  protected TimelineEntity parseEntity(Result result) throws IOException {
-    if (result == null || result.isEmpty()) {
-      return null;
-    }
-    TimelineEntity entity = new TimelineEntity();
-    String entityType = EntityColumn.TYPE.readResult(result).toString();
-    entity.setType(entityType);
-    String entityId = EntityColumn.ID.readResult(result).toString();
-    entity.setId(entityId);
-
-    // fetch created time
-    Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result);
-    entity.setCreatedTime(createdTime.longValue());
-    if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
-        entity.getCreatedTime() > createdTimeEnd)) {
-      return null;
-    }
-
-    // fetch modified time
-    Number modifiedTime = (Number)EntityColumn.MODIFIED_TIME.readResult(result);
-    entity.setModifiedTime(modifiedTime.longValue());
-    if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
-        entity.getModifiedTime() > modifiedTimeEnd)) {
-      return null;
-    }
-
-    // fetch is related to entities
-    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
-      readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
-      if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
-          entity.getIsRelatedToEntities(), isRelatedTo)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
-        entity.getIsRelatedToEntities().clear();
-      }
-    }
-
-    // fetch relates to entities
-    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
-      readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
-      if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
-          entity.getRelatesToEntities(), relatesTo)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
-        entity.getRelatesToEntities().clear();
-      }
-    }
-
-    // fetch info
-    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
-      readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
-      if (checkInfo &&
-          !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.INFO)) {
-        entity.getInfo().clear();
-      }
-    }
-
-    // fetch configs
-    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
-      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
-      if (checkConfigs && !TimelineStorageUtils.matchFilters(
-          entity.getConfigs(), configFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.CONFIGS)) {
-        entity.getConfigs().clear();
-      }
-    }
-
-    // fetch events
-    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
-      readEvents(entity, result, false);
-      if (checkEvents && !TimelineStorageUtils.matchEventFilters(
-          entity.getEvents(), eventFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.EVENTS)) {
-        entity.getEvents().clear();
-      }
-    }
-
-    // fetch metrics
-    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
-      readMetrics(entity, result, EntityColumnPrefix.METRIC);
-      if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
-          entity.getMetrics(), metricFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.METRICS)) {
-        entity.getMetrics().clear();
-      }
-    }
-    return entity;
-  }
-
-  /**
-   * Helper method for reading relationship.
-   */
-  protected <T> void readRelationship(
-      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
-      boolean isRelatedTo) throws IOException {
-    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
-    Map<String, Object> columns = prefix.readResults(result);
-    for (Map.Entry<String, Object> column : columns.entrySet()) {
-      for (String id : Separator.VALUES.splitEncoded(
-          column.getValue().toString())) {
-        if (isRelatedTo) {
-          entity.addIsRelatedToEntity(column.getKey(), id);
-        } else {
-          entity.addRelatesToEntity(column.getKey(), id);
-        }
-      }
-    }
-  }
-
-  /**
-   * Helper method for reading key-value pairs for either info or config.
-   */
-  protected <T> void readKeyValuePairs(
-      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
-      boolean isConfig) throws IOException {
-    // info and configuration are of type Map<String, Object or String>
-    Map<String, Object> columns = prefix.readResults(result);
-    if (isConfig) {
-      for (Map.Entry<String, Object> column : columns.entrySet()) {
-        entity.addConfig(column.getKey(), column.getValue().toString());
-      }
-    } else {
-      entity.addInfo(columns);
-    }
-  }
-
-  /**
-   * Read events from the entity table or the application table. The column name
-   * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
-   * if there is no info associated with the event.
-   *
-   * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
-   * schema description.
-   */
-  protected void readEvents(TimelineEntity entity, Result result,
-      boolean isApplication) throws IOException {
-    Map<String, TimelineEvent> eventsMap = new HashMap<>();
-    Map<?, Object> eventsResult = isApplication ?
-        ApplicationColumnPrefix.EVENT.
-            readResultsHavingCompoundColumnQualifiers(result) :
-        EntityColumnPrefix.EVENT.
-            readResultsHavingCompoundColumnQualifiers(result);
-    for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
-      byte[][] karr = (byte[][])eventResult.getKey();
-      // the column name is of the form "eventId=timestamp=infoKey"
-      if (karr.length == 3) {
-        String id = Bytes.toString(karr[0]);
-        long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1]));
-        String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
-        TimelineEvent event = eventsMap.get(key);
-        if (event == null) {
-          event = new TimelineEvent();
-          event.setId(id);
-          event.setTimestamp(ts);
-          eventsMap.put(key, event);
-        }
-        // handle empty info
-        String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
-        if (infoKey != null) {
-          event.addInfo(infoKey, eventResult.getValue());
-        }
-      } else {
-        LOG.warn("incorrectly formatted column name: it will be discarded");
-        continue;
-      }
-    }
-    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
-    entity.addEvents(eventsSet);
-  }
-}
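
readEvents() above decodes compound column qualifiers of the form
"eventId=timestamp=infoKey", where the timestamp bytes are stored inverted
so that newer events sort first under HBase's ascending key order. Here is
the round trip as a self-contained sketch; it assumes
TimelineStorageUtils.invertLong is the usual subtraction from
Long.MAX_VALUE, and the event id, timestamp and info key are invented.

import org.apache.hadoop.hbase.util.Bytes;

public class EventQualifierSketch {
  // Assumed to match TimelineStorageUtils.invertLong: subtracting from
  // Long.MAX_VALUE reverses the sort order of the encoded bytes.
  static long invert(long val) {
    return Long.MAX_VALUE - val;
  }

  public static void main(String[] args) {
    long ts = 1452556800000L;  // made-up event timestamp
    // Writer side: the qualifier parts are eventId, inverted ts, infoKey.
    byte[][] karr = {Bytes.toBytes("APP_CREATED"),
        Bytes.toBytes(invert(ts)), Bytes.toBytes("diagnostics")};
    // Reader side, mirroring readEvents(): invert again to recover the
    // original timestamp.
    String id = Bytes.toString(karr[0]);
    long recovered = invert(Bytes.toLong(karr[1]));
    // An empty third part would mean the event carries no info entries.
    String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
    System.out.println(id + " @ " + recovered + ", info key: " + infoKey);
  }
}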

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2dce9faa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
index 96c5a19..bc48cbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;
 
 public class HBaseTimelineReaderImpl
     extends AbstractService implements TimelineReader {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2dce9faa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
deleted file mode 100644
index a26c0c2..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-
-/**
- * The base class for reading and deserializing timeline entities from the
- * HBase storage. Subclasses can be defined for each type of entity that is
- * being requested.
- */
-abstract class TimelineEntityReader {
-  private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class);
-  protected static final long DEFAULT_BEGIN_TIME = 0L;
-  protected static final long DEFAULT_END_TIME = Long.MAX_VALUE;
-
-  protected final boolean singleEntityRead;
-
-  protected String userId;
-  protected String clusterId;
-  protected String flowName;
-  protected Long flowRunId;
-  protected String appId;
-  protected String entityType;
-  protected EnumSet<Field> fieldsToRetrieve;
-  // used only in single-entity read mode
-  protected String entityId;
-  // used only in multiple-entity read mode
-  protected Long limit;
-  protected Long createdTimeBegin;
-  protected Long createdTimeEnd;
-  protected Long modifiedTimeBegin;
-  protected Long modifiedTimeEnd;
-  protected Map<String, Set<String>> relatesTo;
-  protected Map<String, Set<String>> isRelatedTo;
-  protected Map<String, Object> infoFilters;
-  protected Map<String, String> configFilters;
-  protected Set<String> metricFilters;
-  protected Set<String> eventFilters;
-  protected TimelineFilterList confsToRetrieve;
-  protected TimelineFilterList metricsToRetrieve;
-
-  /**
-   * Main table the entity reader uses.
-   */
-  protected BaseTable<?> table;
-
-  /**
-   * Specifies whether keys for this table are sorted in a manner where
-   * entities can be retrieved by created time. If true, it is sufficient to
-   * collect the first results up to the specified limit. Otherwise all
-   * matching entities are fetched and the limit is applied afterwards.
-   */
-  private boolean sortedKeys = false;
-
-  /**
-   * Instantiates a reader for multiple-entity reads.
-   */
-  protected TimelineEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      Long limit, Long createdTimeBegin, Long createdTimeEnd,
-      Long modifiedTimeBegin, Long modifiedTimeEnd,
-      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> metricFilters, Set<String> eventFilters,
-      TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
-      EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
-    this.singleEntityRead = false;
-    this.sortedKeys = sortedKeys;
-    this.userId = userId;
-    this.clusterId = clusterId;
-    this.flowName = flowName;
-    this.flowRunId = flowRunId;
-    this.appId = appId;
-    this.entityType = entityType;
-    this.fieldsToRetrieve = fieldsToRetrieve;
-    this.limit = limit;
-    this.createdTimeBegin = createdTimeBegin;
-    this.createdTimeEnd = createdTimeEnd;
-    this.modifiedTimeBegin = modifiedTimeBegin;
-    this.modifiedTimeEnd = modifiedTimeEnd;
-    this.relatesTo = relatesTo;
-    this.isRelatedTo = isRelatedTo;
-    this.infoFilters = infoFilters;
-    this.configFilters = configFilters;
-    this.metricFilters = metricFilters;
-    this.eventFilters = eventFilters;
-    this.confsToRetrieve = confsToRetrieve;
-    this.metricsToRetrieve = metricsToRetrieve;
-
-    this.table = getTable();
-  }
-
-  /**
-   * Instantiates a reader for single-entity reads.
-   */
-  protected TimelineEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      String entityId, TimelineFilterList confsToRetrieve,
-      TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
-    this.singleEntityRead = true;
-    this.userId = userId;
-    this.clusterId = clusterId;
-    this.flowName = flowName;
-    this.flowRunId = flowRunId;
-    this.appId = appId;
-    this.entityType = entityType;
-    this.fieldsToRetrieve = fieldsToRetrieve;
-    this.entityId = entityId;
-    this.confsToRetrieve = confsToRetrieve;
-    this.metricsToRetrieve = metricsToRetrieve;
-
-    this.table = getTable();
-  }
-
-  /**
-   * Creates a {@link FilterList} based on fields, confs and metrics to
-   * retrieve. This filter list will be set in Scan/Get objects to trim down
-   * results fetched from HBase back-end storage.
-   * @return a {@link FilterList} object.
-   */
-  protected abstract FilterList constructFilterListBasedOnFields();
-
-  /**
-   * Reads and deserializes a single timeline entity from the HBase storage.
-   */
-  public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    validateParams();
-    augmentParams(hbaseConf, conn);
-
-    FilterList filterList = constructFilterListBasedOnFields();
-    Result result = getResult(hbaseConf, conn, filterList);
-    if (result == null || result.isEmpty()) {
-      // Could not find a matching row.
-      LOG.info("Cannot find matching entity of type " + entityType);
-      return null;
-    }
-    return parseEntity(result);
-  }
-
-  /**
-   * Reads and deserializes a set of timeline entities from the HBase storage.
-   * It iterates over all available results and returns at most the specified
-   * limit of entries in the entities' natural sort order.
-   */
-  public Set<TimelineEntity> readEntities(Configuration hbaseConf,
-      Connection conn) throws IOException {
-    validateParams();
-    augmentParams(hbaseConf, conn);
-
-    NavigableSet<TimelineEntity> entities = new TreeSet<>();
-    FilterList filterList = constructFilterListBasedOnFields();
-    ResultScanner results = getResults(hbaseConf, conn, filterList);
-    try {
-      for (Result result : results) {
-        TimelineEntity entity = parseEntity(result);
-        if (entity == null) {
-          continue;
-        }
-        entities.add(entity);
-        if (!sortedKeys) {
-          if (entities.size() > limit) {
-            entities.pollLast();
-          }
-        } else {
-          if (entities.size() == limit) {
-            break;
-          }
-        }
-      }
-      return entities;
-    } finally {
-      results.close();
-    }
-  }
-
-  /**
-   * Returns the main table to be used by the entity reader.
-   */
-  protected abstract BaseTable<?> getTable();
-
-  /**
-   * Validates the required parameters to read the entities.
-   */
-  protected abstract void validateParams();
-
-  /**
-   * Sets certain parameters to defaults if the values are not provided.
-   */
-  protected abstract void augmentParams(Configuration hbaseConf,
-      Connection conn) throws IOException;
-
-  /**
-   * Fetches a {@link Result} instance for a single-entity read.
-   *
-   * @return the {@link Result} instance or null if no such record is found.
-   */
-  protected abstract Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException;
-
-  /**
-   * Fetches a {@link ResultScanner} for a multi-entity read.
-   */
-  protected abstract ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException;
-
-  /**
-   * Given a {@link Result} instance, deserializes and creates a
-   * {@link TimelineEntity}.
-   *
-   * @return the {@link TimelineEntity} instance, or null if the {@link Result}
-   * is null or empty.
-   */
-  protected abstract TimelineEntity parseEntity(Result result)
-      throws IOException;
-
-  /**
-   * Helper method for reading and deserializing {@link TimelineMetric} objects
-   * using the specified column prefix. The timeline metrics then are added to
-   * the given timeline entity.
-   */
-  protected void readMetrics(TimelineEntity entity, Result result,
-      ColumnPrefix<?> columnPrefix) throws IOException {
-    NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
-        columnPrefix.readResultsWithTimestamps(result);
-    for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
-        metricsResult.entrySet()) {
-      TimelineMetric metric = new TimelineMetric();
-      metric.setId(metricResult.getKey());
-      // Simply assume that if the value set contains more than one element,
-      // the metric is a TIME_SERIES metric; otherwise it's a SINGLE_VALUE
-      // metric.
-      metric.setType(metricResult.getValue().size() > 1 ?
-          TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
-      metric.addValues(metricResult.getValue());
-      entity.addMetric(metric);
-    }
-  }
-}
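
The readEntities() loop above keeps no more than the requested limit of
entities in memory while scanning: when row keys are not sorted by created
time, every parsed entity goes into a TreeSet and pollLast() evicts the
largest element once the set exceeds the limit. The same pattern in
miniature, with integers standing in for entities:

import java.util.NavigableSet;
import java.util.TreeSet;

public class BoundedResultSetSketch {
  public static void main(String[] args) {
    long limit = 3;
    // Unsorted input, as when row keys do not follow created time.
    int[] scanned = {42, 7, 99, 13, 56, 21};
    NavigableSet<Integer> entities = new TreeSet<>();
    for (int e : scanned) {
      entities.add(e);
      // Evict the largest element so at most 'limit' of the smallest
      // survive, exactly as readEntities() does when sortedKeys is false.
      if (entities.size() > limit) {
        entities.pollLast();
      }
    }
    System.out.println(entities);  // prints [7, 13, 21]
  }
}

When sortedKeys is true the scan already yields entities in their natural
order, so the loop can simply stop once the limit is reached.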

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2dce9faa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
deleted file mode 100644
index 36ed4ca..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-
-/**
- * Factory methods for instantiating a timeline entity reader.
- */
-class TimelineEntityReaderFactory {
-  /**
-   * Creates a timeline entity reader instance for reading a single entity with
-   * the specified input.
-   */
-  public static TimelineEntityReader createSingleEntityReader(String userId,
-      String clusterId, String flowName, Long flowRunId, String appId,
-      String entityType, String entityId, TimelineFilterList confs,
-      TimelineFilterList metrics, EnumSet<Field> fieldsToRetrieve) {
-    // currently the types that are handled separately from the generic
-    // entity table are application, flow run, and flow activity entities
-    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
-      return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId,
-          appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
-    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
-      return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId,
-          appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
-    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
-      return new FlowActivityEntityReader(userId, clusterId, flowName, flowRunId,
-          appId, entityType, entityId, fieldsToRetrieve);
-    } else {
-      // assume we're dealing with a generic entity read
-      return new GenericEntityReader(userId, clusterId, flowName, flowRunId,
-        appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
-    }
-  }
-
-  /**
-   * Creates a timeline entity reader instance for reading set of entities with
-   * the specified input and predicates.
-   */
-  public static TimelineEntityReader createMultipleEntitiesReader(String userId,
-      String clusterId, String flowName, Long flowRunId, String appId,
-      String entityType, Long limit, Long createdTimeBegin, Long createdTimeEnd,
-      Long modifiedTimeBegin, Long modifiedTimeEnd,
-      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> metricFilters, Set<String> eventFilters,
-      TimelineFilterList confs, TimelineFilterList metrics,
-      EnumSet<Field> fieldsToRetrieve) {
-    // currently the types that are handled separately from the generic
-    // entity table are application, flow run, and flow activity entities
-    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
-      return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId,
-          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
-          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
-          infoFilters, configFilters, metricFilters, eventFilters, confs,
-          metrics, fieldsToRetrieve);
-    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
-      return new FlowActivityEntityReader(userId, clusterId, flowName, flowRunId,
-          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
-          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
-          infoFilters, configFilters, metricFilters, eventFilters,
-          fieldsToRetrieve);
-    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
-      return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId,
-          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
-          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
-          infoFilters, configFilters, metricFilters, eventFilters, confs,
-          metrics, fieldsToRetrieve);
-    } else {
-      // assume we're dealing with a generic entity read
-      return new GenericEntityReader(userId, clusterId, flowName, flowRunId,
-          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
-          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
-          infoFilters, configFilters, metricFilters, eventFilters, confs,
-          metrics, fieldsToRetrieve, false);
-    }
-  }
-}
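
For context, a hypothetical caller of this factory might look like the
sketch below. It has to sit in the same storage package because the factory
and the readers are package-private; every filter argument is left null so
augmentParams() falls back to the documented defaults; and all ids are
invented.

package org.apache.hadoop.yarn.server.timelineservice.storage;

import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;

public class ReaderFactoryUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      // Fetch the runs of one flow; the nulls mean "use the defaults".
      TimelineEntityReader reader =
          TimelineEntityReaderFactory.createMultipleEntitiesReader(
              "user1", "cluster1", "some_flow", null,
              "application_1452553459064_0001",
              TimelineEntityType.YARN_FLOW_RUN.toString(),
              null, null, null, null, null, null, null, null, null, null,
              null, null, null, null);
      Set<TimelineEntity> runs = reader.readEntities(conf, conn);
      System.out.println("Fetched " + runs.size() + " flow run(s)");
    }
  }
}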