You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2016/06/21 23:48:49 UTC

[03/50] [abbrv] hadoop git commit: YARN-3863. Support complex filters in TimelineReader (Varun Saxena via sjlee)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
index d8f73d4..6696ac5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
@@ -19,13 +19,8 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
 
 import java.io.IOException;
 import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Get;
@@ -33,28 +28,22 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
 import org.apache.hadoop.hbase.filter.FamilyFilter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
@@ -71,7 +60,6 @@ import com.google.common.base.Preconditions;
  */
 class GenericEntityReader extends TimelineEntityReader {
   private static final EntityTable ENTITY_TABLE = new EntityTable();
-  private static final Log LOG = LogFactory.getLog(GenericEntityReader.class);
 
   /**
    * Used to look up the flow context.
@@ -97,92 +85,322 @@ class GenericEntityReader extends TimelineEntityReader {
   }
 
   @Override
-  protected FilterList constructFilterListBasedOnFields() {
-    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
-    TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
-    // Fetch all the columns.
-    if (dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
-        (dataToRetrieve.getConfsToRetrieve() == null ||
-        dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) &&
-        (dataToRetrieve.getMetricsToRetrieve() == null ||
-        dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) {
-      return list;
+  protected FilterList constructFilterListBasedOnFilters() throws IOException {
+    // Filters here cannot be null for multiple entity reads as they are set in
+    // augmentParams if null.
+    FilterList listBasedOnFilters = new FilterList();
+    TimelineEntityFilters filters = getFilters();
+    // Create filter list based on created time range and add it to
+    // listBasedOnFilters.
+    long createdTimeBegin = filters.getCreatedTimeBegin();
+    long createdTimeEnd = filters.getCreatedTimeEnd();
+    if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
+      listBasedOnFilters.addFilter(
+          TimelineFilterUtils.createSingleColValueFiltersByRange(
+              EntityColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd));
     }
-    FilterList infoColFamilyList = new FilterList();
-    // By default fetch everything in INFO column family.
-    FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL,
-           new BinaryComparator(EntityColumnFamily.INFO.getBytes()));
-    infoColFamilyList.addFilter(infoColumnFamily);
+    // Create filter list based on metric filters and add it to
+    // listBasedOnFilters.
+    TimelineFilterList metricFilters = filters.getMetricFilters();
+    if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
+      listBasedOnFilters.addFilter(
+          TimelineFilterUtils.createHBaseFilterList(
+              EntityColumnPrefix.METRIC, metricFilters));
+    }
+    // Create filter list based on config filters and add it to
+    // listBasedOnFilters.
+    TimelineFilterList configFilters = filters.getConfigFilters();
+    if (configFilters != null && !configFilters.getFilterList().isEmpty()) {
+      listBasedOnFilters.addFilter(
+          TimelineFilterUtils.createHBaseFilterList(
+              EntityColumnPrefix.CONFIG, configFilters));
+    }
+    // Create filter list based on info filters and add it to listBasedOnFilters
+    TimelineFilterList infoFilters = filters.getInfoFilters();
+    if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) {
+      listBasedOnFilters.addFilter(
+          TimelineFilterUtils.createHBaseFilterList(
+              EntityColumnPrefix.INFO, infoFilters));
+    }
+    return listBasedOnFilters;
+  }
+
+  /**
+   * Check if we need to fetch only some of the event columns.
+   *
+   * @return true if we need to fetch some of the columns, false otherwise.
+   */
+  private static boolean fetchPartialEventCols(TimelineFilterList eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    return (eventFilters != null && !eventFilters.getFilterList().isEmpty() &&
+        !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS));
+  }
+
+  /**
+   * Check if we need to fetch only some of the relates_to columns.
+   *
+   * @return true if we need to fetch some of the columns, false otherwise.
+   */
+  private static boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo,
+      EnumSet<Field> fieldsToRetrieve) {
+    return (relatesTo != null && !relatesTo.getFilterList().isEmpty() &&
+        !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO));
+  }
+
+  /**
+   * Check if we need to fetch only some of the is_related_to columns.
+   *
+   * @return true if we need to fetch some of the columns, false otherwise.
+   */
+  private static boolean fetchPartialIsRelatedToCols(
+      TimelineFilterList isRelatedTo, EnumSet<Field> fieldsToRetrieve) {
+    return (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty() &&
+        !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO));
+  }
+
+  /**
+   * Check if we need to fetch only some of the columns based on event filters,
+   * relatesto and isrelatedto from info family.
+   *
+   * @return true, if we need to fetch only some of the columns, false if we
+   *     need to fetch all the columns under info column family.
+   */
+  protected boolean fetchPartialColsFromInfoFamily() {
+    EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
     TimelineEntityFilters filters = getFilters();
+    return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve) ||
+        fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve) ||
+        fetchPartialIsRelatedToCols(filters.getIsRelatedTo(), fieldsToRetrieve);
+  }
+
+  /**
+   * Check if we need to create filter list based on fields. We need to create
+   * a filter list if not all fields are to be retrieved or if specific configs
+   * or metrics to retrieve are specified. For multiple entity reads, we also
+   * need a filter list if relationship(relatesTo/isRelatedTo) or event filters
+   * are specified for the query.
+   *
+   * @return true if we need to create the filter list, false otherwise.
+   */
+  protected boolean needCreateFilterListBasedOnFields() {
+    TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
+    // Check if all fields are to be retrieved or not. If all fields have to
+    // be retrieved, also check if we have some metrics or configs to
+    // retrieve specified for the query because then a filter list will have
+    // to be created.
+    boolean flag = !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) ||
+        (dataToRetrieve.getConfsToRetrieve() != null &&
+        !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) ||
+        (dataToRetrieve.getMetricsToRetrieve() != null &&
+        !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty());
+    // Filters need to be checked only if we are reading multiple entities. If
+    // condition above is false, we check if there are relationships(relatesTo/
+    // isRelatedTo) and event filters specified for the query.
+    if (!flag && !isSingleEntityRead()) {
+      TimelineEntityFilters filters = getFilters();
+      flag = (filters.getEventFilters() != null &&
+          !filters.getEventFilters().getFilterList().isEmpty()) ||
+          (filters.getIsRelatedTo() != null &&
+          !filters.getIsRelatedTo().getFilterList().isEmpty()) ||
+          (filters.getRelatesTo() != null &&
+          !filters.getRelatesTo().getFilterList().isEmpty());
+    }
+    return flag;
+  }
+
+  /**
+   * Add {@link QualifierFilter} filters to filter list for each column of
+   * entity table.
+   *
+   * @param list filter list to which qualifier filters have to be added.
+   */
+  protected void updateFixedColumns(FilterList list) {
+    for (EntityColumn column : EntityColumn.values()) {
+      list.addFilter(new QualifierFilter(CompareOp.EQUAL,
+          new BinaryComparator(column.getColumnQualifierBytes())));
+    }
+  }
+
+  /**
+   * Creates a filter list which indicates that only some of the column
+   * qualifiers in the info column family will be returned in result.
+   *
+   * Fixed columns of the entity table are always included; prefixed columns
+   * are added based on fields to retrieve and relationship/event filters.
+   * @return filter list.
+   * @throws IOException if any problem occurs while creating filter list.
+   */
+  private FilterList createFilterListForColsOfInfoFamily()
+      throws IOException {
+    FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE);
+    // Add filters for each column in entity table.
+    updateFixedColumns(infoFamilyColsFilter);
+    EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
+    // If INFO field has to be retrieved, add a filter for fetching columns
+    // with INFO column prefix.
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
+      infoFamilyColsFilter.addFilter(
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.EQUAL, EntityColumnPrefix.INFO));
+    }
+    TimelineFilterList relatesTo = getFilters().getRelatesTo();
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+      // If RELATES_TO field has to be retrieved, add a filter for fetching
+      // columns with RELATES_TO column prefix.
+      infoFamilyColsFilter.addFilter(
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.EQUAL, EntityColumnPrefix.RELATES_TO));
+    } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) {
+      // Even if fields to retrieve does not contain RELATES_TO, we still
+      // need to have a filter to fetch some of the column qualifiers if
+      // relatesTo filters are specified. relatesTo filters will then be
+      // matched after fetching rows from HBase.
+      Set<String> relatesToCols =
+          TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
+      infoFamilyColsFilter.addFilter(
+          TimelineFilterUtils.createFiltersFromColumnQualifiers(
+              EntityColumnPrefix.RELATES_TO, relatesToCols));
+    }
+    TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+      // If IS_RELATED_TO field has to be retrieved, add a filter for fetching
+      // columns with IS_RELATED_TO column prefix.
+      infoFamilyColsFilter.addFilter(
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.EQUAL, EntityColumnPrefix.IS_RELATED_TO));
+    } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) {
+      // Even if fields to retrieve does not contain IS_RELATED_TO, we still
+      // need to have a filter to fetch some of the column qualifiers if
+      // isRelatedTo filters are specified. isRelatedTo filters will then be
+      // matched after fetching rows from HBase.
+      Set<String> isRelatedToCols =
+          TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
+      infoFamilyColsFilter.addFilter(
+          TimelineFilterUtils.createFiltersFromColumnQualifiers(
+              EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols));
+    }
+    TimelineFilterList eventFilters = getFilters().getEventFilters();
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+      // If EVENTS field has to be retrieved, add a filter for fetching columns
+      // with EVENT column prefix.
+      infoFamilyColsFilter.addFilter(
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.EQUAL, EntityColumnPrefix.EVENT));
+    } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()){
+      // Even if fields to retrieve does not contain EVENTS, we still need to
+      // have a filter to fetch some of the column qualifiers on the basis of
+      // event filters specified. Event filters will then be matched after
+      // fetching rows from HBase.
+      Set<String> eventCols =
+          TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
+      infoFamilyColsFilter.addFilter(
+          TimelineFilterUtils.createFiltersFromColumnQualifiers(
+              EntityColumnPrefix.EVENT, eventCols));
+    }
+    return infoFamilyColsFilter;
+  }
+
+  /**
+   * Exclude column prefixes via filters which are not required(based on fields
+   * to retrieve) from info column family. These filters are added to filter
+   * list which contains a filter for getting info column family.
+   *
+   * @param infoColFamilyList filter list for info column family.
+   */
+  private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
+    EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
     // Events not required.
-    if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.EVENTS) &&
-        !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
-        (isSingleEntityRead() || filters.getEventFilters() == null)) {
+    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
       infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-          EntityColumnPrefix.EVENT.getColumnPrefixBytes(""))));
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.NOT_EQUAL, EntityColumnPrefix.EVENT));
     }
     // info not required.
-    if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.INFO) &&
-        !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
-        (isSingleEntityRead() || filters.getInfoFilters() == null)) {
+    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
       infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-              EntityColumnPrefix.INFO.getColumnPrefixBytes(""))));
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.NOT_EQUAL, EntityColumnPrefix.INFO));
     }
     // is related to not required.
-    if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.IS_RELATED_TO) &&
-        !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
-        (isSingleEntityRead() || filters.getIsRelatedTo() == null)) {
+    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
       infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-              EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.NOT_EQUAL, EntityColumnPrefix.IS_RELATED_TO));
     }
     // relates to not required.
-    if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.RELATES_TO) &&
-        !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
-        (isSingleEntityRead() || filters.getRelatesTo() == null)) {
+    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
       infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-              EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.NOT_EQUAL, EntityColumnPrefix.RELATES_TO));
     }
-    list.addFilter(infoColFamilyList);
-    if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS) ||
-        (!isSingleEntityRead() && filters.getConfigFilters() != null)) ||
-        (dataToRetrieve.getConfsToRetrieve() != null &&
-        !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty())) {
-      FilterList filterCfg =
-          new FilterList(new FamilyFilter(CompareOp.EQUAL,
-              new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes())));
-      if (dataToRetrieve.getConfsToRetrieve() != null &&
-          !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) {
-        filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
-            EntityColumnPrefix.CONFIG, dataToRetrieve.getConfsToRetrieve()));
-      }
-      list.addFilter(filterCfg);
+  }
+
+  /**
+   * Updates filter list based on fields for confs and metrics to retrieve.
+   *
+   * @param listBasedOnFields filter list based on fields.
+   * @throws IOException if any problem occurs while updating filter list.
+   */
+  private void updateFilterForConfsAndMetricsToRetrieve(
+      FilterList listBasedOnFields) throws IOException {
+    TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
+    // Please note that if confsToRetrieve is specified, we would have added
+    // CONFIGS to fields to retrieve in augmentParams() even if not specified.
+    if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) {
+      // Create a filter list for configs.
+      listBasedOnFields.addFilter(TimelineFilterUtils.
+          createFilterForConfsOrMetricsToRetrieve(
+              dataToRetrieve.getConfsToRetrieve(),
+              EntityColumnFamily.CONFIGS, EntityColumnPrefix.CONFIG));
     }
-    if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS) ||
-        (!isSingleEntityRead() && filters.getMetricFilters() != null)) ||
-        (dataToRetrieve.getMetricsToRetrieve() != null &&
-        !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) {
-      FilterList filterMetrics =
-          new FilterList(new FamilyFilter(CompareOp.EQUAL,
-              new BinaryComparator(EntityColumnFamily.METRICS.getBytes())));
-      if (dataToRetrieve.getMetricsToRetrieve() != null &&
-          !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()) {
-        filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
-            EntityColumnPrefix.METRIC, dataToRetrieve.getMetricsToRetrieve()));
-      }
-      list.addFilter(filterMetrics);
+
+    // Please note that if metricsToRetrieve is specified, we would have added
+    // METRICS to fields to retrieve in augmentParams() even if not specified.
+    if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) {
+      // Create a filter list for metrics.
+      listBasedOnFields.addFilter(TimelineFilterUtils.
+          createFilterForConfsOrMetricsToRetrieve(
+              dataToRetrieve.getMetricsToRetrieve(),
+              EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC));
+    }
+  }
+
+  @Override
+  protected FilterList constructFilterListBasedOnFields() throws IOException {
+    if (!needCreateFilterListBasedOnFields()) {
+      // Fetch all the columns. No need of a filter.
+      return null;
+    }
+    FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE);
+    FilterList infoColFamilyList = new FilterList();
+    // By default fetch everything in INFO column family.
+    FamilyFilter infoColumnFamily =
+        new FamilyFilter(CompareOp.EQUAL,
+           new BinaryComparator(EntityColumnFamily.INFO.getBytes()));
+    infoColFamilyList.addFilter(infoColumnFamily);
+    if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) {
+      // We can fetch only some of the columns from info family.
+      infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily());
+    } else {
+      // Exclude column prefixes in info column family which are not required
+      // based on fields to retrieve.
+      excludeFieldsFromInfoColFamily(infoColFamilyList);
     }
-    return list;
+    listBasedOnFields.addFilter(infoColFamilyList);
+    updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields);
+    return listBasedOnFields;
   }
 
+  /**
+   * Looks up flow context from AppToFlow table.
+   *
+   * @param clusterId Cluster Id.
+   * @param appId App Id.
+   * @param hbaseConf HBase configuration.
+   * @param conn HBase Connection.
+   * @return flow context information.
+   * @throws IOException if any problem occurs while fetching flow information.
+   */
   protected FlowContext lookupFlowContext(String clusterId, String appId,
       Configuration hbaseConf, Connection conn) throws IOException {
     byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
@@ -200,6 +418,9 @@ class GenericEntityReader extends TimelineEntityReader {
     }
   }
 
+  /**
+   * Encapsulates flow context information.
+   */
   protected static class FlowContext {
     private final String userId;
     private final String flowName;
@@ -222,6 +443,9 @@ class GenericEntityReader extends TimelineEntityReader {
 
   @Override
   protected void validateParams() {
+    Preconditions.checkNotNull(getContext(), "context shouldn't be null");
+    Preconditions.checkNotNull(
+        getDataToRetrieve(), "data to retrieve shouldn't be null");
     Preconditions.checkNotNull(getContext().getClusterId(),
         "clusterId shouldn't be null");
     Preconditions.checkNotNull(getContext().getAppId(),
@@ -241,13 +465,19 @@ class GenericEntityReader extends TimelineEntityReader {
     // In reality all three should be null or neither should be null
     if (context.getFlowName() == null || context.getFlowRunId() == null ||
         context.getUserId() == null) {
+      // Get flow context information from AppToFlow table.
       FlowContext flowContext = lookupFlowContext(
           context.getClusterId(), context.getAppId(), hbaseConf, conn);
       context.setFlowName(flowContext.flowName);
       context.setFlowRunId(flowContext.flowRunId);
       context.setUserId(flowContext.userId);
     }
+    // Add configs/metrics to fields to retrieve if confsToRetrieve and/or
+    // metricsToRetrieve are specified.
     getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
+    if (!isSingleEntityRead()) {
+      createFiltersIfNull();
+    }
   }
 
   @Override
@@ -298,215 +528,84 @@ class GenericEntityReader extends TimelineEntityReader {
     // fetch created time
     Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result);
     entity.setCreatedTime(createdTime.longValue());
-    if (!isSingleEntityRead() &&
-        (entity.getCreatedTime() < filters.getCreatedTimeBegin() ||
-        entity.getCreatedTime() > filters.getCreatedTimeEnd())) {
-      return null;
-    }
+
     EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
-    // fetch is related to entities
+    // fetch is related to entities and match isRelatedTo filter. If isRelatedTo
+    // filters do not match, entity would be dropped. We have to match filters
+    // locally as relevant HBase filters to filter out rows on the basis of
+    // isRelatedTo are not set in HBase scan.
     boolean checkIsRelatedTo =
-        filters != null && filters.getIsRelatedTo() != null &&
-        filters.getIsRelatedTo().size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
-      readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
-      if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
-          entity.getIsRelatedToEntities(), filters.getIsRelatedTo())) {
+        !isSingleEntityRead() && filters.getIsRelatedTo() != null &&
+        filters.getIsRelatedTo().getFilterList().size() > 0;
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) ||
+        checkIsRelatedTo) {
+      TimelineStorageUtils.readRelationship(
+          entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
+      if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity,
+          filters.getIsRelatedTo())) {
         return null;
       }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
+      if (!TimelineStorageUtils.hasField(fieldsToRetrieve,
+          Field.IS_RELATED_TO)) {
         entity.getIsRelatedToEntities().clear();
       }
     }
 
-    // fetch relates to entities
+    // fetch relates to entities and match relatesTo filter. If relatesTo
+    // filters do not match, entity would be dropped. We have to match filters
+    // locally as relevant HBase filters to filter out rows on the basis of
+    // relatesTo are not set in HBase scan.
     boolean checkRelatesTo =
-        filters != null && filters.getRelatesTo() != null &&
-        filters.getRelatesTo().size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
-      readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
-      if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
-          entity.getRelatesToEntities(), filters.getRelatesTo())) {
+        !isSingleEntityRead() && filters.getRelatesTo() != null &&
+        filters.getRelatesTo().getFilterList().size() > 0;
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) ||
+        checkRelatesTo) {
+      TimelineStorageUtils.readRelationship(
+          entity, result, EntityColumnPrefix.RELATES_TO, false);
+      if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity,
+          filters.getRelatesTo())) {
         return null;
       }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
+      if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
         entity.getRelatesToEntities().clear();
       }
     }
 
-    // fetch info
-    boolean checkInfo = filters != null && filters.getInfoFilters() != null &&
-        filters.getInfoFilters().size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
-      readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
-      if (checkInfo &&
-          !TimelineStorageUtils.matchFilters(
-          entity.getInfo(), filters.getInfoFilters())) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.INFO)) {
-        entity.getInfo().clear();
-      }
+    // fetch info if fieldsToRetrieve contains INFO or ALL.
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
+      TimelineStorageUtils.readKeyValuePairs(
+          entity, result, EntityColumnPrefix.INFO, false);
     }
 
-    // fetch configs
-    boolean checkConfigs =
-        filters != null && filters.getConfigFilters() != null &&
-        filters.getConfigFilters().size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
-      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
-      if (checkConfigs && !TimelineStorageUtils.matchFilters(
-          entity.getConfigs(), filters.getConfigFilters())) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.CONFIGS)) {
-        entity.getConfigs().clear();
-      }
+    // fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.CONFIGS)) {
+      TimelineStorageUtils.readKeyValuePairs(
+          entity, result, EntityColumnPrefix.CONFIG, true);
     }
 
-    // fetch events
+    // fetch events and match event filters if they exist. If event filters do
+    // not match, entity would be dropped. We have to match filters locally
+    // as relevant HBase filters to filter out rows on the basis of events
+    // are not set in HBase scan.
     boolean checkEvents =
-        filters != null && filters.getEventFilters() != null &&
-        filters.getEventFilters().size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
-      readEvents(entity, result, false);
-      if (checkEvents && !TimelineStorageUtils.matchEventFilters(
-          entity.getEvents(), filters.getEventFilters())) {
+        !isSingleEntityRead() && filters.getEventFilters() != null &&
+        filters.getEventFilters().getFilterList().size() > 0;
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) ||
+        checkEvents) {
+      TimelineStorageUtils.readEvents(entity, result, EntityColumnPrefix.EVENT);
+      if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity,
+          filters.getEventFilters())) {
         return null;
       }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.EVENTS)) {
+      if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
         entity.getEvents().clear();
       }
     }
 
-    // fetch metrics
-    boolean checkMetrics =
-        filters != null && filters.getMetricFilters() != null &&
-        filters.getMetricFilters().size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
+    // fetch metrics if fieldsToRetrieve contains METRICS or ALL.
+    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.METRICS)) {
       readMetrics(entity, result, EntityColumnPrefix.METRIC);
-      if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
-          entity.getMetrics(), filters.getMetricFilters())) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.METRICS)) {
-        entity.getMetrics().clear();
-      }
     }
     return entity;
   }
-
-  /**
-   * Helper method for reading relationship.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param entity entity to fill.
-   * @param result result from HBase.
-   * @param prefix column prefix.
-   * @param isRelatedTo if true, means relationship is to be added to
-   *     isRelatedTo, otherwise its added to relatesTo.
-   * @throws IOException if any problem is encountered while reading result.
-   */
-  protected <T> void readRelationship(
-      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
-      boolean isRelatedTo) throws IOException {
-    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
-    Map<String, Object> columns = prefix.readResults(result);
-    for (Map.Entry<String, Object> column : columns.entrySet()) {
-      for (String id : Separator.VALUES.splitEncoded(
-          column.getValue().toString())) {
-        if (isRelatedTo) {
-          entity.addIsRelatedToEntity(column.getKey(), id);
-        } else {
-          entity.addRelatesToEntity(column.getKey(), id);
-        }
-      }
-    }
-  }
-
-  /**
-   * Helper method for reading key-value pairs for either info or config.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param entity entity to fill.
-   * @param result result from HBase.
-   * @param prefix column prefix.
-   * @param isConfig if true, means we are reading configs, otherwise info.
-   * @throws IOException if any problem is encountered while reading result.
-   */
-  protected <T> void readKeyValuePairs(
-      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
-      boolean isConfig) throws IOException {
-    // info and configuration are of type Map<String, Object or String>
-    Map<String, Object> columns = prefix.readResults(result);
-    if (isConfig) {
-      for (Map.Entry<String, Object> column : columns.entrySet()) {
-        entity.addConfig(column.getKey(), column.getValue().toString());
-      }
-    } else {
-      entity.addInfo(columns);
-    }
-  }
-
-  /**
-   * Read events from the entity table or the application table. The column name
-   * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
-   * if there is no info associated with the event.
-   *
-   * @param entity entity to fill.
-   * @param result HBase Result.
-   * @param isApplication if true, event read is for application table,
-   *     otherwise its being read for entity table.
-   * @throws IOException if any problem is encountered while reading result.
-   *
-   * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
-   * schema description.
-   */
-  protected void readEvents(TimelineEntity entity, Result result,
-      boolean isApplication) throws IOException {
-    Map<String, TimelineEvent> eventsMap = new HashMap<>();
-    Map<?, Object> eventsResult = isApplication ?
-        ApplicationColumnPrefix.EVENT.
-            readResultsHavingCompoundColumnQualifiers(result) :
-        EntityColumnPrefix.EVENT.
-            readResultsHavingCompoundColumnQualifiers(result);
-    for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
-      byte[][] karr = (byte[][])eventResult.getKey();
-      // the column name is of the form "eventId=timestamp=infoKey"
-      if (karr.length == 3) {
-        String id = Bytes.toString(karr[0]);
-        long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1]));
-        String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
-        TimelineEvent event = eventsMap.get(key);
-        if (event == null) {
-          event = new TimelineEvent();
-          event.setId(id);
-          event.setTimestamp(ts);
-          eventsMap.put(key, event);
-        }
-        // handle empty info
-        String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
-        if (infoKey != null) {
-          event.addInfo(infoKey, eventResult.getValue());
-        }
-      } else {
-        LOG.warn("incorrectly formatted column name: it will be discarded");
-        continue;
-      }
-    }
-    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
-    entity.addEvents(eventsSet);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
index 281e901..4299de9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
@@ -107,11 +107,60 @@ public abstract class TimelineEntityReader {
   /**
    * Creates a {@link FilterList} based on fields, confs and metrics to
    * retrieve. This filter list will be set in Scan/Get objects to trim down
-   * results fetched from HBase back-end storage.
+   * results fetched from HBase back-end storage. This is used for both
+   * single entity reads (Get) and multiple entity reads (Scan).
    *
    * @return a {@link FilterList} object.
+   * @throws IOException if any problem occurs while creating filter list.
    */
-  protected abstract FilterList constructFilterListBasedOnFields();
+  protected abstract FilterList constructFilterListBasedOnFields()
+      throws IOException;
+
+  /**
+   * Creates a {@link FilterList} based on info, config and metric filters. This
+   * filter list will be set in HBase Scan to trim down results fetched from
+   * HBase back-end storage.
+   *
+   * @return a {@link FilterList} object.
+   * @throws IOException if any problem occurs while creating filter list.
+   */
+  protected abstract FilterList constructFilterListBasedOnFilters()
+      throws IOException;
+
+  /**
+   * Combines filter lists created based on fields and based on filters.
+   *
+   * @return a {@link FilterList} object if it can be constructed. Returns null,
+   * if filter list cannot be created either on the basis of filters or on the
+   * basis of fields.
+   * @throws IOException if any problem occurs while creating filter list.
+   */
+  private FilterList createFilterList() throws IOException {
+    FilterList listBasedOnFilters = constructFilterListBasedOnFilters();
+    boolean hasListBasedOnFilters = listBasedOnFilters != null &&
+        !listBasedOnFilters.getFilters().isEmpty();
+    FilterList listBasedOnFields = constructFilterListBasedOnFields();
+    boolean hasListBasedOnFields = listBasedOnFields != null &&
+        !listBasedOnFields.getFilters().isEmpty();
+    // If filter lists based on both filters and fields can be created,
+    // combine them in a new filter list and return it.
+    // If either one of them has been created, return that filter list.
+    // Return null, if none of the filter lists can be created. This indicates
+    // that no filter list needs to be added to HBase Scan as filters are not
+    // specified for the query or only the default view of entity needs to be
+    // returned.
+    if (hasListBasedOnFilters && hasListBasedOnFields) {
+      FilterList list = new FilterList();
+      list.addFilter(listBasedOnFilters);
+      list.addFilter(listBasedOnFields);
+      return list;
+    } else if (hasListBasedOnFilters) {
+      return listBasedOnFilters;
+    } else if (hasListBasedOnFields) {
+      return listBasedOnFields;
+    }
+    return null;
+  }
 
   protected TimelineReaderContext getContext() {
     return context;
@@ -126,6 +175,16 @@ public abstract class TimelineEntityReader {
   }
 
   /**
+   * Creates a {@link TimelineEntityFilters} object with default values if
+   * no filters object has been set yet.
+   */
+  protected void createFiltersIfNull() {
+    if (filters == null) {
+      filters = new TimelineEntityFilters();
+    }
+  }
+
+  /**
    * Reads and deserializes a single timeline entity from the HBase storage.
    *
    * @param hbaseConf HBase Configuration.
@@ -140,6 +199,9 @@ public abstract class TimelineEntityReader {
     augmentParams(hbaseConf, conn);
 
     FilterList filterList = constructFilterListBasedOnFields();
+    if (LOG.isDebugEnabled() && filterList != null) {
+      LOG.debug("FilterList created for get is - " + filterList);
+    }
     Result result = getResult(hbaseConf, conn, filterList);
     if (result == null || result.isEmpty()) {
       // Could not find a matching row.
@@ -166,7 +228,10 @@ public abstract class TimelineEntityReader {
     augmentParams(hbaseConf, conn);
 
     NavigableSet<TimelineEntity> entities = new TreeSet<>();
-    FilterList filterList = constructFilterListBasedOnFields();
+    FilterList filterList = createFilterList();
+    if (LOG.isDebugEnabled() && filterList != null) {
+      LOG.debug("FilterList created for scan is - " + filterList);
+    }
     ResultScanner results = getResults(hbaseConf, conn, filterList);
     try {
       for (Result result : results) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
index b6e23a9..2bd2830 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
@@ -221,7 +221,7 @@ public class TestTimelineReaderWebServices {
       assertTrue("UID should be present",
           entity.getInfo().containsKey(TimelineReaderManager.UID_KEY));
       // Includes UID.
-      assertEquals(2, entity.getInfo().size());
+      assertEquals(3, entity.getInfo().size());
       // No events will be returned as events are not part of fields.
       assertEquals(0, entity.getEvents().size());
     } finally {
@@ -247,7 +247,7 @@ public class TestTimelineReaderWebServices {
       assertTrue("UID should be present",
           entity.getInfo().containsKey(TimelineReaderManager.UID_KEY));
       // Includes UID.
-      assertEquals(2, entity.getInfo().size());
+      assertEquals(3, entity.getInfo().size());
       assertEquals(2, entity.getEvents().size());
     } finally {
       client.destroy();
@@ -443,10 +443,8 @@ public class TestTimelineReaderWebServices {
           resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
       assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
       assertNotNull(entities);
-      assertEquals(2, entities.size());
-      assertTrue("Entities with id_1 and id_3 should have been present" +
-          " in response.",
-          entities.contains(newEntity("app", "id_1")) &&
+      assertEquals(1, entities.size());
+      assertTrue("Entity with id_3 should have been present in response.",
           entities.contains(newEntity("app", "id_3")));
     } finally {
       client.destroy();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
index a8a2ff8..23d64e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -40,6 +41,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.junit.AfterClass;
@@ -112,6 +120,7 @@ public class TestFileSystemTimelineReaderImpl {
     entity11.setCreatedTime(1425016502000L);
     Map<String, Object> info1 = new HashMap<String, Object>();
     info1.put("info1", "val1");
+    info1.put("info2", "val5");
     entity11.addInfo(info1);
     TimelineEvent event = new TimelineEvent();
     event.setId("event_1");
@@ -121,7 +130,7 @@ public class TestFileSystemTimelineReaderImpl {
     TimelineMetric metric1 = new TimelineMetric();
     metric1.setId("metric1");
     metric1.setType(TimelineMetric.Type.SINGLE_VALUE);
-    metric1.addValue(1425016502006L, 113.2F);
+    metric1.addValue(1425016502006L, 113);
     metrics.add(metric1);
     TimelineMetric metric2 = new TimelineMetric();
     metric2.setId("metric2");
@@ -130,7 +139,7 @@ public class TestFileSystemTimelineReaderImpl {
     metrics.add(metric2);
     entity11.setMetrics(metrics);
     Map<String,String> configs = new HashMap<String, String>();
-    configs.put("config_1", "123");
+    configs.put("config_1", "127");
     entity11.setConfigs(configs);
     entity11.addRelatesToEntity("flow", "flow1");
     entity11.addIsRelatedToEntity("type1", "tid1_1");
@@ -171,7 +180,7 @@ public class TestFileSystemTimelineReaderImpl {
     info1.put("info2", 4);
     entity2.addInfo(info2);
     Map<String,String> configs2 = new HashMap<String, String>();
-    configs2.put("config_1", "123");
+    configs2.put("config_1", "129");
     configs2.put("config_3", "def");
     entity2.setConfigs(configs2);
     TimelineEvent event2 = new TimelineEvent();
@@ -182,7 +191,7 @@ public class TestFileSystemTimelineReaderImpl {
     TimelineMetric metric21 = new TimelineMetric();
     metric21.setId("metric1");
     metric21.setType(TimelineMetric.Type.SINGLE_VALUE);
-    metric21.addValue(1425016501006L, 123.2F);
+    metric21.addValue(1425016501006L, 300);
     metrics2.add(metric21);
     TimelineMetric metric22 = new TimelineMetric();
     metric22.setId("metric2");
@@ -205,6 +214,7 @@ public class TestFileSystemTimelineReaderImpl {
     entity3.setCreatedTime(1425016501050L);
     Map<String, Object> info3 = new HashMap<String, Object>();
     info3.put("info2", 3.5);
+    info3.put("info4", 20);
     entity3.addInfo(info3);
     Map<String,String> configs3 = new HashMap<String, String>();
     configs3.put("config_1", "123");
@@ -222,7 +232,7 @@ public class TestFileSystemTimelineReaderImpl {
     TimelineMetric metric31 = new TimelineMetric();
     metric31.setId("metric1");
     metric31.setType(TimelineMetric.Type.SINGLE_VALUE);
-    metric31.addValue(1425016501006L, 124.8F);
+    metric31.addValue(1425016501006L, 124);
     metrics3.add(metric31);
     TimelineMetric metric32 = new TimelineMetric();
     metric32.setId("metric2");
@@ -317,7 +327,7 @@ public class TestFileSystemTimelineReaderImpl {
     Assert.assertEquals(1425016502000L, result.getCreatedTime());
     Assert.assertEquals(3, result.getConfigs().size());
     Assert.assertEquals(3, result.getMetrics().size());
-    Assert.assertEquals(1, result.getInfo().size());
+    Assert.assertEquals(2, result.getInfo().size());
     // No events will be returned
     Assert.assertEquals(0, result.getEvents().size());
   }
@@ -344,8 +354,8 @@ public class TestFileSystemTimelineReaderImpl {
     Set<TimelineEntity> result = reader.getEntities(
         new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
         "app", null), new TimelineEntityFilters(),
-        new TimelineDataToRetrieve());
-    // All 3 entities will be returned
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+    // All 4 entities will be returned
     Assert.assertEquals(4, result.size());
   }
 
@@ -425,12 +435,13 @@ public class TestFileSystemTimelineReaderImpl {
   @Test
   public void testGetFilteredEntities() throws Exception {
     // Get entities based on info filters.
-    Map<String, Object> infoFilters = new HashMap<String, Object>();
-    infoFilters.put("info2", 3.5);
+    TimelineFilterList infoFilterList = new TimelineFilterList();
+    infoFilterList.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5));
     Set<TimelineEntity> result = reader.getEntities(
         new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
         "app", null),
-        new TimelineEntityFilters(null, null, null, null, null, infoFilters,
+        new TimelineEntityFilters(null, null, null, null, null, infoFilterList,
         null, null, null),
         new TimelineDataToRetrieve());
     Assert.assertEquals(1, result.size());
@@ -442,26 +453,30 @@ public class TestFileSystemTimelineReaderImpl {
     }
 
     // Get entities based on config filters.
-    Map<String, String> configFilters = new HashMap<String, String>();
-    configFilters.put("config_1", "123");
-    configFilters.put("config_3", "abc");
+    TimelineFilterList confFilterList = new TimelineFilterList();
+    confFilterList.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_1", "123"));
+    confFilterList.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "abc"));
     result = reader.getEntities(
         new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
         "app", null),
         new TimelineEntityFilters(null, null, null, null, null, null,
-        configFilters, null, null),
+        confFilterList, null, null),
         new TimelineDataToRetrieve());
-    Assert.assertEquals(2, result.size());
+    Assert.assertEquals(1, result.size());
     for (TimelineEntity entity : result) {
-      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) {
+      if (!entity.getId().equals("id_3")) {
         Assert.fail("Incorrect filtering based on config filters");
       }
     }
 
     // Get entities based on event filters.
-    Set<String> eventFilters = new HashSet<String>();
-    eventFilters.add("event_2");
-    eventFilters.add("event_4");
+    TimelineFilterList eventFilters = new TimelineFilterList();
+    eventFilters.addFilter(
+        new TimelineExistsFilter(TimelineCompareOp.EQUAL,"event_2"));
+    eventFilters.addFilter(
+        new TimelineExistsFilter(TimelineCompareOp.EQUAL,"event_4"));
     result = reader.getEntities(
         new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
         "app", null),
@@ -476,13 +491,14 @@ public class TestFileSystemTimelineReaderImpl {
     }
 
     // Get entities based on metric filters.
-    Set<String> metricFilters = new HashSet<String>();
-    metricFilters.add("metric3");
+    TimelineFilterList metricFilterList = new TimelineFilterList();
+    metricFilterList.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.GREATER_OR_EQUAL, "metric3", 0L));
     result = reader.getEntities(
         new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
         "app", null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
-        metricFilters, null),
+        metricFilterList, null),
         new TimelineDataToRetrieve());
     Assert.assertEquals(2, result.size());
     // Two entities with IDs' id_1 and id_2 should be returned.
@@ -491,15 +507,266 @@ public class TestFileSystemTimelineReaderImpl {
         Assert.fail("Incorrect filtering based on metric filters");
       }
     }
-  }
+
+    // Get entities based on complex config filters.
+    TimelineFilterList list1 = new TimelineFilterList();
+    list1.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_1", "129"));
+    list1.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "def"));
+    TimelineFilterList list2 = new TimelineFilterList();
+    list2.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_2", "23"));
+    list2.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "abc"));
+    TimelineFilterList confFilterList1 =
+        new TimelineFilterList(Operator.OR, list1, list2);
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null,
+        confFilterList1, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+        Assert.fail("Incorrect filtering based on config filters");
+      }
+    }
+
+    TimelineFilterList list3 = new TimelineFilterList();
+    list3.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.NOT_EQUAL, "config_1", "123"));
+    list3.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.NOT_EQUAL, "config_3", "abc"));
+    TimelineFilterList list4 = new TimelineFilterList();
+    list4.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_2", "23"));
+    TimelineFilterList confFilterList2 =
+        new TimelineFilterList(Operator.OR, list3, list4);
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null,
+        confFilterList2, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+        Assert.fail("Incorrect filtering based on config filters");
+      }
+    }
+
+    TimelineFilterList confFilterList3 = new TimelineFilterList();
+    confFilterList3.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.NOT_EQUAL, "config_1", "127"));
+    confFilterList3.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.NOT_EQUAL, "config_3", "abc"));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null,
+        confFilterList3, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    for(TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_2")) {
+        Assert.fail("Incorrect filtering based on config filters");
+      }
+    }
+
+    TimelineFilterList confFilterList4 = new TimelineFilterList();
+    confFilterList4.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.EQUAL, "config_dummy", "dummy"));
+    confFilterList4.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.EQUAL, "config_3", "def"));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null,
+        confFilterList4, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(0, result.size());
+
+    TimelineFilterList confFilterList5 = new TimelineFilterList(Operator.OR);
+    confFilterList5.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.EQUAL, "config_dummy", "dummy"));
+    confFilterList5.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.EQUAL, "config_3", "def"));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null,
+        confFilterList5, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_2")) {
+        Assert.fail("Incorrect filtering based on config filters");
+      }
+    }
+
+    // Get entities based on complex metric filters.
+    TimelineFilterList list6 = new TimelineFilterList();
+    list6.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.GREATER_THAN, "metric1", 200));
+    list6.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.EQUAL, "metric3", 23));
+    TimelineFilterList list7 = new TimelineFilterList();
+    list7.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.GREATER_OR_EQUAL, "metric2", 74));
+    TimelineFilterList metricFilterList1 =
+        new TimelineFilterList(Operator.OR, list6, list7);
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null, null,
+        metricFilterList1, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    // Two entities with IDs id_2 and id_3 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_2") && !entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on metric filters");
+      }
+    }
+
+    TimelineFilterList metricFilterList2 = new TimelineFilterList();
+    metricFilterList2.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.LESS_THAN, "metric2", 70));
+    metricFilterList2.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null, null,
+        metricFilterList2, null),
+        new TimelineDataToRetrieve());
+   Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1")) {
+        Assert.fail("Incorrect filtering based on metric filters");
+      }
+    }
+
+    TimelineFilterList metricFilterList3 = new TimelineFilterList();
+    metricFilterList3.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.LESS_THAN, "dummy_metric", 30));
+    metricFilterList3.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null, null,
+        metricFilterList3, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(0, result.size());
+
+    TimelineFilterList metricFilterList4 = new TimelineFilterList(Operator.OR);
+    metricFilterList4.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.LESS_THAN, "dummy_metric", 30));
+    metricFilterList4.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null, null,
+        metricFilterList4, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+        Assert.fail("Incorrect filtering based on metric filters");
+      }
+    }
+
+    TimelineFilterList metricFilterList5 =
+        new TimelineFilterList(new TimelineCompareFilter(
+            TimelineCompareOp.NOT_EQUAL, "metric2", 74));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null, null,
+        metricFilterList5, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+        Assert.fail("Incorrect filtering based on metric filters");
+      }
+    }
+
+    TimelineFilterList infoFilterList1 = new TimelineFilterList();
+    infoFilterList1.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5));
+    infoFilterList1.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.NOT_EQUAL, "info4", 20));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, infoFilterList1,
+        null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(0, result.size());
+
+    TimelineFilterList infoFilterList2 = new TimelineFilterList(Operator.OR);
+    infoFilterList2.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5));
+    infoFilterList2.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info1", "val1"));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, infoFilterList2,
+        null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on info filters");
+      }
+    }
+
+    TimelineFilterList infoFilterList3 = new TimelineFilterList();
+    infoFilterList3.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "dummy_info", 1));
+    infoFilterList3.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", "val5"));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, infoFilterList3,
+        null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(0, result.size());
+
+    TimelineFilterList infoFilterList4 = new TimelineFilterList(Operator.OR);
+    infoFilterList4.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "dummy_info", 1));
+    infoFilterList4.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", "val5"));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, infoFilterList4,
+        null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1")) {
+        Assert.fail("Incorrect filtering based on info filters");
+      }
+    }
+   }
 
   @Test
   public void testGetEntitiesByRelations() throws Exception {
     // Get entities based on relatesTo.
-    Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
-    Set<String> relatesToIds = new HashSet<String>();
-    relatesToIds.add("flow1");
-    relatesTo.put("flow", relatesToIds);
+    TimelineFilterList relatesTo = new TimelineFilterList(Operator.OR);
+    Set<Object> relatesToIds =
+        new HashSet<Object>(Arrays.asList((Object)"flow1"));
+    relatesTo.addFilter(new TimelineKeyValuesFilter(
+        TimelineCompareOp.EQUAL, "flow", relatesToIds));
     Set<TimelineEntity> result = reader.getEntities(
         new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
         "app", null),
@@ -515,10 +782,11 @@ public class TestFileSystemTimelineReaderImpl {
     }
 
     // Get entities based on isRelatedTo.
-    Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
-    Set<String> isRelatedToIds = new HashSet<String>();
-    isRelatedToIds.add("tid1_2");
-    isRelatedTo.put("type1", isRelatedToIds);
+    TimelineFilterList isRelatedTo = new TimelineFilterList(Operator.OR);
+    Set<Object> isRelatedToIds =
+        new HashSet<Object>(Arrays.asList((Object)"tid1_2"));
+    isRelatedTo.addFilter(new TimelineKeyValuesFilter(
+        TimelineCompareOp.EQUAL, "type1", isRelatedToIds));
     result = reader.getEntities(
         new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
         "app", null),


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org