You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2016/06/21 23:48:49 UTC
[03/50] [abbrv] hadoop git commit: YARN-3863. Support complex filters
in TimelineReader (Varun Saxena via sjlee)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
index d8f73d4..6696ac5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
@@ -19,13 +19,8 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
import java.io.IOException;
import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
@@ -33,28 +28,22 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
@@ -71,7 +60,6 @@ import com.google.common.base.Preconditions;
*/
class GenericEntityReader extends TimelineEntityReader {
private static final EntityTable ENTITY_TABLE = new EntityTable();
- private static final Log LOG = LogFactory.getLog(GenericEntityReader.class);
/**
* Used to look up the flow context.
@@ -97,92 +85,322 @@ class GenericEntityReader extends TimelineEntityReader {
}
@Override
- protected FilterList constructFilterListBasedOnFields() {
- FilterList list = new FilterList(Operator.MUST_PASS_ONE);
- TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
- // Fetch all the columns.
- if (dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
- (dataToRetrieve.getConfsToRetrieve() == null ||
- dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) &&
- (dataToRetrieve.getMetricsToRetrieve() == null ||
- dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) {
- return list;
+ protected FilterList constructFilterListBasedOnFilters() throws IOException {
+ // Filters here cannot be null for multiple entity reads as they are set in
+ // augmentParams if null.
+ FilterList listBasedOnFilters = new FilterList();
+ TimelineEntityFilters filters = getFilters();
+ // Create filter list based on created time range and add it to
+ // listBasedOnFilters.
+ long createdTimeBegin = filters.getCreatedTimeBegin();
+ long createdTimeEnd = filters.getCreatedTimeEnd();
+ if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
+ listBasedOnFilters.addFilter(
+ TimelineFilterUtils.createSingleColValueFiltersByRange(
+ EntityColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd));
}
- FilterList infoColFamilyList = new FilterList();
- // By default fetch everything in INFO column family.
- FamilyFilter infoColumnFamily =
- new FamilyFilter(CompareOp.EQUAL,
- new BinaryComparator(EntityColumnFamily.INFO.getBytes()));
- infoColFamilyList.addFilter(infoColumnFamily);
+ // Create filter list based on metric filters and add it to
+ // listBasedOnFilters.
+ TimelineFilterList metricFilters = filters.getMetricFilters();
+ if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
+ listBasedOnFilters.addFilter(
+ TimelineFilterUtils.createHBaseFilterList(
+ EntityColumnPrefix.METRIC, metricFilters));
+ }
+ // Create filter list based on config filters and add it to
+ // listBasedOnFilters.
+ TimelineFilterList configFilters = filters.getConfigFilters();
+ if (configFilters != null && !configFilters.getFilterList().isEmpty()) {
+ listBasedOnFilters.addFilter(
+ TimelineFilterUtils.createHBaseFilterList(
+ EntityColumnPrefix.CONFIG, configFilters));
+ }
+ // Create filter list based on info filters and add it to listBasedOnFilters
+ TimelineFilterList infoFilters = filters.getInfoFilters();
+ if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) {
+ listBasedOnFilters.addFilter(
+ TimelineFilterUtils.createHBaseFilterList(
+ EntityColumnPrefix.INFO, infoFilters));
+ }
+ return listBasedOnFilters;
+ }
+
+ /**
+ * Check if we need to fetch only some of the event columns.
+ *
+ * @return true if we need to fetch some of the columns, false otherwise.
+ */
+ private static boolean fetchPartialEventCols(TimelineFilterList eventFilters,
+ EnumSet<Field> fieldsToRetrieve) {
+ return (eventFilters != null && !eventFilters.getFilterList().isEmpty() &&
+ !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS));
+ }
+
+ /**
+ * Check if we need to fetch only some of the relates_to columns.
+ *
+ * @return true if we need to fetch some of the columns, false otherwise.
+ */
+ private static boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo,
+ EnumSet<Field> fieldsToRetrieve) {
+ return (relatesTo != null && !relatesTo.getFilterList().isEmpty() &&
+ !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO));
+ }
+
+ /**
+ * Check if we need to fetch only some of the is_related_to columns.
+ *
+ * @return true if we need to fetch some of the columns, false otherwise.
+ */
+ private static boolean fetchPartialIsRelatedToCols(
+ TimelineFilterList isRelatedTo, EnumSet<Field> fieldsToRetrieve) {
+ return (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty() &&
+ !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO));
+ }
+
+ /**
+ * Check if we need to fetch only some of the columns based on event filters,
+ * relatesto and isrelatedto from info family.
+ *
+ * @return true, if we need to fetch only some of the columns, false if we
+ * need to fetch all the columns under info column family.
+ */
+ protected boolean fetchPartialColsFromInfoFamily() {
+ EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
TimelineEntityFilters filters = getFilters();
+ return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve) ||
+ fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve) ||
+ fetchPartialIsRelatedToCols(filters.getIsRelatedTo(), fieldsToRetrieve);
+ }
+
+ /**
+ * Check if we need to create filter list based on fields. We need to create
+ * a filter list iff all fields need not be retrieved or we have some specific
+ * fields or metrics to retrieve. We also need to create a filter list if we
+ * have relationships(relatesTo/isRelatedTo) and event filters specified for
+ * the query.
+ *
+ * @return true if we need to create the filter list, false otherwise.
+ */
+ protected boolean needCreateFilterListBasedOnFields() {
+ TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
+ // Check if all fields are to be retrieved or not. If all fields have to
+ // be retrieved, also check if we have some metrics or configs to
+ // retrieve specified for the query because then a filter list will have
+ // to be created.
+ boolean flag = !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) ||
+ (dataToRetrieve.getConfsToRetrieve() != null &&
+ !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) ||
+ (dataToRetrieve.getMetricsToRetrieve() != null &&
+ !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty());
+ // Filters need to be checked only if we are reading multiple entities. If
+ // condition above is false, we check if there are relationships(relatesTo/
+ // isRelatedTo) and event filters specified for the query.
+ if (!flag && !isSingleEntityRead()) {
+ TimelineEntityFilters filters = getFilters();
+ flag = (filters.getEventFilters() != null &&
+ !filters.getEventFilters().getFilterList().isEmpty()) ||
+ (filters.getIsRelatedTo() != null &&
+ !filters.getIsRelatedTo().getFilterList().isEmpty()) ||
+ (filters.getRelatesTo() != null &&
+ !filters.getRelatesTo().getFilterList().isEmpty());
+ }
+ return flag;
+ }
+
+ /**
+ * Add {@link QualifierFilter} filters to filter list for each column of
+ * entity table.
+ *
+ * @param list filter list to which qualifier filters have to be added.
+ */
+ protected void updateFixedColumns(FilterList list) {
+ for (EntityColumn column : EntityColumn.values()) {
+ list.addFilter(new QualifierFilter(CompareOp.EQUAL,
+ new BinaryComparator(column.getColumnQualifierBytes())));
+ }
+ }
+
+ /**
+ * Creates a filter list which indicates that only some of the column
+ * qualifiers in the info column family will be returned in result.
+ *
+ * @param isApplication If true, it means operations are to be performed for
+ * application table, otherwise for entity table.
+ * @return filter list.
+ * @throws IOException if any problem occurs while creating filter list.
+ */
+ private FilterList createFilterListForColsOfInfoFamily()
+ throws IOException {
+ FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE);
+ // Add filters for each column in entity table.
+ updateFixedColumns(infoFamilyColsFilter);
+ EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
+ // If INFO field has to be retrieved, add a filter for fetching columns
+ // with INFO column prefix.
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
+ infoFamilyColsFilter.addFilter(
+ TimelineFilterUtils.createHBaseQualifierFilter(
+ CompareOp.EQUAL, EntityColumnPrefix.INFO));
+ }
+ TimelineFilterList relatesTo = getFilters().getRelatesTo();
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+ // If RELATES_TO field has to be retrieved, add a filter for fetching
+ // columns with RELATES_TO column prefix.
+ infoFamilyColsFilter.addFilter(
+ TimelineFilterUtils.createHBaseQualifierFilter(
+ CompareOp.EQUAL, EntityColumnPrefix.RELATES_TO));
+ } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) {
+ // Even if fields to retrieve does not contain RELATES_TO, we still
+ // need to have a filter to fetch some of the column qualifiers if
+ // relatesTo filters are specified. relatesTo filters will then be
+ // matched after fetching rows from HBase.
+ Set<String> relatesToCols =
+ TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
+ infoFamilyColsFilter.addFilter(
+ TimelineFilterUtils.createFiltersFromColumnQualifiers(
+ EntityColumnPrefix.RELATES_TO, relatesToCols));
+ }
+ TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+ // If IS_RELATED_TO field has to be retrieved, add a filter for fetching
+ // columns with IS_RELATED_TO column prefix.
+ infoFamilyColsFilter.addFilter(
+ TimelineFilterUtils.createHBaseQualifierFilter(
+ CompareOp.EQUAL, EntityColumnPrefix.IS_RELATED_TO));
+ } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) {
+ // Even if fields to retrieve does not contain IS_RELATED_TO, we still
+ // need to have a filter to fetch some of the column qualifiers if
+ // isRelatedTo filters are specified. isRelatedTo filters will then be
+ // matched after fetching rows from HBase.
+ Set<String> isRelatedToCols =
+ TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
+ infoFamilyColsFilter.addFilter(
+ TimelineFilterUtils.createFiltersFromColumnQualifiers(
+ EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols));
+ }
+ TimelineFilterList eventFilters = getFilters().getEventFilters();
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+ // If EVENTS field has to be retrieved, add a filter for fetching columns
+ // with EVENT column prefix.
+ infoFamilyColsFilter.addFilter(
+ TimelineFilterUtils.createHBaseQualifierFilter(
+ CompareOp.EQUAL, EntityColumnPrefix.EVENT));
+ } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()){
+ // Even if fields to retrieve does not contain EVENTS, we still need to
+ // have a filter to fetch some of the column qualifiers on the basis of
+ // event filters specified. Event filters will then be matched after
+ // fetching rows from HBase.
+ Set<String> eventCols =
+ TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
+ infoFamilyColsFilter.addFilter(
+ TimelineFilterUtils.createFiltersFromColumnQualifiers(
+ EntityColumnPrefix.EVENT, eventCols));
+ }
+ return infoFamilyColsFilter;
+ }
+
+ /**
+ * Exclude column prefixes via filters which are not required(based on fields
+ * to retrieve) from info column family. These filters are added to filter
+ * list which contains a filter for getting info column family.
+ *
+ * @param infoColFamilyList filter list for info column family.
+ */
+ private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
+ EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
// Events not required.
- if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.EVENTS) &&
- !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
- (isSingleEntityRead() || filters.getEventFilters() == null)) {
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
infoColFamilyList.addFilter(
- new QualifierFilter(CompareOp.NOT_EQUAL,
- new BinaryPrefixComparator(
- EntityColumnPrefix.EVENT.getColumnPrefixBytes(""))));
+ TimelineFilterUtils.createHBaseQualifierFilter(
+ CompareOp.NOT_EQUAL, EntityColumnPrefix.EVENT));
}
// info not required.
- if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.INFO) &&
- !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
- (isSingleEntityRead() || filters.getInfoFilters() == null)) {
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
infoColFamilyList.addFilter(
- new QualifierFilter(CompareOp.NOT_EQUAL,
- new BinaryPrefixComparator(
- EntityColumnPrefix.INFO.getColumnPrefixBytes(""))));
+ TimelineFilterUtils.createHBaseQualifierFilter(
+ CompareOp.NOT_EQUAL, EntityColumnPrefix.INFO));
}
// is related to not required.
- if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.IS_RELATED_TO) &&
- !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
- (isSingleEntityRead() || filters.getIsRelatedTo() == null)) {
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
infoColFamilyList.addFilter(
- new QualifierFilter(CompareOp.NOT_EQUAL,
- new BinaryPrefixComparator(
- EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
+ TimelineFilterUtils.createHBaseQualifierFilter(
+ CompareOp.NOT_EQUAL, EntityColumnPrefix.IS_RELATED_TO));
}
// relates to not required.
- if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.RELATES_TO) &&
- !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
- (isSingleEntityRead() || filters.getRelatesTo() == null)) {
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
infoColFamilyList.addFilter(
- new QualifierFilter(CompareOp.NOT_EQUAL,
- new BinaryPrefixComparator(
- EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
+ TimelineFilterUtils.createHBaseQualifierFilter(
+ CompareOp.NOT_EQUAL, EntityColumnPrefix.RELATES_TO));
}
- list.addFilter(infoColFamilyList);
- if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS) ||
- (!isSingleEntityRead() && filters.getConfigFilters() != null)) ||
- (dataToRetrieve.getConfsToRetrieve() != null &&
- !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty())) {
- FilterList filterCfg =
- new FilterList(new FamilyFilter(CompareOp.EQUAL,
- new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes())));
- if (dataToRetrieve.getConfsToRetrieve() != null &&
- !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) {
- filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
- EntityColumnPrefix.CONFIG, dataToRetrieve.getConfsToRetrieve()));
- }
- list.addFilter(filterCfg);
+ }
+
+ /**
+ * Updates filter list based on fields for confs and metrics to retrieve.
+ *
+ * @param listBasedOnFields filter list based on fields.
+ * @throws IOException if any problem occurs while updating filter list.
+ */
+ private void updateFilterForConfsAndMetricsToRetrieve(
+ FilterList listBasedOnFields) throws IOException {
+ TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
+ // Please note that if confsToRetrieve is specified, we would have added
+ // CONFS to fields to retrieve in augmentParams() even if not specified.
+ if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) {
+ // Create a filter list for configs.
+ listBasedOnFields.addFilter(TimelineFilterUtils.
+ createFilterForConfsOrMetricsToRetrieve(
+ dataToRetrieve.getConfsToRetrieve(),
+ EntityColumnFamily.CONFIGS, EntityColumnPrefix.CONFIG));
}
- if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS) ||
- (!isSingleEntityRead() && filters.getMetricFilters() != null)) ||
- (dataToRetrieve.getMetricsToRetrieve() != null &&
- !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) {
- FilterList filterMetrics =
- new FilterList(new FamilyFilter(CompareOp.EQUAL,
- new BinaryComparator(EntityColumnFamily.METRICS.getBytes())));
- if (dataToRetrieve.getMetricsToRetrieve() != null &&
- !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()) {
- filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
- EntityColumnPrefix.METRIC, dataToRetrieve.getMetricsToRetrieve()));
- }
- list.addFilter(filterMetrics);
+
+ // Please note that if metricsToRetrieve is specified, we would have added
+ // METRICS to fields to retrieve in augmentParams() even if not specified.
+ if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) {
+ // Create a filter list for metrics.
+ listBasedOnFields.addFilter(TimelineFilterUtils.
+ createFilterForConfsOrMetricsToRetrieve(
+ dataToRetrieve.getMetricsToRetrieve(),
+ EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC));
+ }
+ }
+
+ @Override
+ protected FilterList constructFilterListBasedOnFields() throws IOException {
+ if (!needCreateFilterListBasedOnFields()) {
+ // Fetch all the columns. No need of a filter.
+ return null;
+ }
+ FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE);
+ FilterList infoColFamilyList = new FilterList();
+ // By default fetch everything in INFO column family.
+ FamilyFilter infoColumnFamily =
+ new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(EntityColumnFamily.INFO.getBytes()));
+ infoColFamilyList.addFilter(infoColumnFamily);
+ if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) {
+ // We can fetch only some of the columns from info family.
+ infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily());
+ } else {
+ // Exclude column prefixes in info column family which are not required
+ // based on fields to retrieve.
+ excludeFieldsFromInfoColFamily(infoColFamilyList);
}
- return list;
+ listBasedOnFields.addFilter(infoColFamilyList);
+ updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields);
+ return listBasedOnFields;
}
+ /**
+ * Looks up flow context from AppToFlow table.
+ *
+ * @param clusterId Cluster Id.
+ * @param appId App Id.
+ * @param hbaseConf HBase configuration.
+ * @param conn HBase Connection.
+ * @return flow context information.
+ * @throws IOException if any problem occurs while fetching flow information.
+ */
protected FlowContext lookupFlowContext(String clusterId, String appId,
Configuration hbaseConf, Connection conn) throws IOException {
byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
@@ -200,6 +418,9 @@ class GenericEntityReader extends TimelineEntityReader {
}
}
+ /**
+ * Encapsulates flow context information.
+ */
protected static class FlowContext {
private final String userId;
private final String flowName;
@@ -222,6 +443,9 @@ class GenericEntityReader extends TimelineEntityReader {
@Override
protected void validateParams() {
+ Preconditions.checkNotNull(getContext(), "context shouldn't be null");
+ Preconditions.checkNotNull(
+ getDataToRetrieve(), "data to retrieve shouldn't be null");
Preconditions.checkNotNull(getContext().getClusterId(),
"clusterId shouldn't be null");
Preconditions.checkNotNull(getContext().getAppId(),
@@ -241,13 +465,19 @@ class GenericEntityReader extends TimelineEntityReader {
// In reality all three should be null or neither should be null
if (context.getFlowName() == null || context.getFlowRunId() == null ||
context.getUserId() == null) {
+ // Get flow context information from AppToFlow table.
FlowContext flowContext = lookupFlowContext(
context.getClusterId(), context.getAppId(), hbaseConf, conn);
context.setFlowName(flowContext.flowName);
context.setFlowRunId(flowContext.flowRunId);
context.setUserId(flowContext.userId);
}
+ // Add configs/metrics to fields to retrieve if confsToRetrieve and/or
+ // metricsToRetrieve are specified.
getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
+ if (!isSingleEntityRead()) {
+ createFiltersIfNull();
+ }
}
@Override
@@ -298,215 +528,84 @@ class GenericEntityReader extends TimelineEntityReader {
// fetch created time
Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result);
entity.setCreatedTime(createdTime.longValue());
- if (!isSingleEntityRead() &&
- (entity.getCreatedTime() < filters.getCreatedTimeBegin() ||
- entity.getCreatedTime() > filters.getCreatedTimeEnd())) {
- return null;
- }
+
EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
- // fetch is related to entities
+ // fetch is related to entities and match isRelatedTo filter. If isRelatedTo
+ // filters do not match, entity would be dropped. We have to match filters
+ // locally as relevant HBase filters to filter out rows on the basis of
+ // isRelatedTo are not set in HBase scan.
boolean checkIsRelatedTo =
- filters != null && filters.getIsRelatedTo() != null &&
- filters.getIsRelatedTo().size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
- readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
- if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
- entity.getIsRelatedToEntities(), filters.getIsRelatedTo())) {
+ !isSingleEntityRead() && filters.getIsRelatedTo() != null &&
+ filters.getIsRelatedTo().getFilterList().size() > 0;
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) ||
+ checkIsRelatedTo) {
+ TimelineStorageUtils.readRelationship(
+ entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
+ if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity,
+ filters.getIsRelatedTo())) {
return null;
}
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve,
+ Field.IS_RELATED_TO)) {
entity.getIsRelatedToEntities().clear();
}
}
- // fetch relates to entities
+ // fetch relates to entities and match relatesTo filter. If relatesTo
+ // filters do not match, entity would be dropped. We have to match filters
+ // locally as relevant HBase filters to filter out rows on the basis of
+ // relatesTo are not set in HBase scan.
boolean checkRelatesTo =
- filters != null && filters.getRelatesTo() != null &&
- filters.getRelatesTo().size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
- readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
- if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
- entity.getRelatesToEntities(), filters.getRelatesTo())) {
+ !isSingleEntityRead() && filters.getRelatesTo() != null &&
+ filters.getRelatesTo().getFilterList().size() > 0;
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) ||
+ checkRelatesTo) {
+ TimelineStorageUtils.readRelationship(
+ entity, result, EntityColumnPrefix.RELATES_TO, false);
+ if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity,
+ filters.getRelatesTo())) {
return null;
}
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.RELATES_TO)) {
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
entity.getRelatesToEntities().clear();
}
}
- // fetch info
- boolean checkInfo = filters != null && filters.getInfoFilters() != null &&
- filters.getInfoFilters().size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
- readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
- if (checkInfo &&
- !TimelineStorageUtils.matchFilters(
- entity.getInfo(), filters.getInfoFilters())) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.INFO)) {
- entity.getInfo().clear();
- }
+ // fetch info if fieldsToRetrieve contains INFO or ALL.
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
+ TimelineStorageUtils.readKeyValuePairs(
+ entity, result, EntityColumnPrefix.INFO, false);
}
- // fetch configs
- boolean checkConfigs =
- filters != null && filters.getConfigFilters() != null &&
- filters.getConfigFilters().size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
- readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
- if (checkConfigs && !TimelineStorageUtils.matchFilters(
- entity.getConfigs(), filters.getConfigFilters())) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.CONFIGS)) {
- entity.getConfigs().clear();
- }
+ // fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.CONFIGS)) {
+ TimelineStorageUtils.readKeyValuePairs(
+ entity, result, EntityColumnPrefix.CONFIG, true);
}
- // fetch events
+ // fetch events and match event filters if they exist. If event filters do
+ // not match, entity would be dropped. We have to match filters locally
+ // as relevant HBase filters to filter out rows on the basis of events
+ // are not set in HBase scan.
boolean checkEvents =
- filters != null && filters.getEventFilters() != null &&
- filters.getEventFilters().size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
- readEvents(entity, result, false);
- if (checkEvents && !TimelineStorageUtils.matchEventFilters(
- entity.getEvents(), filters.getEventFilters())) {
+ !isSingleEntityRead() && filters.getEventFilters() != null &&
+ filters.getEventFilters().getFilterList().size() > 0;
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) ||
+ checkEvents) {
+ TimelineStorageUtils.readEvents(entity, result, EntityColumnPrefix.EVENT);
+ if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity,
+ filters.getEventFilters())) {
return null;
}
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.EVENTS)) {
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
entity.getEvents().clear();
}
}
- // fetch metrics
- boolean checkMetrics =
- filters != null && filters.getMetricFilters() != null &&
- filters.getMetricFilters().size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
+ // fetch metrics if fieldsToRetrieve contains METRICS or ALL.
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.METRICS)) {
readMetrics(entity, result, EntityColumnPrefix.METRIC);
- if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
- entity.getMetrics(), filters.getMetricFilters())) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.METRICS)) {
- entity.getMetrics().clear();
- }
}
return entity;
}
-
- /**
- * Helper method for reading relationship.
- *
- * @param <T> Describes the type of column prefix.
- * @param entity entity to fill.
- * @param result result from HBase.
- * @param prefix column prefix.
- * @param isRelatedTo if true, means relationship is to be added to
- * isRelatedTo, otherwise its added to relatesTo.
- * @throws IOException if any problem is encountered while reading result.
- */
- protected <T> void readRelationship(
- TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
- boolean isRelatedTo) throws IOException {
- // isRelatedTo and relatesTo are of type Map<String, Set<String>>
- Map<String, Object> columns = prefix.readResults(result);
- for (Map.Entry<String, Object> column : columns.entrySet()) {
- for (String id : Separator.VALUES.splitEncoded(
- column.getValue().toString())) {
- if (isRelatedTo) {
- entity.addIsRelatedToEntity(column.getKey(), id);
- } else {
- entity.addRelatesToEntity(column.getKey(), id);
- }
- }
- }
- }
-
- /**
- * Helper method for reading key-value pairs for either info or config.
- *
- * @param <T> Describes the type of column prefix.
- * @param entity entity to fill.
- * @param result result from HBase.
- * @param prefix column prefix.
- * @param isConfig if true, means we are reading configs, otherwise info.
- * @throws IOException if any problem is encountered while reading result.
- */
- protected <T> void readKeyValuePairs(
- TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
- boolean isConfig) throws IOException {
- // info and configuration are of type Map<String, Object or String>
- Map<String, Object> columns = prefix.readResults(result);
- if (isConfig) {
- for (Map.Entry<String, Object> column : columns.entrySet()) {
- entity.addConfig(column.getKey(), column.getValue().toString());
- }
- } else {
- entity.addInfo(columns);
- }
- }
-
- /**
- * Read events from the entity table or the application table. The column name
- * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
- * if there is no info associated with the event.
- *
- * @param entity entity to fill.
- * @param result HBase Result.
- * @param isApplication if true, event read is for application table,
- * otherwise its being read for entity table.
- * @throws IOException if any problem is encountered while reading result.
- *
- * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
- * schema description.
- */
- protected void readEvents(TimelineEntity entity, Result result,
- boolean isApplication) throws IOException {
- Map<String, TimelineEvent> eventsMap = new HashMap<>();
- Map<?, Object> eventsResult = isApplication ?
- ApplicationColumnPrefix.EVENT.
- readResultsHavingCompoundColumnQualifiers(result) :
- EntityColumnPrefix.EVENT.
- readResultsHavingCompoundColumnQualifiers(result);
- for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
- byte[][] karr = (byte[][])eventResult.getKey();
- // the column name is of the form "eventId=timestamp=infoKey"
- if (karr.length == 3) {
- String id = Bytes.toString(karr[0]);
- long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1]));
- String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
- TimelineEvent event = eventsMap.get(key);
- if (event == null) {
- event = new TimelineEvent();
- event.setId(id);
- event.setTimestamp(ts);
- eventsMap.put(key, event);
- }
- // handle empty info
- String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
- if (infoKey != null) {
- event.addInfo(infoKey, eventResult.getValue());
- }
- } else {
- LOG.warn("incorrectly formatted column name: it will be discarded");
- continue;
- }
- }
- Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
- entity.addEvents(eventsSet);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
index 281e901..4299de9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
@@ -107,11 +107,60 @@ public abstract class TimelineEntityReader {
/**
* Creates a {@link FilterList} based on fields, confs and metrics to
* retrieve. This filter list will be set in Scan/Get objects to trim down
- * results fetched from HBase back-end storage.
+ * results fetched from HBase back-end storage. This is called only for
+ * multiple entity reads.
*
* @return a {@link FilterList} object.
+ * @throws IOException if any problem occurs while creating filter list.
*/
- protected abstract FilterList constructFilterListBasedOnFields();
+ protected abstract FilterList constructFilterListBasedOnFields()
+ throws IOException;
+
+ /**
+ * Creates a {@link FilterList} based on info, config and metric filters. This
+ * filter list will be set in HBase Get to trim down results fetched from
+ * HBase back-end storage.
+ *
+ * @return a {@link FilterList} object.
+ * @throws IOException if any problem occurs while creating filter list.
+ */
+ protected abstract FilterList constructFilterListBasedOnFilters()
+ throws IOException;
+
+ /**
+ * Combines filter lists created based on fields and based on filters.
+ *
+ * @return a {@link FilterList} object if it can be constructed. Returns null,
+ * if filter list cannot be created either on the basis of filters or on the
+ * basis of fields.
+ * @throws IOException if any problem occurs while creating filter list.
+ */
+ private FilterList createFilterList() throws IOException {
+ FilterList listBasedOnFilters = constructFilterListBasedOnFilters();
+ boolean hasListBasedOnFilters = listBasedOnFilters != null &&
+ !listBasedOnFilters.getFilters().isEmpty();
+ FilterList listBasedOnFields = constructFilterListBasedOnFields();
+ boolean hasListBasedOnFields = listBasedOnFields != null &&
+ !listBasedOnFields.getFilters().isEmpty();
+ // If filter lists based on both filters and fields can be created,
+ // combine them in a new filter list and return it.
+ // If either one of them has been created, return that filter list.
+ // Return null, if none of the filter lists can be created. This indicates
+ // that no filter list needs to be added to HBase Scan as filters are not
+ // specified for the query or only the default view of entity needs to be
+ // returned.
+ if (hasListBasedOnFilters && hasListBasedOnFields) {
+ FilterList list = new FilterList();
+ list.addFilter(listBasedOnFilters);
+ list.addFilter(listBasedOnFields);
+ return list;
+ } else if (hasListBasedOnFilters) {
+ return listBasedOnFilters;
+ } else if (hasListBasedOnFields) {
+ return listBasedOnFields;
+ }
+ return null;
+ }
protected TimelineReaderContext getContext() {
return context;
@@ -126,6 +175,16 @@ public abstract class TimelineEntityReader {
}
/**
+ * Create a {@link TimelineEntityFilters} object with default values for
+ * filters.
+ */
+ protected void createFiltersIfNull() {
+ if (filters == null) {
+ filters = new TimelineEntityFilters();
+ }
+ }
+
+ /**
* Reads and deserializes a single timeline entity from the HBase storage.
*
* @param hbaseConf HBase Configuration.
@@ -140,6 +199,9 @@ public abstract class TimelineEntityReader {
augmentParams(hbaseConf, conn);
FilterList filterList = constructFilterListBasedOnFields();
+ if (LOG.isDebugEnabled() && filterList != null) {
+ LOG.debug("FilterList created for get is - " + filterList);
+ }
Result result = getResult(hbaseConf, conn, filterList);
if (result == null || result.isEmpty()) {
// Could not find a matching row.
@@ -166,7 +228,10 @@ public abstract class TimelineEntityReader {
augmentParams(hbaseConf, conn);
NavigableSet<TimelineEntity> entities = new TreeSet<>();
- FilterList filterList = constructFilterListBasedOnFields();
+ FilterList filterList = createFilterList();
+ if (LOG.isDebugEnabled() && filterList != null) {
+ LOG.debug("FilterList created for scan is - " + filterList);
+ }
ResultScanner results = getResults(hbaseConf, conn, filterList);
try {
for (Result result : results) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
index b6e23a9..2bd2830 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
@@ -221,7 +221,7 @@ public class TestTimelineReaderWebServices {
assertTrue("UID should be present",
entity.getInfo().containsKey(TimelineReaderManager.UID_KEY));
// Includes UID.
- assertEquals(2, entity.getInfo().size());
+ assertEquals(3, entity.getInfo().size());
// No events will be returned as events are not part of fields.
assertEquals(0, entity.getEvents().size());
} finally {
@@ -247,7 +247,7 @@ public class TestTimelineReaderWebServices {
assertTrue("UID should be present",
entity.getInfo().containsKey(TimelineReaderManager.UID_KEY));
// Includes UID.
- assertEquals(2, entity.getInfo().size());
+ assertEquals(3, entity.getInfo().size());
assertEquals(2, entity.getEvents().size());
} finally {
client.destroy();
@@ -443,10 +443,8 @@ public class TestTimelineReaderWebServices {
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
assertNotNull(entities);
- assertEquals(2, entities.size());
- assertTrue("Entities with id_1 and id_3 should have been present" +
- " in response.",
- entities.contains(newEntity("app", "id_1")) &&
+ assertEquals(1, entities.size());
+ assertTrue("Entity with id_3 should have been present in response.",
entities.contains(newEntity("app", "id_3")));
} finally {
client.destroy();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
index a8a2ff8..23d64e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -40,6 +41,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.AfterClass;
@@ -112,6 +120,7 @@ public class TestFileSystemTimelineReaderImpl {
entity11.setCreatedTime(1425016502000L);
Map<String, Object> info1 = new HashMap<String, Object>();
info1.put("info1", "val1");
+ info1.put("info2", "val5");
entity11.addInfo(info1);
TimelineEvent event = new TimelineEvent();
event.setId("event_1");
@@ -121,7 +130,7 @@ public class TestFileSystemTimelineReaderImpl {
TimelineMetric metric1 = new TimelineMetric();
metric1.setId("metric1");
metric1.setType(TimelineMetric.Type.SINGLE_VALUE);
- metric1.addValue(1425016502006L, 113.2F);
+ metric1.addValue(1425016502006L, 113);
metrics.add(metric1);
TimelineMetric metric2 = new TimelineMetric();
metric2.setId("metric2");
@@ -130,7 +139,7 @@ public class TestFileSystemTimelineReaderImpl {
metrics.add(metric2);
entity11.setMetrics(metrics);
Map<String,String> configs = new HashMap<String, String>();
- configs.put("config_1", "123");
+ configs.put("config_1", "127");
entity11.setConfigs(configs);
entity11.addRelatesToEntity("flow", "flow1");
entity11.addIsRelatedToEntity("type1", "tid1_1");
@@ -171,7 +180,7 @@ public class TestFileSystemTimelineReaderImpl {
info1.put("info2", 4);
entity2.addInfo(info2);
Map<String,String> configs2 = new HashMap<String, String>();
- configs2.put("config_1", "123");
+ configs2.put("config_1", "129");
configs2.put("config_3", "def");
entity2.setConfigs(configs2);
TimelineEvent event2 = new TimelineEvent();
@@ -182,7 +191,7 @@ public class TestFileSystemTimelineReaderImpl {
TimelineMetric metric21 = new TimelineMetric();
metric21.setId("metric1");
metric21.setType(TimelineMetric.Type.SINGLE_VALUE);
- metric21.addValue(1425016501006L, 123.2F);
+ metric21.addValue(1425016501006L, 300);
metrics2.add(metric21);
TimelineMetric metric22 = new TimelineMetric();
metric22.setId("metric2");
@@ -205,6 +214,7 @@ public class TestFileSystemTimelineReaderImpl {
entity3.setCreatedTime(1425016501050L);
Map<String, Object> info3 = new HashMap<String, Object>();
info3.put("info2", 3.5);
+ info3.put("info4", 20);
entity3.addInfo(info3);
Map<String,String> configs3 = new HashMap<String, String>();
configs3.put("config_1", "123");
@@ -222,7 +232,7 @@ public class TestFileSystemTimelineReaderImpl {
TimelineMetric metric31 = new TimelineMetric();
metric31.setId("metric1");
metric31.setType(TimelineMetric.Type.SINGLE_VALUE);
- metric31.addValue(1425016501006L, 124.8F);
+ metric31.addValue(1425016501006L, 124);
metrics3.add(metric31);
TimelineMetric metric32 = new TimelineMetric();
metric32.setId("metric2");
@@ -317,7 +327,7 @@ public class TestFileSystemTimelineReaderImpl {
Assert.assertEquals(1425016502000L, result.getCreatedTime());
Assert.assertEquals(3, result.getConfigs().size());
Assert.assertEquals(3, result.getMetrics().size());
- Assert.assertEquals(1, result.getInfo().size());
+ Assert.assertEquals(2, result.getInfo().size());
// No events will be returned
Assert.assertEquals(0, result.getEvents().size());
}
@@ -344,8 +354,8 @@ public class TestFileSystemTimelineReaderImpl {
Set<TimelineEntity> result = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
"app", null), new TimelineEntityFilters(),
- new TimelineDataToRetrieve());
- // All 3 entities will be returned
+ new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+ // All 4 entities will be returned
Assert.assertEquals(4, result.size());
}
@@ -425,12 +435,13 @@ public class TestFileSystemTimelineReaderImpl {
@Test
public void testGetFilteredEntities() throws Exception {
// Get entities based on info filters.
- Map<String, Object> infoFilters = new HashMap<String, Object>();
- infoFilters.put("info2", 3.5);
+ TimelineFilterList infoFilterList = new TimelineFilterList();
+ infoFilterList.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5));
Set<TimelineEntity> result = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
"app", null),
- new TimelineEntityFilters(null, null, null, null, null, infoFilters,
+ new TimelineEntityFilters(null, null, null, null, null, infoFilterList,
null, null, null),
new TimelineDataToRetrieve());
Assert.assertEquals(1, result.size());
@@ -442,26 +453,30 @@ public class TestFileSystemTimelineReaderImpl {
}
// Get entities based on config filters.
- Map<String, String> configFilters = new HashMap<String, String>();
- configFilters.put("config_1", "123");
- configFilters.put("config_3", "abc");
+ TimelineFilterList confFilterList = new TimelineFilterList();
+ confFilterList.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_1", "123"));
+ confFilterList.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "abc"));
result = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
"app", null),
new TimelineEntityFilters(null, null, null, null, null, null,
- configFilters, null, null),
+ confFilterList, null, null),
new TimelineDataToRetrieve());
- Assert.assertEquals(2, result.size());
+ Assert.assertEquals(1, result.size());
for (TimelineEntity entity : result) {
- if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) {
+ if (!entity.getId().equals("id_3")) {
Assert.fail("Incorrect filtering based on config filters");
}
}
// Get entities based on event filters.
- Set<String> eventFilters = new HashSet<String>();
- eventFilters.add("event_2");
- eventFilters.add("event_4");
+ TimelineFilterList eventFilters = new TimelineFilterList();
+ eventFilters.addFilter(
+ new TimelineExistsFilter(TimelineCompareOp.EQUAL,"event_2"));
+ eventFilters.addFilter(
+ new TimelineExistsFilter(TimelineCompareOp.EQUAL,"event_4"));
result = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
"app", null),
@@ -476,13 +491,14 @@ public class TestFileSystemTimelineReaderImpl {
}
// Get entities based on metric filters.
- Set<String> metricFilters = new HashSet<String>();
- metricFilters.add("metric3");
+ TimelineFilterList metricFilterList = new TimelineFilterList();
+ metricFilterList.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.GREATER_OR_EQUAL, "metric3", 0L));
result = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
"app", null),
new TimelineEntityFilters(null, null, null, null, null, null, null,
- metricFilters, null),
+ metricFilterList, null),
new TimelineDataToRetrieve());
Assert.assertEquals(2, result.size());
// Two entities with IDs' id_1 and id_2 should be returned.
@@ -491,15 +507,266 @@ public class TestFileSystemTimelineReaderImpl {
Assert.fail("Incorrect filtering based on metric filters");
}
}
- }
+
+ // Get entities based on complex config filters.
+ TimelineFilterList list1 = new TimelineFilterList();
+ list1.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_1", "129"));
+ list1.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "def"));
+ TimelineFilterList list2 = new TimelineFilterList();
+ list2.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_2", "23"));
+ list2.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "abc"));
+ TimelineFilterList confFilterList1 =
+ new TimelineFilterList(Operator.OR, list1, list2);
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null,
+ confFilterList1, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(2, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+ Assert.fail("Incorrect filtering based on config filters");
+ }
+ }
+
+ TimelineFilterList list3 = new TimelineFilterList();
+ list3.addFilter(new TimelineKeyValueFilter(
+ TimelineCompareOp.NOT_EQUAL, "config_1", "123"));
+ list3.addFilter(new TimelineKeyValueFilter(
+ TimelineCompareOp.NOT_EQUAL, "config_3", "abc"));
+ TimelineFilterList list4 = new TimelineFilterList();
+ list4.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_2", "23"));
+ TimelineFilterList confFilterList2 =
+ new TimelineFilterList(Operator.OR, list3, list4);
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null,
+ confFilterList2, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(2, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+ Assert.fail("Incorrect filtering based on config filters");
+ }
+ }
+
+ TimelineFilterList confFilterList3 = new TimelineFilterList();
+ confFilterList3.addFilter(new TimelineKeyValueFilter(
+ TimelineCompareOp.NOT_EQUAL, "config_1", "127"));
+ confFilterList3.addFilter(new TimelineKeyValueFilter(
+ TimelineCompareOp.NOT_EQUAL, "config_3", "abc"));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null,
+ confFilterList3, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(1, result.size());
+ for(TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_2")) {
+ Assert.fail("Incorrect filtering based on config filters");
+ }
+ }
+
+ TimelineFilterList confFilterList4 = new TimelineFilterList();
+ confFilterList4.addFilter(new TimelineKeyValueFilter(
+ TimelineCompareOp.EQUAL, "config_dummy", "dummy"));
+ confFilterList4.addFilter(new TimelineKeyValueFilter(
+ TimelineCompareOp.EQUAL, "config_3", "def"));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null,
+ confFilterList4, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(0, result.size());
+
+ TimelineFilterList confFilterList5 = new TimelineFilterList(Operator.OR);
+ confFilterList5.addFilter(new TimelineKeyValueFilter(
+ TimelineCompareOp.EQUAL, "config_dummy", "dummy"));
+ confFilterList5.addFilter(new TimelineKeyValueFilter(
+ TimelineCompareOp.EQUAL, "config_3", "def"));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null,
+ confFilterList5, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(1, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_2")) {
+ Assert.fail("Incorrect filtering based on config filters");
+ }
+ }
+
+ // Get entities based on complex metric filters.
+ TimelineFilterList list6 = new TimelineFilterList();
+ list6.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.GREATER_THAN, "metric1", 200));
+ list6.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.EQUAL, "metric3", 23));
+ TimelineFilterList list7 = new TimelineFilterList();
+ list7.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.GREATER_OR_EQUAL, "metric2", 74));
+ TimelineFilterList metricFilterList1 =
+ new TimelineFilterList(Operator.OR, list6, list7);
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null, null,
+ metricFilterList1, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(2, result.size());
+ // Two entities with IDs' id_2 and id_3 should be returned.
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_2") && !entity.getId().equals("id_3")) {
+ Assert.fail("Incorrect filtering based on metric filters");
+ }
+ }
+
+ TimelineFilterList metricFilterList2 = new TimelineFilterList();
+ metricFilterList2.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.LESS_THAN, "metric2", 70));
+ metricFilterList2.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null, null,
+ metricFilterList2, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(1, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_1")) {
+ Assert.fail("Incorrect filtering based on metric filters");
+ }
+ }
+
+ TimelineFilterList metricFilterList3 = new TimelineFilterList();
+ metricFilterList3.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.LESS_THAN, "dummy_metric", 30));
+ metricFilterList3.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null, null,
+ metricFilterList3, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(0, result.size());
+
+ TimelineFilterList metricFilterList4 = new TimelineFilterList(Operator.OR);
+ metricFilterList4.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.LESS_THAN, "dummy_metric", 30));
+ metricFilterList4.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null, null,
+ metricFilterList4, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(2, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+ Assert.fail("Incorrect filtering based on metric filters");
+ }
+ }
+
+ TimelineFilterList metricFilterList5 =
+ new TimelineFilterList(new TimelineCompareFilter(
+ TimelineCompareOp.NOT_EQUAL, "metric2", 74));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null, null,
+ metricFilterList5, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(2, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+ Assert.fail("Incorrect filtering based on metric filters");
+ }
+ }
+
+ TimelineFilterList infoFilterList1 = new TimelineFilterList();
+ infoFilterList1.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5));
+ infoFilterList1.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.NOT_EQUAL, "info4", 20));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, infoFilterList1,
+ null, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(0, result.size());
+
+ TimelineFilterList infoFilterList2 = new TimelineFilterList(Operator.OR);
+ infoFilterList2.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5));
+ infoFilterList2.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info1", "val1"));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, infoFilterList2,
+ null, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(2, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) {
+ Assert.fail("Incorrect filtering based on info filters");
+ }
+ }
+
+ TimelineFilterList infoFilterList3 = new TimelineFilterList();
+ infoFilterList3.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "dummy_info", 1));
+ infoFilterList3.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", "val5"));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, infoFilterList3,
+ null, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(0, result.size());
+
+ TimelineFilterList infoFilterList4 = new TimelineFilterList(Operator.OR);
+ infoFilterList4.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "dummy_info", 1));
+ infoFilterList4.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", "val5"));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, infoFilterList4,
+ null, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(1, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_1")) {
+ Assert.fail("Incorrect filtering based on info filters");
+ }
+ }
+ }
@Test
public void testGetEntitiesByRelations() throws Exception {
// Get entities based on relatesTo.
- Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
- Set<String> relatesToIds = new HashSet<String>();
- relatesToIds.add("flow1");
- relatesTo.put("flow", relatesToIds);
+ TimelineFilterList relatesTo = new TimelineFilterList(Operator.OR);
+ Set<Object> relatesToIds =
+ new HashSet<Object>(Arrays.asList((Object)"flow1"));
+ relatesTo.addFilter(new TimelineKeyValuesFilter(
+ TimelineCompareOp.EQUAL, "flow", relatesToIds));
Set<TimelineEntity> result = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
"app", null),
@@ -515,10 +782,11 @@ public class TestFileSystemTimelineReaderImpl {
}
// Get entities based on isRelatedTo.
- Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
- Set<String> isRelatedToIds = new HashSet<String>();
- isRelatedToIds.add("tid1_2");
- isRelatedTo.put("type1", isRelatedToIds);
+ TimelineFilterList isRelatedTo = new TimelineFilterList(Operator.OR);
+ Set<Object> isRelatedToIds =
+ new HashSet<Object>(Arrays.asList((Object)"tid1_2"));
+ isRelatedTo.addFilter(new TimelineKeyValuesFilter(
+ TimelineCompareOp.EQUAL, "type1", isRelatedToIds));
result = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
"app", null),
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org