You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2016/06/21 23:48:50 UTC
[04/50] [abbrv] hadoop git commit: YARN-3863. Support complex filters
in TimelineReader (Varun Saxena via sjlee)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index b5fc214..2d85bab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -17,21 +17,26 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
-import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -39,6 +44,15 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter.TimelineFilterType;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
@@ -53,6 +67,8 @@ public final class TimelineStorageUtils {
private TimelineStorageUtils() {
}
+ private static final Log LOG = LogFactory.getLog(TimelineStorageUtils.class);
+
/** empty bytes. */
public static final byte[] EMPTY_BYTES = new byte[0];
@@ -312,6 +328,21 @@ public final class TimelineStorageUtils {
}
/**
+ * Check if we have a certain field amongst fields to retrieve. This method
+ * also returns true if {@link Field#ALL} is present, because that means
+ * every field, including the one passed, is to be retrieved.
+ *
+ * @param fieldsToRetrieve fields to be retrieved.
+ * @param requiredField field to be checked in fieldsToRetrieve.
+ * @return true if has the required field, false otherwise.
+ */
+ public static boolean hasField(EnumSet<Field> fieldsToRetrieve,
+ Field requiredField) {
+ return fieldsToRetrieve.contains(Field.ALL) ||
+ fieldsToRetrieve.contains(requiredField);
+ }
+
+ /**
* Checks if the input TimelineEntity object is an ApplicationEntity.
*
* @param te TimelineEntity object.
@@ -385,87 +416,317 @@ public final class TimelineStorageUtils {
}
/**
+ * Matches key-values filter. Used for relatesTo/isRelatedTo filters.
*
- * @param entityRelations the relations of an entity
- * @param relationFilters the relations for filtering
- * @return a boolean flag to indicate if both match
+ * @param entity entity which holds relatesTo/isRelatedTo relations which we
+ * will match against.
+ * @param keyValuesFilter key-values filter.
+ * @param entityFiltersType type of filters we are trying to match.
+ * @return true, if filter matches, false otherwise.
*/
- public static boolean matchRelations(
- Map<String, Set<String>> entityRelations,
- Map<String, Set<String>> relationFilters) {
- for (Map.Entry<String, Set<String>> relation : relationFilters.entrySet()) {
- Set<String> ids = entityRelations.get(relation.getKey());
- if (ids == null) {
+ private static boolean matchKeyValuesFilter(TimelineEntity entity,
+ TimelineKeyValuesFilter keyValuesFilter,
+ TimelineEntityFiltersType entityFiltersType) {
+ Map<String, Set<String>> relations = null;
+ if (entityFiltersType == TimelineEntityFiltersType.IS_RELATED_TO) {
+ relations = entity.getIsRelatedToEntities();
+ } else if (entityFiltersType == TimelineEntityFiltersType.RELATES_TO) {
+ relations = entity.getRelatesToEntities();
+ }
+ if (relations == null) {
+ return false;
+ }
+ Set<String> ids = relations.get(keyValuesFilter.getKey());
+ if (ids == null) {
+ return false;
+ }
+ boolean matched = false;
+ for (Object id : keyValuesFilter.getValues()) {
+ // Matches if id is found amongst the relationships for an entity and
+ // filter's compare op is EQUAL.
+ // If compare op is NOT_EQUAL, for a match to occur, id should not be
+ // found amongst relationships for an entity.
+ matched = !(ids.contains(id) ^
+ keyValuesFilter.getCompareOp() == TimelineCompareOp.EQUAL);
+ if (!matched) {
return false;
}
- for (String id : relation.getValue()) {
- if (!ids.contains(id)) {
- return false;
- }
- }
}
return true;
}
/**
+ * Matches relatesto.
*
- * @param map the map of key/value pairs in an entity
- * @param filters the map of key/value pairs for filtering
- * @return a boolean flag to indicate if both match
+ * @param entity entity which holds relatesto relations.
+ * @param relatesTo the relations for filtering.
+ * @return true, if filter matches, false otherwise.
+ * @throws IOException if an unsupported filter for matching relations is
+ * being matched.
*/
- public static boolean matchFilters(Map<String, ? extends Object> map,
- Map<String, ? extends Object> filters) {
- for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
- Object value = map.get(filter.getKey());
- if (value == null) {
- return false;
- }
- if (!value.equals(filter.getValue())) {
- return false;
- }
+ public static boolean matchRelatesTo(TimelineEntity entity,
+ TimelineFilterList relatesTo) throws IOException {
+ return matchFilters(
+ entity, relatesTo, TimelineEntityFiltersType.RELATES_TO);
+ }
+
+ /**
+ * Matches isrelatedto.
+ *
+ * @param entity entity which holds isRelatedTo relations.
+ * @param isRelatedTo the relations for filtering.
+ * @return true, if filter matches, false otherwise.
+ * @throws IOException if an unsupported filter for matching relations is
+ * being matched.
+ */
+ public static boolean matchIsRelatedTo(TimelineEntity entity,
+ TimelineFilterList isRelatedTo) throws IOException {
+ return matchFilters(
+ entity, isRelatedTo, TimelineEntityFiltersType.IS_RELATED_TO);
+ }
+
+ /**
+ * Matches key-value filter. Used for config and info filters.
+ *
+ * @param entity entity which holds the config/info which we will match
+ * against.
+ * @param kvFilter a key-value filter.
+ * @param entityFiltersType type of filters we are trying to match.
+ * @return true, if filter matches, false otherwise.
+ */
+ private static boolean matchKeyValueFilter(TimelineEntity entity,
+ TimelineKeyValueFilter kvFilter,
+ TimelineEntityFiltersType entityFiltersType) {
+ Map<String, ? extends Object> map = null;
+ // Supported only for config and info filters.
+ if (entityFiltersType == TimelineEntityFiltersType.CONFIG) {
+ map = entity.getConfigs();
+ } else if (entityFiltersType == TimelineEntityFiltersType.INFO) {
+ map = entity.getInfo();
}
- return true;
+ if (map == null) {
+ return false;
+ }
+ Object value = map.get(kvFilter.getKey());
+ if (value == null) {
+ return false;
+ }
+ // Matches if filter's value is equal to the value of the key and filter's
+ // compare op is EQUAL.
+ // If compare op is NOT_EQUAL, for a match to occur, value should not be
+ // equal to the value of the key.
+ return !(value.equals(kvFilter.getValue()) ^
+ kvFilter.getCompareOp() == TimelineCompareOp.EQUAL);
+ }
+
+ /**
+ * Matches config filters.
+ *
+ * @param entity entity which holds a map of config key-value pairs.
+ * @param configFilters list of config filters.
+ * @return a boolean flag to indicate if both match.
+ * @throws IOException if an unsupported filter for matching config filters is
+ * being matched.
+ */
+ public static boolean matchConfigFilters(TimelineEntity entity,
+ TimelineFilterList configFilters) throws IOException {
+ return
+ matchFilters(entity, configFilters, TimelineEntityFiltersType.CONFIG);
+ }
+
+ /**
+ * Matches info filters.
+ *
+ * @param entity entity which holds a map of info key-value pairs.
+ * @param infoFilters list of info filters.
+ * @return a boolean flag to indicate if both match.
+ * @throws IOException if an unsupported filter for matching info filters is
+ * being matched.
+ */
+ public static boolean matchInfoFilters(TimelineEntity entity,
+ TimelineFilterList infoFilters) throws IOException {
+ return matchFilters(entity, infoFilters, TimelineEntityFiltersType.INFO);
}
/**
+ * Matches exists filter. Used for event filters.
*
- * @param entityEvents the set of event objects in an entity
- * @param eventFilters the set of event Ids for filtering
- * @return a boolean flag to indicate if both match
+ * @param entity entity which holds the events which we will match against.
+ * @param existsFilter exists filter.
+ * @param entityFiltersType type of filters we are trying to match.
+ * @return true, if filter matches, false otherwise.
*/
- public static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
- Set<String> eventFilters) {
+ private static boolean matchExistsFilter(TimelineEntity entity,
+ TimelineExistsFilter existsFilter,
+ TimelineEntityFiltersType entityFiltersType) {
+ // Currently exists filter is only supported for event filters.
+ if (entityFiltersType != TimelineEntityFiltersType.EVENT) {
+ return false;
+ }
Set<String> eventIds = new HashSet<String>();
- for (TimelineEvent event : entityEvents) {
+ for (TimelineEvent event : entity.getEvents()) {
eventIds.add(event.getId());
}
- for (String eventFilter : eventFilters) {
- if (!eventIds.contains(eventFilter)) {
- return false;
- }
+ // Matches if filter's value is contained in the list of events and
+ // filter's compare op is EQUAL.
+ // If compare op is NOT_EQUAL, for a match to occur, value should not be
+ // contained in the list of events.
+ return !(eventIds.contains(existsFilter.getValue()) ^
+ existsFilter.getCompareOp() == TimelineCompareOp.EQUAL);
+ }
+
+ /**
+ * Matches event filters.
+ *
+ * @param entity entity which holds a set of event objects.
+ * @param eventFilters list of event filters.
+ * @return a boolean flag to indicate if both match.
+ * @throws IOException if an unsupported filter for matching event filters is
+ * being matched.
+ */
+ public static boolean matchEventFilters(TimelineEntity entity,
+ TimelineFilterList eventFilters) throws IOException {
+ return matchFilters(entity, eventFilters, TimelineEntityFiltersType.EVENT);
+ }
+
+ /**
+ * Compare two values based on comparison operator.
+ *
+ * @param compareOp comparison operator.
+ * @param val1 value 1.
+ * @param val2 value 2.
+ * @return true, if relation matches, false otherwise
+ */
+ private static boolean compareValues(TimelineCompareOp compareOp,
+ long val1, long val2) {
+ switch (compareOp) {
+ case LESS_THAN:
+ return val1 < val2;
+ case LESS_OR_EQUAL:
+ return val1 <= val2;
+ case EQUAL:
+ return val1 == val2;
+ case NOT_EQUAL:
+ return val1 != val2;
+ case GREATER_OR_EQUAL:
+ return val1 >= val2;
+ case GREATER_THAN:
+ return val1 > val2;
+ default:
+ throw new RuntimeException("Unknown TimelineCompareOp " +
+ compareOp.name());
}
- return true;
}
/**
+ * Matches compare filter. Used for metric filters.
*
- * @param metrics the set of metric objects in an entity
- * @param metricFilters the set of metric Ids for filtering
- * @return a boolean flag to indicate if both match
+ * @param entity entity which holds the metrics which we will match against.
+ * @param compareFilter compare filter.
+ * @param entityFiltersType type of filters we are trying to match.
+ * @return true, if filter matches, false otherwise.
+ * @throws IOException if a metric filter holds a non-integral value.
*/
- public static boolean matchMetricFilters(Set<TimelineMetric> metrics,
- Set<String> metricFilters) {
- Set<String> metricIds = new HashSet<String>();
- for (TimelineMetric metric : metrics) {
- metricIds.add(metric.getId());
+  private static boolean matchCompareFilter(TimelineEntity entity,
+      TimelineCompareFilter compareFilter,
+      TimelineEntityFiltersType entityFiltersType) throws IOException {
+    // Currently compare filter is only supported for metric filters.
+    if (entityFiltersType != TimelineEntityFiltersType.METRIC) {
+      return false;
+    }
+    // We expect only integral values(short/int/long) for metric filters.
+    if (!isIntegralValue(compareFilter.getValue())) {
+      throw new IOException("Metric filters has non integral values");
+    }
+    // Index the entity's metrics by id so the filter key can be looked up.
+    Map<String, TimelineMetric> metricMap =
+        new HashMap<String, TimelineMetric>();
+    for (TimelineMetric metric : entity.getMetrics()) {
+      metricMap.put(metric.getId(), metric);
+    }
+    TimelineMetric metric = metricMap.get(compareFilter.getKey());
+    if (metric == null) {
+      return false;
+    }
+    // A metric that carries no values has nothing to compare against. Without
+    // this guard firstEntry() returns null for an empty map and the chained
+    // call below would fail with a NullPointerException.
+    Map.Entry<?, ? extends Number> latest =
+        metric.getValuesJAXB().firstEntry();
+    if (latest == null || latest.getValue() == null) {
+      return false;
+    }
+    // We will be using the latest value of metric to compare.
+    return compareValues(compareFilter.getCompareOp(),
+        latest.getValue().longValue(),
+        ((Number)compareFilter.getValue()).longValue());
+  }
- for (String metricFilter : metricFilters) {
- if (!metricIds.contains(metricFilter)) {
- return false;
+ /**
+ * Matches metric filters.
+ *
+ * @param entity entity which holds a set of metric objects.
+ * @param metricFilters list of metric filters.
+ * @return a boolean flag to indicate if both match.
+ * @throws IOException if an unsupported filter for matching metric filters is
+ * being matched.
+ */
+ public static boolean matchMetricFilters(TimelineEntity entity,
+ TimelineFilterList metricFilters) throws IOException {
+ return matchFilters(
+ entity, metricFilters, TimelineEntityFiltersType.METRIC);
+ }
+
+ /**
+ * Common routine to match different filters. Iterates over a filter list and
+ * calls routines based on filter type.
+ *
+ * @param entity Timeline entity.
+ * @param filters filter list.
+ * @param entityFiltersType type of filters which are being matched.
+ * @return a boolean flag to indicate if filter matches.
+ * @throws IOException if an unsupported filter for matching this specific
+ * filter is being matched.
+ */
+ private static boolean matchFilters(TimelineEntity entity,
+ TimelineFilterList filters, TimelineEntityFiltersType entityFiltersType)
+ throws IOException {
+ if (filters == null || filters.getFilterList().isEmpty()) {
+ return false;
+ }
+ TimelineFilterList.Operator operator = filters.getOperator();
+ for (TimelineFilter filter : filters.getFilterList()) {
+ TimelineFilterType filterType = filter.getFilterType();
+ if (!entityFiltersType.isValidFilter(filterType)) {
+ throw new IOException("Unsupported filter " + filterType);
+ }
+ boolean matched = false;
+ switch (filterType) {
+ case LIST:
+ matched = matchFilters(entity, (TimelineFilterList)filter,
+ entityFiltersType);
+ break;
+ case COMPARE:
+ matched = matchCompareFilter(entity, (TimelineCompareFilter)filter,
+ entityFiltersType);
+ break;
+ case EXISTS:
+ matched = matchExistsFilter(entity, (TimelineExistsFilter)filter,
+ entityFiltersType);
+ break;
+ case KEY_VALUE:
+ matched = matchKeyValueFilter(entity, (TimelineKeyValueFilter)filter,
+ entityFiltersType);
+ break;
+ case KEY_VALUES:
+ matched = matchKeyValuesFilter(entity, (TimelineKeyValuesFilter)filter,
+ entityFiltersType);
+ break;
+ default:
+ throw new IOException("Unsupported filter " + filterType);
+ }
+ if (!matched) {
+ if(operator == TimelineFilterList.Operator.AND) {
+ return false;
+ }
+ } else {
+ if(operator == TimelineFilterList.Operator.OR) {
+ return true;
+ }
}
}
- return true;
+ return operator == TimelineFilterList.Operator.AND;
}
/**
@@ -530,4 +791,100 @@ public final class TimelineStorageUtils {
}
return appId;
}
+
+ /**
+ * Helper method for reading relationship.
+ *
+ * @param <T> Describes the type of column prefix.
+ * @param entity entity to fill.
+ * @param result result from HBase.
+ * @param prefix column prefix.
+ * @param isRelatedTo if true, the relationship is added to isRelatedTo;
+ * otherwise it is added to relatesTo.
+ * @throws IOException if any problem is encountered while reading result.
+ */
+ public static <T> void readRelationship(
+ TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
+ boolean isRelatedTo) throws IOException {
+ // isRelatedTo and relatesTo are of type Map<String, Set<String>>
+ Map<String, Object> columns = prefix.readResults(result);
+ for (Map.Entry<String, Object> column : columns.entrySet()) {
+ for (String id : Separator.VALUES.splitEncoded(
+ column.getValue().toString())) {
+ if (isRelatedTo) {
+ entity.addIsRelatedToEntity(column.getKey(), id);
+ } else {
+ entity.addRelatesToEntity(column.getKey(), id);
+ }
+ }
+ }
+ }
+
+ /**
+ * Helper method for reading key-value pairs for either info or config.
+ *
+ * @param <T> Describes the type of column prefix.
+ * @param entity entity to fill.
+ * @param result result from HBase.
+ * @param prefix column prefix.
+ * @param isConfig if true, means we are reading configs, otherwise info.
+ * @throws IOException if any problem is encountered while reading result.
+ */
+ public static <T> void readKeyValuePairs(
+ TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
+ boolean isConfig) throws IOException {
+ // info and configuration are of type Map<String, Object or String>
+ Map<String, Object> columns = prefix.readResults(result);
+ if (isConfig) {
+ for (Map.Entry<String, Object> column : columns.entrySet()) {
+ entity.addConfig(column.getKey(), column.getValue().toString());
+ }
+ } else {
+ entity.addInfo(columns);
+ }
+ }
+
+ /**
+ * Read events from the entity table or the application table. The column name
+ * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
+ * if there is no info associated with the event.
+ *
+ * @param <T> Describes the type of column prefix.
+ * @param entity entity to fill.
+ * @param result HBase Result.
+ * @param prefix column prefix.
+ * @throws IOException if any problem is encountered while reading result.
+ */
+  public static <T> void readEvents(TimelineEntity entity, Result result,
+      ColumnPrefix<T> prefix) throws IOException {
+    Map<?, Object> eventsResult =
+        prefix.readResultsHavingCompoundColumnQualifiers(result);
+    // A column prefix that has no compound column qualifiers may return null
+    // here (the flow activity prefix does); there is nothing to read then.
+    if (eventsResult == null) {
+      return;
+    }
+    // Events are keyed by "id + timestamp" so that several info columns of
+    // the same event are folded into a single TimelineEvent.
+    Map<String, TimelineEvent> eventsMap = new HashMap<>();
+    for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
+      byte[][] karr = (byte[][])eventResult.getKey();
+      // the column name is of the form "eventId=timestamp=infoKey"
+      if (karr.length != 3) {
+        LOG.warn("incorrectly formatted column name: it will be discarded");
+        continue;
+      }
+      String id = Bytes.toString(karr[0]);
+      long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1]));
+      String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
+      TimelineEvent event = eventsMap.get(key);
+      if (event == null) {
+        event = new TimelineEvent();
+        event.setId(id);
+        event.setTimestamp(ts);
+        eventsMap.put(key, event);
+      }
+      // handle empty info
+      String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
+      if (infoKey != null) {
+        event.addInfo(infoKey, eventResult.getValue());
+      }
+    }
+    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
+    entity.addEvents(eventsSet);
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
index f47ba93..775879a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
@@ -24,8 +24,11 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
@@ -46,7 +49,8 @@ public enum EntityColumn implements Column<EntityTable> {
/**
* When the entity was created.
*/
- CREATED_TIME(EntityColumnFamily.INFO, "created_time"),
+ CREATED_TIME(EntityColumnFamily.INFO, "created_time",
+ LongConverter.getInstance()),
/**
* The version of the flow that this entity belongs to.
@@ -60,12 +64,17 @@ public enum EntityColumn implements Column<EntityTable> {
EntityColumn(ColumnFamily<EntityTable> columnFamily,
String columnQualifier) {
+ this(columnFamily, columnQualifier, GenericConverter.getInstance());
+ }
+
+ EntityColumn(ColumnFamily<EntityTable> columnFamily,
+ String columnQualifier, ValueConverter converter) {
this.columnFamily = columnFamily;
this.columnQualifier = columnQualifier;
// Future-proof by ensuring the right column prefix hygiene.
this.columnQualifierBytes =
Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
- this.column = new ColumnHelper<EntityTable>(columnFamily);
+ this.column = new ColumnHelper<EntityTable>(columnFamily, converter);
}
/**
@@ -108,6 +117,21 @@ public enum EntityColumn implements Column<EntityTable> {
return null;
}
+ @Override
+ public byte[] getColumnQualifierBytes() {
+ return columnQualifierBytes.clone();
+ }
+
+ @Override
+ public byte[] getColumnFamilyBytes() {
+ return columnFamily.getBytes();
+ }
+
+ @Override
+ public ValueConverter getValueConverter() {
+ return column.getValueConverter();
+ }
+
/**
* Retrieve an {@link EntityColumn} given a name, or null if there is no
* match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index f3c7e7f..de2b29d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -56,7 +56,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
/**
* Lifecycle events for an entity.
*/
- EVENT(EntityColumnFamily.INFO, "e"),
+ EVENT(EntityColumnFamily.INFO, "e", true),
/**
* Config column stores configuration with config key as the column name.
@@ -78,6 +78,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
*/
private final String columnPrefix;
private final byte[] columnPrefixBytes;
+ private final boolean compoundColQual;
/**
* Private constructor, meant to be used by the enum definition.
@@ -87,7 +88,18 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
*/
EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
String columnPrefix) {
- this(columnFamily, columnPrefix, GenericConverter.getInstance());
+ this(columnFamily, columnPrefix, false, GenericConverter.getInstance());
+ }
+
+  /**
+   * Constructor for a column prefix that uses the generic value converter.
+   *
+   * @param columnFamily column family that this column prefix is in.
+   * @param columnPrefix prefix for the columns in this family.
+   * @param compoundColQual flag stored on this prefix and consulted when
+   *          building compound column qualifier bytes.
+   */
+  EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
+      String columnPrefix, boolean compoundColQual) {
+    this(columnFamily, columnPrefix, compoundColQual,
+        GenericConverter.getInstance());
+  }
+
+ EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
+ String columnPrefix, ValueConverter converter) {
+ this(columnFamily, columnPrefix, false, converter);
}
/**
@@ -99,7 +111,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
* this column prefix.
*/
EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
- String columnPrefix, ValueConverter converter) {
+ String columnPrefix, boolean compondColQual, ValueConverter converter) {
column = new ColumnHelper<EntityTable>(columnFamily, converter);
this.columnFamily = columnFamily;
this.columnPrefix = columnPrefix;
@@ -110,6 +122,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
this.columnPrefixBytes =
Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
}
+ this.compoundColQual = compondColQual;
}
/**
@@ -131,6 +144,24 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
this.columnPrefixBytes, qualifierPrefix);
}
+ @Override
+ public byte[] getColumnFamilyBytes() {
+ return columnFamily.getBytes();
+ }
+
+ @Override
+ public ValueConverter getValueConverter() {
+ return column.getValueConverter();
+ }
+
+  @Override
+  public byte[] getCompoundColQualBytes(String qualifier,
+      byte[]...components) {
+    // When this prefix is not flagged as using compound column qualifiers,
+    // the extra components are ignored and only the qualifier is encoded.
+    if (!compoundColQual) {
+      return ColumnHelper.getColumnQualifier(null, qualifier);
+    }
+    return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components);
+  }
+
/*
* (non-Javadoc)
*
@@ -287,5 +318,4 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
// Default to null
return null;
}
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
index a5933da..188c2fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
/**
* Identifies partially qualified columns for the {@link FlowActivityTable}.
@@ -50,6 +51,7 @@ public enum FlowActivityColumnPrefix
*/
private final String columnPrefix;
private final byte[] columnPrefixBytes;
+ private final boolean compoundColQual;
private final AggregationOperation aggOp;
@@ -64,6 +66,12 @@ public enum FlowActivityColumnPrefix
private FlowActivityColumnPrefix(
ColumnFamily<FlowActivityTable> columnFamily, String columnPrefix,
AggregationOperation aggOp) {
+ this(columnFamily, columnPrefix, aggOp, false);
+ }
+
+ private FlowActivityColumnPrefix(
+ ColumnFamily<FlowActivityTable> columnFamily, String columnPrefix,
+ AggregationOperation aggOp, boolean compoundColQual) {
column = new ColumnHelper<FlowActivityTable>(columnFamily);
this.columnFamily = columnFamily;
this.columnPrefix = columnPrefix;
@@ -75,6 +83,7 @@ public enum FlowActivityColumnPrefix
.encode(columnPrefix));
}
this.aggOp = aggOp;
+ this.compoundColQual = compoundColQual;
}
/**
@@ -100,6 +109,16 @@ public enum FlowActivityColumnPrefix
return columnPrefixBytes.clone();
}
+ @Override
+ public byte[] getColumnFamilyBytes() {
+ return columnFamily.getBytes();
+ }
+
+ @Override
+ public ValueConverter getValueConverter() {
+ return column.getValueConverter();
+ }
+
public AggregationOperation getAttribute() {
return aggOp;
}
@@ -251,4 +270,20 @@ public enum FlowActivityColumnPrefix
column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
combinedAttributes);
}
+
+ @Override
+ public byte[] getCompoundColQualBytes(String qualifier,
+ byte[]...components) {
+ if (!compoundColQual) {
+ return ColumnHelper.getColumnQualifier(null, qualifier);
+ }
+ return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components);
+ }
+
+ @Override
+ public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
+ throws IOException {
+ // There are no compound column qualifiers for flow activity table.
+ return null;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
index d50bb16..f1553b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -86,10 +86,12 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
return columnQualifier;
}
+ @Override
public byte[] getColumnQualifierBytes() {
return columnQualifierBytes.clone();
}
+ @Override
public byte[] getColumnFamilyBytes() {
return columnFamily.getBytes();
}
@@ -144,6 +146,7 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
return null;
}
+ @Override
public ValueConverter getValueConverter() {
return column.getValueConverter();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index fa94aae..77f2ab2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -52,6 +52,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
*/
private final String columnPrefix;
private final byte[] columnPrefixBytes;
+ private final boolean compoundColQual;
private final AggregationOperation aggOp;
@@ -65,6 +66,12 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
*/
private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
String columnPrefix, AggregationOperation fra, ValueConverter converter) {
+ this(columnFamily, columnPrefix, fra, converter, false);
+ }
+
+ private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
+ String columnPrefix, AggregationOperation fra, ValueConverter converter,
+ boolean compoundColQual) {
column = new ColumnHelper<FlowRunTable>(columnFamily, converter);
this.columnFamily = columnFamily;
this.columnPrefix = columnPrefix;
@@ -76,6 +83,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
.encode(columnPrefix));
}
this.aggOp = fra;
+ this.compoundColQual = compoundColQual;
}
/**
@@ -101,6 +109,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
this.columnPrefixBytes, qualifierPrefix);
}
+ @Override
public byte[] getColumnFamilyBytes() {
return columnFamily.getBytes();
}
@@ -222,6 +231,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
return null;
}
+ @Override
public ValueConverter getValueConverter() {
return column.getValueConverter();
}
@@ -257,4 +267,20 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
// Default to null
return null;
}
+
+ @Override
+ public byte[] getCompoundColQualBytes(String qualifier,
+ byte[]...components) {
+ if (!compoundColQual) {
+ return ColumnHelper.getColumnQualifier(null, qualifier);
+ }
+ return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components);
+ }
+
+ @Override
+ public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
+ throws IOException {
+ // There are no compound column qualifiers for flow run table.
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index 6baea37..0ace529 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -200,6 +200,7 @@ class FlowScanner implements RegionScanner, Closeable {
int addedCnt = 0;
long currentTimestamp = System.currentTimeMillis();
ValueConverter converter = null;
+
while (cellLimit <= 0 || addedCnt < cellLimit) {
cell = peekAtNextCell(cellLimit);
if (cell == null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
index 0de09e0..53210f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
import java.io.IOException;
import java.util.EnumSet;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
@@ -27,7 +28,6 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.FilterList;
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
@@ -76,93 +77,231 @@ class ApplicationEntityReader extends GenericEntityReader {
return APPLICATION_TABLE;
}
+ /**
+ * This method is called only for multiple entity reads.
+ */
@Override
- protected FilterList constructFilterListBasedOnFields() {
- FilterList list = new FilterList(Operator.MUST_PASS_ONE);
- TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
- // Fetch all the columns.
- if (dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
- (dataToRetrieve.getConfsToRetrieve() == null ||
- dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) &&
- (dataToRetrieve.getMetricsToRetrieve() == null ||
- dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) {
- return list;
+ protected FilterList constructFilterListBasedOnFilters() throws IOException {
+ // Filters here cannot be null for multiple entity reads as they are set in
+ // augmentParams if null.
+ TimelineEntityFilters filters = getFilters();
+ FilterList listBasedOnFilters = new FilterList();
+ // Create filter list based on created time range and add it to
+ // listBasedOnFilters.
+ long createdTimeBegin = filters.getCreatedTimeBegin();
+ long createdTimeEnd = filters.getCreatedTimeEnd();
+ if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
+ listBasedOnFilters.addFilter(
+ TimelineFilterUtils.createSingleColValueFiltersByRange(
+ ApplicationColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd));
}
- FilterList infoColFamilyList = new FilterList();
- // By default fetch everything in INFO column family.
- FamilyFilter infoColumnFamily =
- new FamilyFilter(CompareOp.EQUAL,
- new BinaryComparator(ApplicationColumnFamily.INFO.getBytes()));
- infoColFamilyList.addFilter(infoColumnFamily);
+ // Create filter list based on metric filters and add it to
+ // listBasedOnFilters.
+ TimelineFilterList metricFilters = filters.getMetricFilters();
+ if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
+ listBasedOnFilters.addFilter(
+ TimelineFilterUtils.createHBaseFilterList(
+ ApplicationColumnPrefix.METRIC, metricFilters));
+ }
+ // Create filter list based on config filters and add it to
+ // listBasedOnFilters.
+ TimelineFilterList configFilters = filters.getConfigFilters();
+ if (configFilters != null && !configFilters.getFilterList().isEmpty()) {
+ listBasedOnFilters.addFilter(
+ TimelineFilterUtils.createHBaseFilterList(
+ ApplicationColumnPrefix.CONFIG, configFilters));
+ }
+ // Create filter list based on info filters and add it to listBasedOnFilters
+ TimelineFilterList infoFilters = filters.getInfoFilters();
+ if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) {
+ listBasedOnFilters.addFilter(
+ TimelineFilterUtils.createHBaseFilterList(
+ ApplicationColumnPrefix.INFO, infoFilters));
+ }
+ return listBasedOnFilters;
+ }
+
+ /**
+ * Add {@link QualifierFilter} filters to filter list for each column of
+ * application table.
+ *
+ * @param list filter list to which qualifier filters have to be added.
+ */
+ @Override
+ protected void updateFixedColumns(FilterList list) {
+ for (ApplicationColumn column : ApplicationColumn.values()) {
+ list.addFilter(new QualifierFilter(CompareOp.EQUAL,
+ new BinaryComparator(column.getColumnQualifierBytes())));
+ }
+ }
+
+ /**
+ * Creates a filter list which indicates that only some of the column
+ * qualifiers in the info column family will be returned in result.
+ *
+ * @return filter list.
+ * @throws IOException if any problem occurs while creating filter list.
+ */
+ private FilterList createFilterListForColsOfInfoFamily()
+ throws IOException {
+ FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE);
+ // Add filters for each column in application table.
+ updateFixedColumns(infoFamilyColsFilter);
+ EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
+ // If INFO field has to be retrieved, add a filter for fetching columns
+ // with INFO column prefix.
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
+ infoFamilyColsFilter.addFilter(
+ TimelineFilterUtils.createHBaseQualifierFilter(
+ CompareOp.EQUAL, ApplicationColumnPrefix.INFO));
+ }
+ TimelineFilterList relatesTo = getFilters().getRelatesTo();
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+ // If RELATES_TO field has to be retrieved, add a filter for fetching
+ // columns with RELATES_TO column prefix.
+ infoFamilyColsFilter.addFilter(
+ TimelineFilterUtils.createHBaseQualifierFilter(
+ CompareOp.EQUAL, ApplicationColumnPrefix.RELATES_TO));
+ } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) {
+ // Even if fields to retrieve does not contain RELATES_TO, we still
+ // need to have a filter to fetch some of the column qualifiers if
+ // relatesTo filters are specified. relatesTo filters will then be
+ // matched after fetching rows from HBase.
+ Set<String> relatesToCols =
+ TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
+ infoFamilyColsFilter.addFilter(
+ TimelineFilterUtils.createFiltersFromColumnQualifiers(
+ ApplicationColumnPrefix.RELATES_TO, relatesToCols));
+ }
+ TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+ // If IS_RELATED_TO field has to be retrieved, add a filter for fetching
+ // columns with IS_RELATED_TO column prefix.
+ infoFamilyColsFilter.addFilter(
+ TimelineFilterUtils.createHBaseQualifierFilter(
+ CompareOp.EQUAL, ApplicationColumnPrefix.IS_RELATED_TO));
+ } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) {
+ // Even if fields to retrieve does not contain IS_RELATED_TO, we still
+ // need to have a filter to fetch some of the column qualifiers if
+ // isRelatedTo filters are specified. isRelatedTo filters will then be
+ // matched after fetching rows from HBase.
+ Set<String> isRelatedToCols =
+ TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
+ infoFamilyColsFilter.addFilter(
+ TimelineFilterUtils.createFiltersFromColumnQualifiers(
+ ApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols));
+ }
+ TimelineFilterList eventFilters = getFilters().getEventFilters();
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+ // If EVENTS field has to be retrieved, add a filter for fetching columns
+ // with EVENT column prefix.
+ infoFamilyColsFilter.addFilter(
+ TimelineFilterUtils.createHBaseQualifierFilter(
+ CompareOp.EQUAL, ApplicationColumnPrefix.EVENT));
+ } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()){
+ // Even if fields to retrieve does not contain EVENTS, we still need to
+ // have a filter to fetch some of the column qualifiers on the basis of
+ // event filters specified. Event filters will then be matched after
+ // fetching rows from HBase.
+ Set<String> eventCols =
+ TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
+ infoFamilyColsFilter.addFilter(
+ TimelineFilterUtils.createFiltersFromColumnQualifiers(
+ ApplicationColumnPrefix.EVENT, eventCols));
+ }
+ return infoFamilyColsFilter;
+ }
+
+ /**
+ * Exclude column prefixes via filters which are not required (based on fields
+ * to retrieve) from info column family. These filters are added to filter
+ * list which contains a filter for getting info column family.
+ *
+ * @param infoColFamilyList filter list for info column family.
+ */
+ private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
+ EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
// Events not required.
- TimelineEntityFilters filters = getFilters();
- if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.EVENTS) &&
- !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
- (isSingleEntityRead() || filters.getEventFilters() == null)) {
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
infoColFamilyList.addFilter(
- new QualifierFilter(CompareOp.NOT_EQUAL,
- new BinaryPrefixComparator(
- ApplicationColumnPrefix.EVENT.getColumnPrefixBytes(""))));
+ TimelineFilterUtils.createHBaseQualifierFilter(
+ CompareOp.NOT_EQUAL, ApplicationColumnPrefix.EVENT));
}
// info not required.
- if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.INFO) &&
- !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
- (isSingleEntityRead() || filters.getInfoFilters() == null)) {
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
infoColFamilyList.addFilter(
- new QualifierFilter(CompareOp.NOT_EQUAL,
- new BinaryPrefixComparator(
- ApplicationColumnPrefix.INFO.getColumnPrefixBytes(""))));
+ TimelineFilterUtils.createHBaseQualifierFilter(
+ CompareOp.NOT_EQUAL, ApplicationColumnPrefix.INFO));
}
- // is releated to not required.
- if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.IS_RELATED_TO) &&
- !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
- (isSingleEntityRead() || filters.getIsRelatedTo() == null)) {
+ // is related to not required.
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
infoColFamilyList.addFilter(
- new QualifierFilter(CompareOp.NOT_EQUAL,
- new BinaryPrefixComparator(
- ApplicationColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
+ TimelineFilterUtils.createHBaseQualifierFilter(
+ CompareOp.NOT_EQUAL, ApplicationColumnPrefix.IS_RELATED_TO));
}
// relates to not required.
- if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.RELATES_TO) &&
- !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
- (isSingleEntityRead() || filters.getRelatesTo() == null)) {
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
infoColFamilyList.addFilter(
- new QualifierFilter(CompareOp.NOT_EQUAL,
- new BinaryPrefixComparator(
- ApplicationColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
+ TimelineFilterUtils.createHBaseQualifierFilter(
+ CompareOp.NOT_EQUAL, ApplicationColumnPrefix.RELATES_TO));
}
- list.addFilter(infoColFamilyList);
- if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS) ||
- (!isSingleEntityRead() && filters.getConfigFilters() != null)) ||
- (dataToRetrieve.getConfsToRetrieve() != null &&
- !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty())) {
- FilterList filterCfg =
- new FilterList(new FamilyFilter(CompareOp.EQUAL,
- new BinaryComparator(ApplicationColumnFamily.CONFIGS.getBytes())));
- if (dataToRetrieve.getConfsToRetrieve() != null &&
- !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) {
- filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
- ApplicationColumnPrefix.CONFIG,
- dataToRetrieve.getConfsToRetrieve()));
- }
- list.addFilter(filterCfg);
+ }
+
+ /**
+ * Updates filter list based on fields for confs and metrics to retrieve.
+ *
+ * @param listBasedOnFields filter list based on fields.
+ * @throws IOException if any problem occurs while updating filter list.
+ */
+ private void updateFilterForConfsAndMetricsToRetrieve(
+ FilterList listBasedOnFields) throws IOException {
+ TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
+ // Please note that if confsToRetrieve is specified, we would have added
+ // CONFIGS to fields to retrieve in augmentParams() even if not specified.
+ if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) {
+ // Create a filter list for configs.
+ listBasedOnFields.addFilter(TimelineFilterUtils.
+ createFilterForConfsOrMetricsToRetrieve(
+ dataToRetrieve.getConfsToRetrieve(),
+ ApplicationColumnFamily.CONFIGS, ApplicationColumnPrefix.CONFIG));
}
- if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS) ||
- (!isSingleEntityRead() && filters.getMetricFilters() != null)) ||
- (dataToRetrieve.getMetricsToRetrieve() != null &&
- !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) {
- FilterList filterMetrics =
- new FilterList(new FamilyFilter(CompareOp.EQUAL,
- new BinaryComparator(ApplicationColumnFamily.METRICS.getBytes())));
- if (dataToRetrieve.getMetricsToRetrieve() != null &&
- !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()) {
- filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
- ApplicationColumnPrefix.METRIC,
- dataToRetrieve.getMetricsToRetrieve()));
- }
- list.addFilter(filterMetrics);
+
+ // Please note that if metricsToRetrieve is specified, we would have added
+ // METRICS to fields to retrieve in augmentParams() even if not specified.
+ if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) {
+ // Create a filter list for metrics.
+ listBasedOnFields.addFilter(TimelineFilterUtils.
+ createFilterForConfsOrMetricsToRetrieve(
+ dataToRetrieve.getMetricsToRetrieve(),
+ ApplicationColumnFamily.METRICS, ApplicationColumnPrefix.METRIC));
+ }
+ }
+
+ @Override
+ protected FilterList constructFilterListBasedOnFields() throws IOException {
+ if (!needCreateFilterListBasedOnFields()) {
+ // Fetch all the columns. No need of a filter.
+ return null;
+ }
+ FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE);
+ FilterList infoColFamilyList = new FilterList();
+ // By default fetch everything in INFO column family.
+ FamilyFilter infoColumnFamily =
+ new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(ApplicationColumnFamily.INFO.getBytes()));
+ infoColFamilyList.addFilter(infoColumnFamily);
+ if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) {
+ // We can fetch only some of the columns from info family.
+ infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily());
+ } else {
+ // Exclude column prefixes in info column family which are not required
+ // based on fields to retrieve.
+ excludeFieldsFromInfoColFamily(infoColFamilyList);
}
- return list;
+ listBasedOnFields.addFilter(infoColFamilyList);
+
+ updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields);
+ return listBasedOnFields;
}
@Override
@@ -182,6 +321,9 @@ class ApplicationEntityReader extends GenericEntityReader {
@Override
protected void validateParams() {
+ Preconditions.checkNotNull(getContext(), "context shouldn't be null");
+ Preconditions.checkNotNull(
+ getDataToRetrieve(), "data to retrieve shouldn't be null");
Preconditions.checkNotNull(getContext().getClusterId(),
"clusterId shouldn't be null");
Preconditions.checkNotNull(getContext().getEntityType(),
@@ -202,6 +344,7 @@ class ApplicationEntityReader extends GenericEntityReader {
throws IOException {
TimelineReaderContext context = getContext();
if (isSingleEntityRead()) {
+ // Get flow context information from AppToFlow table.
if (context.getFlowName() == null || context.getFlowRunId() == null ||
context.getUserId() == null) {
FlowContext flowContext = lookupFlowContext(
@@ -211,7 +354,12 @@ class ApplicationEntityReader extends GenericEntityReader {
context.setUserId(flowContext.getUserId());
}
}
+ // Add configs/metrics to fields to retrieve if confsToRetrieve and/or
+ // metricsToRetrieve are specified.
getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
+ if (!isSingleEntityRead()) {
+ createFiltersIfNull();
+ }
}
@Override
@@ -252,114 +400,84 @@ class ApplicationEntityReader extends GenericEntityReader {
Number createdTime =
(Number)ApplicationColumn.CREATED_TIME.readResult(result);
entity.setCreatedTime(createdTime.longValue());
- if (!isSingleEntityRead() &&
- (entity.getCreatedTime() < filters.getCreatedTimeBegin() ||
- entity.getCreatedTime() > filters.getCreatedTimeEnd())) {
- return null;
- }
+
EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
- // fetch is related to entities
+ // fetch is related to entities and match isRelatedTo filter. If isRelatedTo
+ // filters do not match, entity would be dropped. We have to match filters
+ // locally as relevant HBase filters to filter out rows on the basis of
+ // isRelatedTo are not set in HBase scan.
boolean checkIsRelatedTo =
- filters != null && filters.getIsRelatedTo() != null &&
- filters.getIsRelatedTo().size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
- readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
- true);
- if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
- entity.getIsRelatedToEntities(), filters.getIsRelatedTo())) {
+ !isSingleEntityRead() && filters.getIsRelatedTo() != null &&
+ filters.getIsRelatedTo().getFilterList().size() > 0;
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) ||
+ checkIsRelatedTo) {
+ TimelineStorageUtils.readRelationship(
+ entity, result, ApplicationColumnPrefix.IS_RELATED_TO, true);
+ if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity,
+ filters.getIsRelatedTo())) {
return null;
}
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve,
+ Field.IS_RELATED_TO)) {
entity.getIsRelatedToEntities().clear();
}
}
- // fetch relates to entities
+ // fetch relates to entities and match relatesTo filter. If relatesTo
+ // filters do not match, entity would be dropped. We have to match filters
+ // locally as relevant HBase filters to filter out rows on the basis of
+ // relatesTo are not set in HBase scan.
boolean checkRelatesTo =
- filters != null && filters.getRelatesTo() != null &&
- filters.getRelatesTo().size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
- readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
- false);
- if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
- entity.getRelatesToEntities(), filters.getRelatesTo())) {
+ !isSingleEntityRead() && filters.getRelatesTo() != null &&
+ filters.getRelatesTo().getFilterList().size() > 0;
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) ||
+ checkRelatesTo) {
+ TimelineStorageUtils.readRelationship(
+ entity, result, ApplicationColumnPrefix.RELATES_TO, false);
+ if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity,
+ filters.getRelatesTo())) {
return null;
}
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.RELATES_TO)) {
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
entity.getRelatesToEntities().clear();
}
}
- // fetch info
- boolean checkInfo = filters != null && filters.getInfoFilters() != null &&
- filters.getInfoFilters().size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
- readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
- if (checkInfo &&
- !TimelineStorageUtils.matchFilters(
- entity.getInfo(), filters.getInfoFilters())) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.INFO)) {
- entity.getInfo().clear();
- }
+ // fetch info if fieldsToRetrieve contains INFO or ALL.
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
+ TimelineStorageUtils.readKeyValuePairs(
+ entity, result, ApplicationColumnPrefix.INFO, false);
}
- // fetch configs
- boolean checkConfigs =
- filters != null && filters.getConfigFilters() != null &&
- filters.getConfigFilters().size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
- readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
- if (checkConfigs && !TimelineStorageUtils.matchFilters(
- entity.getConfigs(), filters.getConfigFilters())) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.CONFIGS)) {
- entity.getConfigs().clear();
- }
+ // fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.CONFIGS)) {
+ TimelineStorageUtils.readKeyValuePairs(
+ entity, result, ApplicationColumnPrefix.CONFIG, true);
}
- // fetch events
+ // fetch events and match event filters if they exist. If event filters do
+ // not match, entity would be dropped. We have to match filters locally
+ // as relevant HBase filters to filter out rows on the basis of events
+ // are not set in HBase scan.
boolean checkEvents =
- filters != null && filters.getEventFilters() != null &&
- filters.getEventFilters().size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
- readEvents(entity, result, true);
- if (checkEvents && !TimelineStorageUtils.matchEventFilters(
- entity.getEvents(), filters.getEventFilters())) {
+ !isSingleEntityRead() && filters.getEventFilters() != null &&
+ filters.getEventFilters().getFilterList().size() > 0;
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) ||
+ checkEvents) {
+ TimelineStorageUtils.readEvents(
+ entity, result, ApplicationColumnPrefix.EVENT);
+ if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity,
+ filters.getEventFilters())) {
return null;
}
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.EVENTS)) {
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
entity.getEvents().clear();
}
}
- // fetch metrics
- boolean checkMetrics =
- filters != null && filters.getMetricFilters() != null &&
- filters.getMetricFilters().size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
+ // fetch metrics if fieldsToRetrieve contains METRICS or ALL.
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.METRICS)) {
readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
- if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
- entity.getMetrics(), filters.getMetricFilters())) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.METRICS)) {
- entity.getMetrics().clear();
- }
}
return entity;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
index 0d2bdd8..d8ca038 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
import java.io.IOException;
@@ -75,6 +76,12 @@ class FlowActivityEntityReader extends TimelineEntityReader {
@Override
protected void augmentParams(Configuration hbaseConf, Connection conn)
throws IOException {
+ createFiltersIfNull();
+ }
+
+ @Override
+ protected FilterList constructFilterListBasedOnFilters() throws IOException {
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/366eb54e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
index 743315c..b2de2e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
@@ -38,9 +38,11 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
@@ -76,6 +78,9 @@ class FlowRunEntityReader extends TimelineEntityReader {
@Override
protected void validateParams() {
+ Preconditions.checkNotNull(getContext(), "context shouldn't be null");
+ Preconditions.checkNotNull(
+ getDataToRetrieve(), "data to retrieve shouldn't be null");
Preconditions.checkNotNull(getContext().getClusterId(),
"clusterId shouldn't be null");
Preconditions.checkNotNull(getContext().getUserId(),
@@ -90,37 +95,87 @@ class FlowRunEntityReader extends TimelineEntityReader {
@Override
protected void augmentParams(Configuration hbaseConf, Connection conn) {
+ // Add metrics to fields to retrieve if metricsToRetrieve is specified.
getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
+ if (!isSingleEntityRead()) {
+ createFiltersIfNull();
+ }
+ }
+
+ protected FilterList constructFilterListBasedOnFilters() throws IOException {
+ FilterList listBasedOnFilters = new FilterList();
+ // Filter based on created time range.
+ Long createdTimeBegin = getFilters().getCreatedTimeBegin();
+ Long createdTimeEnd = getFilters().getCreatedTimeEnd();
+ if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
+ listBasedOnFilters.addFilter(
+ TimelineFilterUtils.createSingleColValueFiltersByRange(
+ FlowRunColumn.MIN_START_TIME, createdTimeBegin, createdTimeEnd));
+ }
+ // Filter based on metric filters.
+ TimelineFilterList metricFilters = getFilters().getMetricFilters();
+ if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
+ listBasedOnFilters.addFilter(
+ TimelineFilterUtils.createHBaseFilterList(
+ FlowRunColumnPrefix.METRIC, metricFilters));
+ }
+ return listBasedOnFilters;
+ }
+
+ /**
+ * Add {@link QualifierFilter} filters to filter list for each column of
+ * flow run table.
+ *
+ * @return filter list to which qualifier filters have been added.
+ */
+ private FilterList updateFixedColumns() {
+ FilterList columnsList = new FilterList(Operator.MUST_PASS_ONE);
+ for (FlowRunColumn column : FlowRunColumn.values()) {
+ columnsList.addFilter(new QualifierFilter(CompareOp.EQUAL,
+ new BinaryComparator(column.getColumnQualifierBytes())));
+ }
+ return columnsList;
}
@Override
- protected FilterList constructFilterListBasedOnFields() {
+ protected FilterList constructFilterListBasedOnFields() throws IOException {
FilterList list = new FilterList(Operator.MUST_PASS_ONE);
-
// By default fetch everything in INFO column family.
FamilyFilter infoColumnFamily =
new FamilyFilter(CompareOp.EQUAL,
new BinaryComparator(FlowRunColumnFamily.INFO.getBytes()));
TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
- // Metrics not required.
- if (!isSingleEntityRead() &&
- !dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS) &&
- !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL)) {
+ // If multiple entities have to be retrieved, check if metrics have to be
+ // retrieved and if not, add a filter so that metrics can be excluded.
+ // Metrics are always returned if we are reading a single entity.
+ if (!isSingleEntityRead() && !TimelineStorageUtils.hasField(
+ dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) {
FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE);
infoColFamilyList.addFilter(infoColumnFamily);
infoColFamilyList.addFilter(
new QualifierFilter(CompareOp.NOT_EQUAL,
new BinaryPrefixComparator(
- FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(""))));
- list.addFilter(infoColFamilyList);
- }
- if (dataToRetrieve.getMetricsToRetrieve() != null &&
- !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()) {
- FilterList infoColFamilyList = new FilterList();
- infoColFamilyList.addFilter(infoColumnFamily);
- infoColFamilyList.addFilter(TimelineFilterUtils.createHBaseFilterList(
- FlowRunColumnPrefix.METRIC, dataToRetrieve.getMetricsToRetrieve()));
+ FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(""))));
list.addFilter(infoColFamilyList);
+ } else {
+ // Check if metricsToRetrieve are specified and if they are, create a
+ // filter list for info column family by adding flow run table's columns
+ // and a list for metrics to retrieve. Please note that fieldsToRetrieve
+ // will have METRICS added to it if metricsToRetrieve are specified
+ // (in augmentParams()).
+ TimelineFilterList metricsToRetrieve =
+ dataToRetrieve.getMetricsToRetrieve();
+ if (metricsToRetrieve != null &&
+ !metricsToRetrieve.getFilterList().isEmpty()) {
+ FilterList infoColFamilyList = new FilterList();
+ infoColFamilyList.addFilter(infoColumnFamily);
+ FilterList columnsList = updateFixedColumns();
+ columnsList.addFilter(
+ TimelineFilterUtils.createHBaseFilterList(
+ FlowRunColumnPrefix.METRIC, metricsToRetrieve));
+ infoColFamilyList.addFilter(columnsList);
+ list.addFilter(infoColFamilyList);
+ }
}
return list;
}
@@ -175,11 +230,6 @@ class FlowRunEntityReader extends TimelineEntityReader {
if (startTime != null) {
flowRun.setStartTime(startTime.longValue());
}
- if (!isSingleEntityRead() &&
- (flowRun.getStartTime() < getFilters().getCreatedTimeBegin() ||
- flowRun.getStartTime() > getFilters().getCreatedTimeEnd())) {
- return null;
- }
// read the end time if available
Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result);
@@ -193,9 +243,10 @@ class FlowRunEntityReader extends TimelineEntityReader {
flowRun.setVersion(version);
}
- // read metrics
- if (isSingleEntityRead() ||
- getDataToRetrieve().getFieldsToRetrieve().contains(Field.METRICS)) {
+ // read metrics if it's a single entity query or if METRICS are part of
+ // fieldsToRetrieve.
+ if (isSingleEntityRead() || TimelineStorageUtils.hasField(
+ getDataToRetrieve().getFieldsToRetrieve(), Field.METRICS)) {
readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org