You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by va...@apache.org on 2016/06/10 15:48:44 UTC
[2/4] hadoop git commit: YARN-5170. Eliminate singleton converters
and static method access. (Joep Rottinghuis via Varun Saxena)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
index 8e806bc..aa2bfda 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
@@ -46,8 +46,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import com.google.common.base.Preconditions;
@@ -150,13 +153,13 @@ class ApplicationEntityReader extends GenericEntityReader {
EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
// If INFO field has to be retrieved, add a filter for fetching columns
// with INFO column prefix.
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
+ if (hasField(fieldsToRetrieve, Field.INFO)) {
infoFamilyColsFilter.addFilter(
TimelineFilterUtils.createHBaseQualifierFilter(
CompareOp.EQUAL, ApplicationColumnPrefix.INFO));
}
TimelineFilterList relatesTo = getFilters().getRelatesTo();
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+ if (hasField(fieldsToRetrieve, Field.RELATES_TO)) {
// If RELATES_TO field has to be retrieved, add a filter for fetching
// columns with RELATES_TO column prefix.
infoFamilyColsFilter.addFilter(
@@ -169,12 +172,11 @@ class ApplicationEntityReader extends GenericEntityReader {
// matched after fetching rows from HBase.
Set<String> relatesToCols =
TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
- infoFamilyColsFilter.addFilter(
- TimelineFilterUtils.createFiltersFromColumnQualifiers(
- ApplicationColumnPrefix.RELATES_TO, relatesToCols));
+ infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+ ApplicationColumnPrefix.RELATES_TO, relatesToCols));
}
TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+ if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
// If IS_RELATED_TO field has to be retrieved, add a filter for fetching
// columns with IS_RELATED_TO column prefix.
infoFamilyColsFilter.addFilter(
@@ -187,12 +189,11 @@ class ApplicationEntityReader extends GenericEntityReader {
// matched after fetching rows from HBase.
Set<String> isRelatedToCols =
TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
- infoFamilyColsFilter.addFilter(
- TimelineFilterUtils.createFiltersFromColumnQualifiers(
- ApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols));
+ infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+ ApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols));
}
TimelineFilterList eventFilters = getFilters().getEventFilters();
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+ if (hasField(fieldsToRetrieve, Field.EVENTS)) {
// If EVENTS field has to be retrieved, add a filter for fetching columns
// with EVENT column prefix.
infoFamilyColsFilter.addFilter(
@@ -205,9 +206,8 @@ class ApplicationEntityReader extends GenericEntityReader {
// fetching rows from HBase.
Set<String> eventCols =
TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
- infoFamilyColsFilter.addFilter(
- TimelineFilterUtils.createFiltersFromColumnQualifiers(
- ApplicationColumnPrefix.EVENT, eventCols));
+ infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+ ApplicationColumnPrefix.EVENT, eventCols));
}
return infoFamilyColsFilter;
}
@@ -222,25 +222,25 @@ class ApplicationEntityReader extends GenericEntityReader {
private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
// Events not required.
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+ if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
infoColFamilyList.addFilter(
TimelineFilterUtils.createHBaseQualifierFilter(
CompareOp.NOT_EQUAL, ApplicationColumnPrefix.EVENT));
}
// info not required.
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
+ if (!hasField(fieldsToRetrieve, Field.INFO)) {
infoColFamilyList.addFilter(
TimelineFilterUtils.createHBaseQualifierFilter(
CompareOp.NOT_EQUAL, ApplicationColumnPrefix.INFO));
}
// is related to not required.
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+ if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
infoColFamilyList.addFilter(
TimelineFilterUtils.createHBaseQualifierFilter(
CompareOp.NOT_EQUAL, ApplicationColumnPrefix.IS_RELATED_TO));
}
// relates to not required.
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+ if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
infoColFamilyList.addFilter(
TimelineFilterUtils.createHBaseQualifierFilter(
CompareOp.NOT_EQUAL, ApplicationColumnPrefix.RELATES_TO));
@@ -308,9 +308,10 @@ class ApplicationEntityReader extends GenericEntityReader {
protected Result getResult(Configuration hbaseConf, Connection conn,
FilterList filterList) throws IOException {
TimelineReaderContext context = getContext();
- byte[] rowKey =
- ApplicationRowKey.getRowKey(context.getClusterId(), context.getUserId(),
+ ApplicationRowKey applicationRowKey =
+ new ApplicationRowKey(context.getClusterId(), context.getUserId(),
context.getFlowName(), context.getFlowRunId(), context.getAppId());
+ byte[] rowKey = applicationRowKey.getRowKey();
Get get = new Get(rowKey);
get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
if (filterList != null && !filterList.getFilters().isEmpty()) {
@@ -345,10 +346,13 @@ class ApplicationEntityReader extends GenericEntityReader {
TimelineReaderContext context = getContext();
if (isSingleEntityRead()) {
// Get flow context information from AppToFlow table.
- if (context.getFlowName() == null || context.getFlowRunId() == null ||
- context.getUserId() == null) {
- FlowContext flowContext = lookupFlowContext(
- context.getClusterId(), context.getAppId(), hbaseConf, conn);
+ if (context.getFlowName() == null || context.getFlowRunId() == null
+ || context.getUserId() == null) {
+ AppToFlowRowKey appToFlowRowKey =
+ new AppToFlowRowKey(context.getClusterId(), context.getAppId());
+ FlowContext flowContext =
+ lookupFlowContext(appToFlowRowKey,
+ hbaseConf, conn);
context.setFlowName(flowContext.getFlowName());
context.setFlowRunId(flowContext.getFlowRunId());
context.setUserId(flowContext.getUserId());
@@ -367,15 +371,13 @@ class ApplicationEntityReader extends GenericEntityReader {
Connection conn, FilterList filterList) throws IOException {
Scan scan = new Scan();
TimelineReaderContext context = getContext();
- if (context.getFlowRunId() != null) {
- scan.setRowPrefixFilter(ApplicationRowKey.
- getRowKeyPrefix(context.getClusterId(), context.getUserId(),
- context.getFlowName(), context.getFlowRunId()));
- } else {
- scan.setRowPrefixFilter(ApplicationRowKey.
- getRowKeyPrefix(context.getClusterId(), context.getUserId(),
- context.getFlowName()));
- }
+ // Whether or not flowRunID is null doesn't matter, the
+ // ApplicationRowKeyPrefix will do the right thing.
+ RowKeyPrefix<ApplicationRowKey> applicationRowKeyPrefix =
+ new ApplicationRowKeyPrefix(context.getClusterId(),
+ context.getUserId(), context.getFlowName(),
+ context.getFlowRunId());
+ scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix());
FilterList newList = new FilterList();
newList.addFilter(new PageFilter(getFilters().getLimit()));
if (filterList != null && !filterList.getFilters().isEmpty()) {
@@ -409,15 +411,14 @@ class ApplicationEntityReader extends GenericEntityReader {
boolean checkIsRelatedTo =
!isSingleEntityRead() && filters.getIsRelatedTo() != null &&
filters.getIsRelatedTo().getFilterList().size() > 0;
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) ||
- checkIsRelatedTo) {
- TimelineStorageUtils.readRelationship(
- entity, result, ApplicationColumnPrefix.IS_RELATED_TO, true);
+ if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) {
+ readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
+ true);
if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity,
filters.getIsRelatedTo())) {
return null;
}
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve,
+ if (!hasField(fieldsToRetrieve,
Field.IS_RELATED_TO)) {
entity.getIsRelatedToEntities().clear();
}
@@ -430,29 +431,27 @@ class ApplicationEntityReader extends GenericEntityReader {
boolean checkRelatesTo =
!isSingleEntityRead() && filters.getRelatesTo() != null &&
filters.getRelatesTo().getFilterList().size() > 0;
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) ||
+ if (hasField(fieldsToRetrieve, Field.RELATES_TO) ||
checkRelatesTo) {
- TimelineStorageUtils.readRelationship(
- entity, result, ApplicationColumnPrefix.RELATES_TO, false);
+ readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
+ false);
if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity,
filters.getRelatesTo())) {
return null;
}
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+ if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
entity.getRelatesToEntities().clear();
}
}
// fetch info if fieldsToRetrieve contains INFO or ALL.
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
- TimelineStorageUtils.readKeyValuePairs(
- entity, result, ApplicationColumnPrefix.INFO, false);
+ if (hasField(fieldsToRetrieve, Field.INFO)) {
+ readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
}
// fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.CONFIGS)) {
- TimelineStorageUtils.readKeyValuePairs(
- entity, result, ApplicationColumnPrefix.CONFIG, true);
+ if (hasField(fieldsToRetrieve, Field.CONFIGS)) {
+ readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
}
// fetch events and match event filters if they exist. If event filters do
@@ -462,21 +461,19 @@ class ApplicationEntityReader extends GenericEntityReader {
boolean checkEvents =
!isSingleEntityRead() && filters.getEventFilters() != null &&
filters.getEventFilters().getFilterList().size() > 0;
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) ||
- checkEvents) {
- TimelineStorageUtils.readEvents(
- entity, result, ApplicationColumnPrefix.EVENT);
+ if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) {
+ readEvents(entity, result, ApplicationColumnPrefix.EVENT);
if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity,
filters.getEventFilters())) {
return null;
}
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+ if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
entity.getEvents().clear();
}
}
// fetch metrics if fieldsToRetrieve contains METRICS or ALL.
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.METRICS)) {
+ if (hasField(fieldsToRetrieve, Field.METRICS)) {
readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
}
return entity;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
index faecd14..9ba5e38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
@@ -35,9 +35,11 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrie
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
import com.google.common.base.Preconditions;
@@ -50,6 +52,12 @@ class FlowActivityEntityReader extends TimelineEntityReader {
private static final FlowActivityTable FLOW_ACTIVITY_TABLE =
new FlowActivityTable();
+ /**
+ * Used to convert Long key components to and from storage format.
+ */
+ private final KeyConverter<Long> longKeyConverter = new LongKeyConverter();
+
+
public FlowActivityEntityReader(TimelineReaderContext ctxt,
TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
super(ctxt, entityFilters, toRetrieve, true);
@@ -105,15 +113,14 @@ class FlowActivityEntityReader extends TimelineEntityReader {
if (getFilters().getCreatedTimeBegin() == 0L &&
getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) {
// All records have to be chosen.
- scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId));
+ scan.setRowPrefixFilter(new FlowActivityRowKeyPrefix(clusterId)
+ .getRowKeyPrefix());
} else {
- scan.setStartRow(
- FlowActivityRowKey.getRowKeyPrefix(clusterId,
- getFilters().getCreatedTimeEnd()));
- scan.setStopRow(
- FlowActivityRowKey.getRowKeyPrefix(clusterId,
- (getFilters().getCreatedTimeBegin() <= 0 ? 0 :
- (getFilters().getCreatedTimeBegin() - 1))));
+ scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId, getFilters()
+ .getCreatedTimeEnd()).getRowKeyPrefix());
+ scan.setStopRow(new FlowActivityRowKeyPrefix(clusterId, (getFilters()
+ .getCreatedTimeBegin() <= 0 ? 0
+ : (getFilters().getCreatedTimeBegin() - 1))).getRowKeyPrefix());
}
// use the page filter to limit the result to the page size
// the scanner may still return more than the limit; therefore we need to
@@ -137,8 +144,7 @@ class FlowActivityEntityReader extends TimelineEntityReader {
// get the list of run ids along with the version that are associated with
// this flow on this day
Map<Long, Object> runIdsMap =
- FlowActivityColumnPrefix.RUN_ID.readResults(result,
- LongKeyConverter.getInstance());
+ FlowActivityColumnPrefix.RUN_ID.readResults(result, longKeyConverter);
for (Map.Entry<Long, Object> e : runIdsMap.entrySet()) {
Long runId = e.getKey();
String version = (String)e.getValue();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
index e1695ef..986a28f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
@@ -28,12 +28,12 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
@@ -43,11 +43,12 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
import org.apache.hadoop.yarn.webapp.BadRequestException;
@@ -81,8 +82,8 @@ class FlowRunEntityReader extends TimelineEntityReader {
@Override
protected void validateParams() {
Preconditions.checkNotNull(getContext(), "context shouldn't be null");
- Preconditions.checkNotNull(
- getDataToRetrieve(), "data to retrieve shouldn't be null");
+ Preconditions.checkNotNull(getDataToRetrieve(),
+ "data to retrieve shouldn't be null");
Preconditions.checkNotNull(getContext().getClusterId(),
"clusterId shouldn't be null");
Preconditions.checkNotNull(getContext().getUserId(),
@@ -97,8 +98,8 @@ class FlowRunEntityReader extends TimelineEntityReader {
if (!isSingleEntityRead() && fieldsToRetrieve != null) {
for (Field field : fieldsToRetrieve) {
if (field != Field.ALL && field != Field.METRICS) {
- throw new BadRequestException("Invalid field " + field +
- " specified while querying flow runs.");
+ throw new BadRequestException("Invalid field " + field
+ + " specified while querying flow runs.");
}
}
}
@@ -119,23 +120,22 @@ class FlowRunEntityReader extends TimelineEntityReader {
Long createdTimeBegin = getFilters().getCreatedTimeBegin();
Long createdTimeEnd = getFilters().getCreatedTimeEnd();
if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
- listBasedOnFilters.addFilter(
- TimelineFilterUtils.createSingleColValueFiltersByRange(
- FlowRunColumn.MIN_START_TIME, createdTimeBegin, createdTimeEnd));
+ listBasedOnFilters.addFilter(TimelineFilterUtils
+ .createSingleColValueFiltersByRange(FlowRunColumn.MIN_START_TIME,
+ createdTimeBegin, createdTimeEnd));
}
// Filter based on metric filters.
TimelineFilterList metricFilters = getFilters().getMetricFilters();
if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
- listBasedOnFilters.addFilter(
- TimelineFilterUtils.createHBaseFilterList(
- FlowRunColumnPrefix.METRIC, metricFilters));
+ listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
+ FlowRunColumnPrefix.METRIC, metricFilters));
}
return listBasedOnFilters;
}
/**
- * Add {@link QualifierFilter} filters to filter list for each column of
- * flow run table.
+ * Add {@link QualifierFilter} filters to filter list for each column of flow
+ * run table.
*
* @return filter list to which qualifier filters have been added.
*/
@@ -153,20 +153,19 @@ class FlowRunEntityReader extends TimelineEntityReader {
FilterList list = new FilterList(Operator.MUST_PASS_ONE);
// By default fetch everything in INFO column family.
FamilyFilter infoColumnFamily =
- new FamilyFilter(CompareOp.EQUAL,
- new BinaryComparator(FlowRunColumnFamily.INFO.getBytes()));
+ new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(
+ FlowRunColumnFamily.INFO.getBytes()));
TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
// If multiple entities have to be retrieved, check if metrics have to be
// retrieved and if not, add a filter so that metrics can be excluded.
// Metrics are always returned if we are reading a single entity.
- if (!isSingleEntityRead() && !TimelineStorageUtils.hasField(
- dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) {
+ if (!isSingleEntityRead()
+ && !hasField(dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) {
FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE);
infoColFamilyList.addFilter(infoColumnFamily);
- infoColFamilyList.addFilter(
- new QualifierFilter(CompareOp.NOT_EQUAL,
- new BinaryPrefixComparator(
- FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(""))));
+ infoColFamilyList.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(FlowRunColumnPrefix.METRIC
+ .getColumnPrefixBytes(""))));
list.addFilter(infoColFamilyList);
} else {
// Check if metricsToRetrieve are specified and if they are, create a
@@ -176,14 +175,13 @@ class FlowRunEntityReader extends TimelineEntityReader {
// (in augmentParams()).
TimelineFilterList metricsToRetrieve =
dataToRetrieve.getMetricsToRetrieve();
- if (metricsToRetrieve != null &&
- !metricsToRetrieve.getFilterList().isEmpty()) {
+ if (metricsToRetrieve != null
+ && !metricsToRetrieve.getFilterList().isEmpty()) {
FilterList infoColFamilyList = new FilterList();
infoColFamilyList.addFilter(infoColumnFamily);
FilterList columnsList = updateFixedColumns();
- columnsList.addFilter(
- TimelineFilterUtils.createHBaseFilterList(
- FlowRunColumnPrefix.METRIC, metricsToRetrieve));
+ columnsList.addFilter(TimelineFilterUtils.createHBaseFilterList(
+ FlowRunColumnPrefix.METRIC, metricsToRetrieve));
infoColFamilyList.addFilter(columnsList);
list.addFilter(infoColFamilyList);
}
@@ -195,9 +193,10 @@ class FlowRunEntityReader extends TimelineEntityReader {
protected Result getResult(Configuration hbaseConf, Connection conn,
FilterList filterList) throws IOException {
TimelineReaderContext context = getContext();
- byte[] rowKey =
- FlowRunRowKey.getRowKey(context.getClusterId(), context.getUserId(),
+ FlowRunRowKey flowRunRowKey =
+ new FlowRunRowKey(context.getClusterId(), context.getUserId(),
context.getFlowName(), context.getFlowRunId());
+ byte[] rowKey = flowRunRowKey.getRowKey();
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
if (filterList != null && !filterList.getFilters().isEmpty()) {
@@ -207,13 +206,14 @@ class FlowRunEntityReader extends TimelineEntityReader {
}
@Override
- protected ResultScanner getResults(Configuration hbaseConf,
- Connection conn, FilterList filterList) throws IOException {
+ protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
+ FilterList filterList) throws IOException {
Scan scan = new Scan();
TimelineReaderContext context = getContext();
- scan.setRowPrefixFilter(
- FlowRunRowKey.getRowKeyPrefix(context.getClusterId(),
- context.getUserId(), context.getFlowName()));
+ RowKeyPrefix<FlowRunRowKey> flowRunRowKeyPrefix =
+ new FlowRunRowKeyPrefix(context.getClusterId(), context.getUserId(),
+ context.getFlowName());
+ scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix());
FilterList newList = new FilterList();
newList.addFilter(new PageFilter(getFilters().getLimit()));
if (filterList != null && !filterList.getFilters().isEmpty()) {
@@ -238,27 +238,27 @@ class FlowRunEntityReader extends TimelineEntityReader {
}
// read the start time
- Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result);
+ Long startTime = (Long) FlowRunColumn.MIN_START_TIME.readResult(result);
if (startTime != null) {
flowRun.setStartTime(startTime.longValue());
}
// read the end time if available
- Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result);
+ Long endTime = (Long) FlowRunColumn.MAX_END_TIME.readResult(result);
if (endTime != null) {
flowRun.setMaxEndTime(endTime.longValue());
}
// read the flow version
- String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result);
+ String version = (String) FlowRunColumn.FLOW_VERSION.readResult(result);
if (version != null) {
flowRun.setVersion(version);
}
// read metrics if its a single entity query or if METRICS are part of
// fieldsToRetrieve.
- if (isSingleEntityRead() || TimelineStorageUtils.hasField(
- getDataToRetrieve().getFieldsToRetrieve(), Field.METRICS)) {
+ if (isSingleEntityRead()
+ || hasField(getDataToRetrieve().getFieldsToRetrieve(), Field.METRICS)) {
readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
index 22583b5..4e1ab8a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
import java.io.IOException;
import java.util.EnumSet;
+import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
@@ -28,11 +29,11 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
@@ -44,11 +45,16 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.webapp.NotFoundException;
@@ -66,6 +72,12 @@ class GenericEntityReader extends TimelineEntityReader {
*/
private final AppToFlowTable appToFlowTable = new AppToFlowTable();
+ /**
+ * Used to convert strings key components to and from storage format.
+ */
+ private final KeyConverter<String> stringKeyConverter =
+ new StringKeyConverter();
+
public GenericEntityReader(TimelineReaderContext ctxt,
TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve,
boolean sortedKeys) {
@@ -95,32 +107,29 @@ class GenericEntityReader extends TimelineEntityReader {
long createdTimeBegin = filters.getCreatedTimeBegin();
long createdTimeEnd = filters.getCreatedTimeEnd();
if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
- listBasedOnFilters.addFilter(
- TimelineFilterUtils.createSingleColValueFiltersByRange(
- EntityColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd));
+ listBasedOnFilters.addFilter(TimelineFilterUtils
+ .createSingleColValueFiltersByRange(EntityColumn.CREATED_TIME,
+ createdTimeBegin, createdTimeEnd));
}
// Create filter list based on metric filters and add it to
// listBasedOnFilters.
TimelineFilterList metricFilters = filters.getMetricFilters();
if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
- listBasedOnFilters.addFilter(
- TimelineFilterUtils.createHBaseFilterList(
- EntityColumnPrefix.METRIC, metricFilters));
+ listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
+ EntityColumnPrefix.METRIC, metricFilters));
}
// Create filter list based on config filters and add it to
// listBasedOnFilters.
TimelineFilterList configFilters = filters.getConfigFilters();
if (configFilters != null && !configFilters.getFilterList().isEmpty()) {
- listBasedOnFilters.addFilter(
- TimelineFilterUtils.createHBaseFilterList(
- EntityColumnPrefix.CONFIG, configFilters));
+ listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
+ EntityColumnPrefix.CONFIG, configFilters));
}
// Create filter list based on info filters and add it to listBasedOnFilters
TimelineFilterList infoFilters = filters.getInfoFilters();
if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) {
- listBasedOnFilters.addFilter(
- TimelineFilterUtils.createHBaseFilterList(
- EntityColumnPrefix.INFO, infoFilters));
+ listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
+ EntityColumnPrefix.INFO, infoFilters));
}
return listBasedOnFilters;
}
@@ -130,10 +139,10 @@ class GenericEntityReader extends TimelineEntityReader {
*
* @return true if we need to fetch some of the columns, false otherwise.
*/
- private static boolean fetchPartialEventCols(TimelineFilterList eventFilters,
+ private boolean fetchPartialEventCols(TimelineFilterList eventFilters,
EnumSet<Field> fieldsToRetrieve) {
return (eventFilters != null && !eventFilters.getFilterList().isEmpty() &&
- !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS));
+ !hasField(fieldsToRetrieve, Field.EVENTS));
}
/**
@@ -141,10 +150,10 @@ class GenericEntityReader extends TimelineEntityReader {
*
* @return true if we need to fetch some of the columns, false otherwise.
*/
- private static boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo,
+ private boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo,
EnumSet<Field> fieldsToRetrieve) {
return (relatesTo != null && !relatesTo.getFilterList().isEmpty() &&
- !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO));
+ !hasField(fieldsToRetrieve, Field.RELATES_TO));
}
/**
@@ -152,10 +161,10 @@ class GenericEntityReader extends TimelineEntityReader {
*
* @return true if we need to fetch some of the columns, false otherwise.
*/
- private static boolean fetchPartialIsRelatedToCols(
- TimelineFilterList isRelatedTo, EnumSet<Field> fieldsToRetrieve) {
+ private boolean fetchPartialIsRelatedToCols(TimelineFilterList isRelatedTo,
+ EnumSet<Field> fieldsToRetrieve) {
return (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty() &&
- !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO));
+ !hasField(fieldsToRetrieve, Field.IS_RELATED_TO));
}
/**
@@ -163,19 +172,20 @@ class GenericEntityReader extends TimelineEntityReader {
* relatesto and isrelatedto from info family.
*
* @return true, if we need to fetch only some of the columns, false if we
- * need to fetch all the columns under info column family.
+ * need to fetch all the columns under info column family.
*/
protected boolean fetchPartialColsFromInfoFamily() {
EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
TimelineEntityFilters filters = getFilters();
- return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve) ||
- fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve) ||
- fetchPartialIsRelatedToCols(filters.getIsRelatedTo(), fieldsToRetrieve);
+ return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve)
+ || fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve)
+ || fetchPartialIsRelatedToCols(filters.getIsRelatedTo(),
+ fieldsToRetrieve);
}
/**
- * Check if we need to create filter list based on fields. We need to create
- * a filter list iff all fields need not be retrieved or we have some specific
+ * Check if we need to create filter list based on fields. We need to create a
+ * filter list iff all fields need not be retrieved or we have some specific
* fields or metrics to retrieve. We also need to create a filter list if we
* have relationships(relatesTo/isRelatedTo) and event filters specified for
* the query.
@@ -188,22 +198,24 @@ class GenericEntityReader extends TimelineEntityReader {
// be retrieved, also check if we have some metrics or configs to
// retrieve specified for the query because then a filter list will have
// to be created.
- boolean flag = !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) ||
- (dataToRetrieve.getConfsToRetrieve() != null &&
- !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) ||
- (dataToRetrieve.getMetricsToRetrieve() != null &&
- !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty());
+ boolean flag =
+ !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL)
+ || (dataToRetrieve.getConfsToRetrieve() != null && !dataToRetrieve
+ .getConfsToRetrieve().getFilterList().isEmpty())
+ || (dataToRetrieve.getMetricsToRetrieve() != null && !dataToRetrieve
+ .getMetricsToRetrieve().getFilterList().isEmpty());
// Filters need to be checked only if we are reading multiple entities. If
// condition above is false, we check if there are relationships(relatesTo/
// isRelatedTo) and event filters specified for the query.
if (!flag && !isSingleEntityRead()) {
TimelineEntityFilters filters = getFilters();
- flag = (filters.getEventFilters() != null &&
- !filters.getEventFilters().getFilterList().isEmpty()) ||
- (filters.getIsRelatedTo() != null &&
- !filters.getIsRelatedTo().getFilterList().isEmpty()) ||
- (filters.getRelatesTo() != null &&
- !filters.getRelatesTo().getFilterList().isEmpty());
+ flag =
+ (filters.getEventFilters() != null && !filters.getEventFilters()
+ .getFilterList().isEmpty())
+ || (filters.getIsRelatedTo() != null && !filters.getIsRelatedTo()
+ .getFilterList().isEmpty())
+ || (filters.getRelatesTo() != null && !filters.getRelatesTo()
+ .getFilterList().isEmpty());
}
return flag;
}
@@ -216,8 +228,8 @@ class GenericEntityReader extends TimelineEntityReader {
*/
protected void updateFixedColumns(FilterList list) {
for (EntityColumn column : EntityColumn.values()) {
- list.addFilter(new QualifierFilter(CompareOp.EQUAL,
- new BinaryComparator(column.getColumnQualifierBytes())));
+ list.addFilter(new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(
+ column.getColumnQualifierBytes())));
}
}
@@ -226,30 +238,29 @@ class GenericEntityReader extends TimelineEntityReader {
* qualifiers in the info column family will be returned in result.
*
* @param isApplication If true, it means operations are to be performed for
- * application table, otherwise for entity table.
+ * application table, otherwise for entity table.
* @return filter list.
* @throws IOException if any problem occurs while creating filter list.
*/
- private FilterList createFilterListForColsOfInfoFamily()
- throws IOException {
+ private FilterList createFilterListForColsOfInfoFamily() throws IOException {
FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE);
// Add filters for each column in entity table.
updateFixedColumns(infoFamilyColsFilter);
EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
// If INFO field has to be retrieved, add a filter for fetching columns
// with INFO column prefix.
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
- infoFamilyColsFilter.addFilter(
- TimelineFilterUtils.createHBaseQualifierFilter(
+ if (hasField(fieldsToRetrieve, Field.INFO)) {
+ infoFamilyColsFilter
+ .addFilter(TimelineFilterUtils.createHBaseQualifierFilter(
CompareOp.EQUAL, EntityColumnPrefix.INFO));
}
TimelineFilterList relatesTo = getFilters().getRelatesTo();
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+ if (hasField(fieldsToRetrieve, Field.RELATES_TO)) {
// If RELATES_TO field has to be retrieved, add a filter for fetching
// columns with RELATES_TO column prefix.
- infoFamilyColsFilter.addFilter(
- TimelineFilterUtils.createHBaseQualifierFilter(
- CompareOp.EQUAL, EntityColumnPrefix.RELATES_TO));
+ infoFamilyColsFilter.addFilter(TimelineFilterUtils
+ .createHBaseQualifierFilter(CompareOp.EQUAL,
+ EntityColumnPrefix.RELATES_TO));
} else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) {
// Even if fields to retrieve does not contain RELATES_TO, we still
// need to have a filter to fetch some of the column qualifiers if
@@ -257,17 +268,16 @@ class GenericEntityReader extends TimelineEntityReader {
// matched after fetching rows from HBase.
Set<String> relatesToCols =
TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
- infoFamilyColsFilter.addFilter(
- TimelineFilterUtils.createFiltersFromColumnQualifiers(
- EntityColumnPrefix.RELATES_TO, relatesToCols));
+ infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+ EntityColumnPrefix.RELATES_TO, relatesToCols));
}
TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+ if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
// If IS_RELATED_TO field has to be retrieved, add a filter for fetching
// columns with IS_RELATED_TO column prefix.
- infoFamilyColsFilter.addFilter(
- TimelineFilterUtils.createHBaseQualifierFilter(
- CompareOp.EQUAL, EntityColumnPrefix.IS_RELATED_TO));
+ infoFamilyColsFilter.addFilter(TimelineFilterUtils
+ .createHBaseQualifierFilter(CompareOp.EQUAL,
+ EntityColumnPrefix.IS_RELATED_TO));
} else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) {
// Even if fields to retrieve does not contain IS_RELATED_TO, we still
// need to have a filter to fetch some of the column qualifiers if
@@ -275,27 +285,26 @@ class GenericEntityReader extends TimelineEntityReader {
// matched after fetching rows from HBase.
Set<String> isRelatedToCols =
TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
- infoFamilyColsFilter.addFilter(
- TimelineFilterUtils.createFiltersFromColumnQualifiers(
- EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols));
+ infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+ EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols));
}
TimelineFilterList eventFilters = getFilters().getEventFilters();
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+ if (hasField(fieldsToRetrieve, Field.EVENTS)) {
// If EVENTS field has to be retrieved, add a filter for fetching columns
// with EVENT column prefix.
- infoFamilyColsFilter.addFilter(
- TimelineFilterUtils.createHBaseQualifierFilter(
+ infoFamilyColsFilter
+ .addFilter(TimelineFilterUtils.createHBaseQualifierFilter(
CompareOp.EQUAL, EntityColumnPrefix.EVENT));
- } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()){
+ } else if (eventFilters != null &&
+ !eventFilters.getFilterList().isEmpty()) {
// Even if fields to retrieve does not contain EVENTS, we still need to
// have a filter to fetch some of the column qualifiers on the basis of
// event filters specified. Event filters will then be matched after
// fetching rows from HBase.
Set<String> eventCols =
TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
- infoFamilyColsFilter.addFilter(
- TimelineFilterUtils.createFiltersFromColumnQualifiers(
- EntityColumnPrefix.EVENT, eventCols));
+ infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+ EntityColumnPrefix.EVENT, eventCols));
}
return infoFamilyColsFilter;
}
@@ -310,28 +319,28 @@ class GenericEntityReader extends TimelineEntityReader {
private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
// Events not required.
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
- infoColFamilyList.addFilter(
- TimelineFilterUtils.createHBaseQualifierFilter(
- CompareOp.NOT_EQUAL, EntityColumnPrefix.EVENT));
+ if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
+ infoColFamilyList.addFilter(TimelineFilterUtils
+ .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
+ EntityColumnPrefix.EVENT));
}
// info not required.
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
- infoColFamilyList.addFilter(
- TimelineFilterUtils.createHBaseQualifierFilter(
- CompareOp.NOT_EQUAL, EntityColumnPrefix.INFO));
+ if (!hasField(fieldsToRetrieve, Field.INFO)) {
+ infoColFamilyList.addFilter(TimelineFilterUtils
+ .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
+ EntityColumnPrefix.INFO));
}
// is related to not required.
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
- infoColFamilyList.addFilter(
- TimelineFilterUtils.createHBaseQualifierFilter(
- CompareOp.NOT_EQUAL, EntityColumnPrefix.IS_RELATED_TO));
+ if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+ infoColFamilyList.addFilter(TimelineFilterUtils
+ .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
+ EntityColumnPrefix.IS_RELATED_TO));
}
// relates to not required.
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
- infoColFamilyList.addFilter(
- TimelineFilterUtils.createHBaseQualifierFilter(
- CompareOp.NOT_EQUAL, EntityColumnPrefix.RELATES_TO));
+ if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+ infoColFamilyList.addFilter(TimelineFilterUtils
+ .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
+ EntityColumnPrefix.RELATES_TO));
}
}
@@ -348,18 +357,18 @@ class GenericEntityReader extends TimelineEntityReader {
// CONFS to fields to retrieve in augmentParams() even if not specified.
if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) {
// Create a filter list for configs.
- listBasedOnFields.addFilter(TimelineFilterUtils.
- createFilterForConfsOrMetricsToRetrieve(
- dataToRetrieve.getConfsToRetrieve(),
- EntityColumnFamily.CONFIGS, EntityColumnPrefix.CONFIG));
+ listBasedOnFields.addFilter(TimelineFilterUtils
+ .createFilterForConfsOrMetricsToRetrieve(
+ dataToRetrieve.getConfsToRetrieve(), EntityColumnFamily.CONFIGS,
+ EntityColumnPrefix.CONFIG));
}
// Please note that if metricsToRetrieve is specified, we would have added
// METRICS to fields to retrieve in augmentParams() even if not specified.
if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) {
// Create a filter list for metrics.
- listBasedOnFields.addFilter(TimelineFilterUtils.
- createFilterForConfsOrMetricsToRetrieve(
+ listBasedOnFields.addFilter(TimelineFilterUtils
+ .createFilterForConfsOrMetricsToRetrieve(
dataToRetrieve.getMetricsToRetrieve(),
EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC));
}
@@ -375,8 +384,8 @@ class GenericEntityReader extends TimelineEntityReader {
FilterList infoColFamilyList = new FilterList();
// By default fetch everything in INFO column family.
FamilyFilter infoColumnFamily =
- new FamilyFilter(CompareOp.EQUAL,
- new BinaryComparator(EntityColumnFamily.INFO.getBytes()));
+ new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(
+ EntityColumnFamily.INFO.getBytes()));
infoColFamilyList.addFilter(infoColumnFamily);
if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) {
// We can fetch only some of the columns from info family.
@@ -394,27 +403,27 @@ class GenericEntityReader extends TimelineEntityReader {
/**
* Looks up flow context from AppToFlow table.
*
- * @param clusterId Cluster Id.
- * @param appId App Id.
+ * @param appToFlowRowKey to identify Cluster and App Ids.
* @param hbaseConf HBase configuration.
* @param conn HBase Connection.
* @return flow context information.
* @throws IOException if any problem occurs while fetching flow information.
*/
- protected FlowContext lookupFlowContext(String clusterId, String appId,
+ protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
Configuration hbaseConf, Connection conn) throws IOException {
- byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
+ byte[] rowKey = appToFlowRowKey.getRowKey();
Get get = new Get(rowKey);
Result result = appToFlowTable.getResult(hbaseConf, conn, get);
if (result != null && !result.isEmpty()) {
- return new FlowContext(
- AppToFlowColumn.USER_ID.readResult(result).toString(),
- AppToFlowColumn.FLOW_ID.readResult(result).toString(),
- ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
+ return new FlowContext(AppToFlowColumn.USER_ID.readResult(result)
+ .toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(),
+ ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result))
+ .longValue());
} else {
throw new NotFoundException(
- "Unable to find the context flow ID and flow run ID for clusterId=" +
- clusterId + ", appId=" + appId);
+ "Unable to find the context flow ID and flow run ID for clusterId="
+ + appToFlowRowKey.getClusterId() + ", appId="
+ + appToFlowRowKey.getAppId());
}
}
@@ -425,17 +434,21 @@ class GenericEntityReader extends TimelineEntityReader {
private final String userId;
private final String flowName;
private final Long flowRunId;
+
public FlowContext(String user, String flowName, Long flowRunId) {
this.userId = user;
this.flowName = flowName;
this.flowRunId = flowRunId;
}
+
protected String getUserId() {
return userId;
}
+
protected String getFlowName() {
return flowName;
}
+
protected Long getFlowRunId() {
return flowRunId;
}
@@ -444,8 +457,8 @@ class GenericEntityReader extends TimelineEntityReader {
@Override
protected void validateParams() {
Preconditions.checkNotNull(getContext(), "context shouldn't be null");
- Preconditions.checkNotNull(
- getDataToRetrieve(), "data to retrieve shouldn't be null");
+ Preconditions.checkNotNull(getDataToRetrieve(),
+ "data to retrieve shouldn't be null");
Preconditions.checkNotNull(getContext().getClusterId(),
"clusterId shouldn't be null");
Preconditions.checkNotNull(getContext().getAppId(),
@@ -463,11 +476,13 @@ class GenericEntityReader extends TimelineEntityReader {
throws IOException {
TimelineReaderContext context = getContext();
// In reality all three should be null or neither should be null
- if (context.getFlowName() == null || context.getFlowRunId() == null ||
- context.getUserId() == null) {
+ if (context.getFlowName() == null || context.getFlowRunId() == null
+ || context.getUserId() == null) {
// Get flow context information from AppToFlow table.
- FlowContext flowContext = lookupFlowContext(
- context.getClusterId(), context.getAppId(), hbaseConf, conn);
+ AppToFlowRowKey appToFlowRowKey =
+ new AppToFlowRowKey(context.getClusterId(), context.getAppId());
+ FlowContext flowContext =
+ lookupFlowContext(appToFlowRowKey, hbaseConf, conn);
context.setFlowName(flowContext.flowName);
context.setFlowRunId(flowContext.flowRunId);
context.setUserId(flowContext.userId);
@@ -485,9 +500,9 @@ class GenericEntityReader extends TimelineEntityReader {
FilterList filterList) throws IOException {
TimelineReaderContext context = getContext();
byte[] rowKey =
- EntityRowKey.getRowKey(context.getClusterId(), context.getUserId(),
+ new EntityRowKey(context.getClusterId(), context.getUserId(),
context.getFlowName(), context.getFlowRunId(), context.getAppId(),
- context.getEntityType(), context.getEntityId());
+ context.getEntityType(), context.getEntityId()).getRowKey();
Get get = new Get(rowKey);
get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
if (filterList != null && !filterList.getFilters().isEmpty()) {
@@ -497,15 +512,17 @@ class GenericEntityReader extends TimelineEntityReader {
}
@Override
- protected ResultScanner getResults(Configuration hbaseConf,
- Connection conn, FilterList filterList) throws IOException {
+ protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
+ FilterList filterList) throws IOException {
// Scan through part of the table to find the entities belong to one app
// and one type
Scan scan = new Scan();
TimelineReaderContext context = getContext();
- scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
- context.getClusterId(), context.getUserId(), context.getFlowName(),
- context.getFlowRunId(), context.getAppId(), context.getEntityType()));
+ RowKeyPrefix<EntityRowKey> entityRowKeyPrefix =
+ new EntityRowKeyPrefix(context.getClusterId(), context.getUserId(),
+ context.getFlowName(), context.getFlowRunId(), context.getAppId(),
+ context.getEntityType());
+ scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix());
scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
if (filterList != null && !filterList.getFilters().isEmpty()) {
scan.setFilter(filterList);
@@ -535,18 +552,16 @@ class GenericEntityReader extends TimelineEntityReader {
// locally as relevant HBase filters to filter out rows on the basis of
// isRelatedTo are not set in HBase scan.
boolean checkIsRelatedTo =
- !isSingleEntityRead() && filters.getIsRelatedTo() != null &&
- filters.getIsRelatedTo().getFilterList().size() > 0;
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) ||
- checkIsRelatedTo) {
- TimelineStorageUtils.readRelationship(
- entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
- if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity,
- filters.getIsRelatedTo())) {
+ !isSingleEntityRead() && filters.getIsRelatedTo() != null
+ && filters.getIsRelatedTo().getFilterList().size() > 0;
+ if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) {
+ readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
+ if (checkIsRelatedTo
+ && !TimelineStorageUtils.matchIsRelatedTo(entity,
+ filters.getIsRelatedTo())) {
return null;
}
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve,
- Field.IS_RELATED_TO)) {
+ if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
entity.getIsRelatedToEntities().clear();
}
}
@@ -556,31 +571,29 @@ class GenericEntityReader extends TimelineEntityReader {
// locally as relevant HBase filters to filter out rows on the basis of
// relatesTo are not set in HBase scan.
boolean checkRelatesTo =
- !isSingleEntityRead() && filters.getRelatesTo() != null &&
- filters.getRelatesTo().getFilterList().size() > 0;
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) ||
- checkRelatesTo) {
- TimelineStorageUtils.readRelationship(
- entity, result, EntityColumnPrefix.RELATES_TO, false);
- if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity,
- filters.getRelatesTo())) {
+ !isSingleEntityRead() && filters.getRelatesTo() != null
+ && filters.getRelatesTo().getFilterList().size() > 0;
+ if (hasField(fieldsToRetrieve, Field.RELATES_TO)
+ || checkRelatesTo) {
+ readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
+ if (checkRelatesTo
+ && !TimelineStorageUtils.matchRelatesTo(entity,
+ filters.getRelatesTo())) {
return null;
}
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+ if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
entity.getRelatesToEntities().clear();
}
}
// fetch info if fieldsToRetrieve contains INFO or ALL.
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
- TimelineStorageUtils.readKeyValuePairs(
- entity, result, EntityColumnPrefix.INFO, false);
+ if (hasField(fieldsToRetrieve, Field.INFO)) {
+ readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
}
// fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.CONFIGS)) {
- TimelineStorageUtils.readKeyValuePairs(
- entity, result, EntityColumnPrefix.CONFIG, true);
+ if (hasField(fieldsToRetrieve, Field.CONFIGS)) {
+ readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
}
// fetch events and match event filters if they exist. If event filters do
@@ -588,24 +601,48 @@ class GenericEntityReader extends TimelineEntityReader {
// as relevant HBase filters to filter out rows on the basis of events
// are not set in HBase scan.
boolean checkEvents =
- !isSingleEntityRead() && filters.getEventFilters() != null &&
- filters.getEventFilters().getFilterList().size() > 0;
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) ||
- checkEvents) {
- TimelineStorageUtils.readEvents(entity, result, EntityColumnPrefix.EVENT);
- if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity,
- filters.getEventFilters())) {
+ !isSingleEntityRead() && filters.getEventFilters() != null
+ && filters.getEventFilters().getFilterList().size() > 0;
+ if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) {
+ readEvents(entity, result, EntityColumnPrefix.EVENT);
+ if (checkEvents
+ && !TimelineStorageUtils.matchEventFilters(entity,
+ filters.getEventFilters())) {
return null;
}
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+ if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
entity.getEvents().clear();
}
}
// fetch metrics if fieldsToRetrieve contains METRICS or ALL.
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.METRICS)) {
+ if (hasField(fieldsToRetrieve, Field.METRICS)) {
readMetrics(entity, result, EntityColumnPrefix.METRIC);
}
return entity;
}
+
+ /**
+ * Helper method for reading key-value pairs for either info or config.
+ *
+ * @param <T> Describes the type of column prefix.
+ * @param entity entity to fill.
+ * @param result result from HBase.
+ * @param prefix column prefix.
+ * @param isConfig if true, means we are reading configs, otherwise info.
+ * @throws IOException if any problem is encountered while reading result.
+ */
+ protected <T> void readKeyValuePairs(TimelineEntity entity, Result result,
+ ColumnPrefix<T> prefix, boolean isConfig) throws IOException {
+ // info and configuration are of type Map<String, Object or String>
+ Map<String, Object> columns =
+ prefix.readResults(result, stringKeyConverter);
+ if (isConfig) {
+ for (Map.Entry<String, Object> column : columns.entrySet()) {
+ entity.addConfig(column.getKey(), column.getValue().toString());
+ }
+ } else {
+ entity.addInfo(columns);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
index 852834e..7b294a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
@@ -30,15 +33,27 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
/**
* The base class for reading and deserializing timeline entities from the
@@ -68,6 +83,12 @@ public abstract class TimelineEntityReader {
private boolean sortedKeys = false;
/**
+ * Used to convert strings key components to and from storage format.
+ */
+ private final KeyConverter<String> stringKeyConverter =
+ new StringKeyConverter();
+
+ /**
* Instantiates a reader for multiple-entity reads.
*
* @param ctxt Reader context which defines the scope in which query has to be
@@ -331,7 +352,7 @@ public abstract class TimelineEntityReader {
ColumnPrefix<?> columnPrefix) throws IOException {
NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
columnPrefix.readResultsWithTimestamps(
- result, StringKeyConverter.getInstance());
+ result, stringKeyConverter);
for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
metricsResult.entrySet()) {
TimelineMetric metric = new TimelineMetric();
@@ -359,4 +380,117 @@ public abstract class TimelineEntityReader {
protected void setTable(BaseTable<?> baseTable) {
this.table = baseTable;
}
+
+ /**
+ * Check if we have a certain field amongst fields to retrieve. This method
+ * checks against {@link Field#ALL} as well because that would mean field
+ * passed needs to be matched.
+ *
+ * @param fieldsToRetrieve fields to be retrieved.
+ * @param requiredField fields to be checked in fieldsToRetrieve.
+ * @return true if has the required field, false otherwise.
+ */
+ protected boolean hasField(EnumSet<Field> fieldsToRetrieve,
+ Field requiredField) {
+ return fieldsToRetrieve.contains(Field.ALL) ||
+ fieldsToRetrieve.contains(requiredField);
+ }
+
+ /**
+ * Create a filter list of qualifier filters based on passed set of columns.
+ *
+ * @param <T> Describes the type of column prefix.
+ * @param colPrefix Column Prefix.
+ * @param columns set of column qualifiers.
+ * @return filter list.
+ */
+ protected <T> FilterList createFiltersFromColumnQualifiers(
+ ColumnPrefix<T> colPrefix, Set<String> columns) {
+ FilterList list = new FilterList(Operator.MUST_PASS_ONE);
+ for (String column : columns) {
+ // For columns which have compound column qualifiers (eg. events), we need
+ // to include the required separator.
+ byte[] compoundColQual = createColQualifierPrefix(colPrefix, column);
+ list.addFilter(new QualifierFilter(CompareOp.EQUAL,
+ new BinaryPrefixComparator(colPrefix
+ .getColumnPrefixBytes(compoundColQual))));
+ }
+ return list;
+ }
+
+ protected <T> byte[] createColQualifierPrefix(ColumnPrefix<T> colPrefix,
+ String column) {
+ if (colPrefix == ApplicationColumnPrefix.EVENT
+ || colPrefix == EntityColumnPrefix.EVENT) {
+ return new EventColumnName(column, null, null).getColumnQualifier();
+ } else {
+ return stringKeyConverter.encode(column);
+ }
+ }
+
+ /**
+ * Helper method for reading relationship.
+ *
+ * @param <T> Describes the type of column prefix.
+ * @param entity entity to fill.
+ * @param result result from HBase.
+ * @param prefix column prefix.
+ * @param isRelatedTo if true, means relationship is to be added to
+ * isRelatedTo, otherwise its added to relatesTo.
+ * @throws IOException if any problem is encountered while reading result.
+ */
+ protected <T> void readRelationship(TimelineEntity entity, Result result,
+ ColumnPrefix<T> prefix, boolean isRelatedTo) throws IOException {
+ // isRelatedTo and relatesTo are of type Map<String, Set<String>>
+ Map<String, Object> columns =
+ prefix.readResults(result, stringKeyConverter);
+ for (Map.Entry<String, Object> column : columns.entrySet()) {
+ for (String id : Separator.VALUES.splitEncoded(column.getValue()
+ .toString())) {
+ if (isRelatedTo) {
+ entity.addIsRelatedToEntity(column.getKey(), id);
+ } else {
+ entity.addRelatesToEntity(column.getKey(), id);
+ }
+ }
+ }
+ }
+
+ /**
+ * Read events from the entity table or the application table. The column name
+ * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
+ * if there is no info associated with the event.
+ *
+ * @param <T> Describes the type of column prefix.
+ * @param entity entity to fill.
+ * @param result HBase Result.
+ * @param prefix column prefix.
+ * @throws IOException if any problem is encountered while reading result.
+ */
+ protected static <T> void readEvents(TimelineEntity entity, Result result,
+ ColumnPrefix<T> prefix) throws IOException {
+ Map<String, TimelineEvent> eventsMap = new HashMap<>();
+ Map<EventColumnName, Object> eventsResult =
+ prefix.readResults(result, new EventColumnNameConverter());
+ for (Map.Entry<EventColumnName, Object>
+ eventResult : eventsResult.entrySet()) {
+ EventColumnName eventColumnName = eventResult.getKey();
+ String key = eventColumnName.getId() +
+ Long.toString(eventColumnName.getTimestamp());
+ // Retrieve previously seen event to add to it
+ TimelineEvent event = eventsMap.get(key);
+ if (event == null) {
+ // First time we're seeing this event, add it to the eventsMap
+ event = new TimelineEvent();
+ event.setId(eventColumnName.getId());
+ event.setTimestamp(eventColumnName.getTimestamp());
+ eventsMap.put(key, event);
+ }
+ if (eventColumnName.getInfoKey() != null) {
+ event.addInfo(eventColumnName.getInfoKey(), eventResult.getValue());
+ }
+ }
+ Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
+ entity.addEvents(eventsSet);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
index 74e4b5d..58df970 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
@@ -24,220 +24,13 @@ import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyConverter;
import org.junit.Test;
public class TestKeyConverters {
- private final static String QUALIFIER_SEP = Separator.QUALIFIERS.getValue();
- private final static byte[] QUALIFIER_SEP_BYTES =
- Bytes.toBytes(QUALIFIER_SEP);
- private final static String CLUSTER = "cl" + QUALIFIER_SEP + "uster";
- private final static String USER = QUALIFIER_SEP + "user";
- private final static String FLOW_NAME =
- "dummy_" + QUALIFIER_SEP + "flow" + QUALIFIER_SEP;
- private final static Long FLOW_RUN_ID;
- private final static String APPLICATION_ID;
- static {
- long runid = Long.MAX_VALUE - 900L;
- byte[] longMaxByteArr = Bytes.toBytes(Long.MAX_VALUE);
- byte[] byteArr = Bytes.toBytes(runid);
- int sepByteLen = QUALIFIER_SEP_BYTES.length;
- if (sepByteLen <= byteArr.length) {
- for (int i = 0; i < sepByteLen; i++) {
- byteArr[i] = (byte)(longMaxByteArr[i] - QUALIFIER_SEP_BYTES[i]);
- }
- }
- FLOW_RUN_ID = Bytes.toLong(byteArr);
- long clusterTs = System.currentTimeMillis();
- byteArr = Bytes.toBytes(clusterTs);
- if (sepByteLen <= byteArr.length) {
- for (int i = 0; i < sepByteLen; i++) {
- byteArr[byteArr.length - sepByteLen + i] =
- (byte)(longMaxByteArr[byteArr.length - sepByteLen + i] -
- QUALIFIER_SEP_BYTES[i]);
- }
- }
- clusterTs = Bytes.toLong(byteArr);
- int seqId = 222;
- APPLICATION_ID = ApplicationId.newInstance(clusterTs, seqId).toString();
- }
-
- private static void verifyRowPrefixBytes(byte[] byteRowKeyPrefix) {
- int sepLen = QUALIFIER_SEP_BYTES.length;
- for (int i = 0; i < sepLen; i++) {
- assertTrue("Row key prefix not encoded properly.",
- byteRowKeyPrefix[byteRowKeyPrefix.length - sepLen + i] ==
- QUALIFIER_SEP_BYTES[i]);
- }
- }
-
- @Test
- public void testFlowActivityRowKeyConverter() {
- Long ts = TimelineStorageUtils.getTopOfTheDayTimestamp(1459900830000L);
- byte[] byteRowKey = FlowActivityRowKeyConverter.getInstance().encode(
- new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME));
- FlowActivityRowKey rowKey =
- FlowActivityRowKeyConverter.getInstance().decode(byteRowKey);
- assertEquals(CLUSTER, rowKey.getClusterId());
- assertEquals(ts, rowKey.getDayTimestamp());
- assertEquals(USER, rowKey.getUserId());
- assertEquals(FLOW_NAME, rowKey.getFlowName());
-
- byte[] byteRowKeyPrefix = FlowActivityRowKeyConverter.getInstance().encode(
- new FlowActivityRowKey(CLUSTER, null, null, null));
- byte[][] splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
- Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
- assertEquals(2, splits.length);
- assertEquals(0, splits[1].length);
- assertEquals(CLUSTER,
- Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
- verifyRowPrefixBytes(byteRowKeyPrefix);
-
- byteRowKeyPrefix = FlowActivityRowKeyConverter.getInstance().encode(
- new FlowActivityRowKey(CLUSTER, ts, null, null));
- splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
- Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE });
- assertEquals(3, splits.length);
- assertEquals(0, splits[2].length);
- assertEquals(CLUSTER,
- Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
- assertEquals(ts, (Long) TimelineStorageUtils.invertLong(
- Bytes.toLong(splits[1])));
- verifyRowPrefixBytes(byteRowKeyPrefix);
- }
-
- @Test
- public void testFlowRunRowKeyConverter() {
- byte[] byteRowKey = FlowRunRowKeyConverter.getInstance().encode(
- new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID));
- FlowRunRowKey rowKey =
- FlowRunRowKeyConverter.getInstance().decode(byteRowKey);
- assertEquals(CLUSTER, rowKey.getClusterId());
- assertEquals(USER, rowKey.getUserId());
- assertEquals(FLOW_NAME, rowKey.getFlowName());
- assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
-
- byte[] byteRowKeyPrefix = FlowRunRowKeyConverter.getInstance().encode(
- new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, null));
- byte[][] splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
- Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
- Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
- assertEquals(4, splits.length);
- assertEquals(0, splits[3].length);
- assertEquals(FLOW_NAME,
- Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
- verifyRowPrefixBytes(byteRowKeyPrefix);
- }
-
- @Test
- public void testApplicationRowKeyConverter() {
- byte[] byteRowKey = ApplicationRowKeyConverter.getInstance().encode(
- new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
- APPLICATION_ID));
- ApplicationRowKey rowKey =
- ApplicationRowKeyConverter.getInstance().decode(byteRowKey);
- assertEquals(CLUSTER, rowKey.getClusterId());
- assertEquals(USER, rowKey.getUserId());
- assertEquals(FLOW_NAME, rowKey.getFlowName());
- assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
- assertEquals(APPLICATION_ID, rowKey.getAppId());
-
- byte[] byteRowKeyPrefix = ApplicationRowKeyConverter.getInstance().encode(
- new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, null));
- byte[][] splits =
- Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
- Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
- Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
- Separator.VARIABLE_SIZE });
- assertEquals(5, splits.length);
- assertEquals(0, splits[4].length);
- assertEquals(FLOW_NAME,
- Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
- assertEquals(FLOW_RUN_ID, (Long)TimelineStorageUtils.invertLong(
- Bytes.toLong(splits[3])));
- verifyRowPrefixBytes(byteRowKeyPrefix);
-
- byteRowKeyPrefix = ApplicationRowKeyConverter.getInstance().encode(
- new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, null, null));
- splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
- Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
- Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
- assertEquals(4, splits.length);
- assertEquals(0, splits[3].length);
- assertEquals(FLOW_NAME,
- Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
- verifyRowPrefixBytes(byteRowKeyPrefix);
- }
-
- @Test
- public void testEntityRowKeyConverter() {
- String entityId = "!ent!ity!!id!";
- String entityType = "entity!Type";
- byte[] byteRowKey = EntityRowKeyConverter.getInstance().encode(
- new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
- entityType, entityId));
- EntityRowKey rowKey =
- EntityRowKeyConverter.getInstance().decode(byteRowKey);
- assertEquals(CLUSTER, rowKey.getClusterId());
- assertEquals(USER, rowKey.getUserId());
- assertEquals(FLOW_NAME, rowKey.getFlowName());
- assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
- assertEquals(APPLICATION_ID, rowKey.getAppId());
- assertEquals(entityType, rowKey.getEntityType());
- assertEquals(entityId, rowKey.getEntityId());
-
- byte[] byteRowKeyPrefix = EntityRowKeyConverter.getInstance().encode(
- new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
- entityType, null));
- byte[][] splits =
- Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
- Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
- Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
- AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
- Separator.VARIABLE_SIZE });
- assertEquals(7, splits.length);
- assertEquals(0, splits[6].length);
- assertEquals(APPLICATION_ID,
- AppIdKeyConverter.getInstance().decode(splits[4]));
- assertEquals(entityType, Separator.QUALIFIERS.decode(
- Bytes.toString(splits[5])));
- verifyRowPrefixBytes(byteRowKeyPrefix);
-
- byteRowKeyPrefix = EntityRowKeyConverter.getInstance().encode(
- new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
- null, null));
- splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
- Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
- Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
- AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE });
- assertEquals(6, splits.length);
- assertEquals(0, splits[5].length);
- assertEquals(APPLICATION_ID,
- AppIdKeyConverter.getInstance().decode(splits[4]));
- verifyRowPrefixBytes(byteRowKeyPrefix);
- }
-
- @Test
- public void testAppToFlowRowKeyConverter() {
- byte[] byteRowKey = AppToFlowRowKeyConverter.getInstance().encode(
- new AppToFlowRowKey(CLUSTER, APPLICATION_ID));
- AppToFlowRowKey rowKey =
- AppToFlowRowKeyConverter.getInstance().decode(byteRowKey);
- assertEquals(CLUSTER, rowKey.getClusterId());
- assertEquals(APPLICATION_ID, rowKey.getAppId());
- }
@Test
public void testAppIdKeyConverter() {
+ AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter();
long currentTs = System.currentTimeMillis();
ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1);
ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2);
@@ -245,18 +38,19 @@ public class TestKeyConverters {
String appIdStr1 = appId1.toString();
String appIdStr2 = appId2.toString();
String appIdStr3 = appId3.toString();
- byte[] appIdBytes1 = AppIdKeyConverter.getInstance().encode(appIdStr1);
- byte[] appIdBytes2 = AppIdKeyConverter.getInstance().encode(appIdStr2);
- byte[] appIdBytes3 = AppIdKeyConverter.getInstance().encode(appIdStr3);
+ byte[] appIdBytes1 = appIdKeyConverter.encode(appIdStr1);
+ byte[] appIdBytes2 = appIdKeyConverter.encode(appIdStr2);
+ byte[] appIdBytes3 = appIdKeyConverter.encode(appIdStr3);
// App ids' should be encoded in a manner wherein descending order
// is maintained.
- assertTrue("Ordering of app ids' is incorrect",
- Bytes.compareTo(appIdBytes1, appIdBytes2) > 0 &&
- Bytes.compareTo(appIdBytes1, appIdBytes3) > 0 &&
- Bytes.compareTo(appIdBytes2, appIdBytes3) > 0);
- String decodedAppId1 = AppIdKeyConverter.getInstance().decode(appIdBytes1);
- String decodedAppId2 = AppIdKeyConverter.getInstance().decode(appIdBytes2);
- String decodedAppId3 = AppIdKeyConverter.getInstance().decode(appIdBytes3);
+ assertTrue(
+ "Ordering of app ids' is incorrect",
+ Bytes.compareTo(appIdBytes1, appIdBytes2) > 0
+ && Bytes.compareTo(appIdBytes1, appIdBytes3) > 0
+ && Bytes.compareTo(appIdBytes2, appIdBytes3) > 0);
+ String decodedAppId1 = appIdKeyConverter.decode(appIdBytes1);
+ String decodedAppId2 = appIdKeyConverter.decode(appIdBytes2);
+ String decodedAppId3 = appIdKeyConverter.decode(appIdBytes3);
assertTrue("Decoded app id is not same as the app id encoded",
appIdStr1.equals(decodedAppId1));
assertTrue("Decoded app id is not same as the app id encoded",
@@ -273,21 +67,64 @@ public class TestKeyConverters {
Bytes.createMaxByteArray(Bytes.SIZEOF_LONG - valSepBytes.length);
byte[] ts = Bytes.add(valSepBytes, maxByteArr);
Long eventTs = Bytes.toLong(ts);
- byte[] byteEventColName = EventColumnNameConverter.getInstance().encode(
- new EventColumnName(eventId, eventTs, null));
+ byte[] byteEventColName =
+ new EventColumnName(eventId, eventTs, null).getColumnQualifier();
+ KeyConverter<EventColumnName> eventColumnNameConverter =
+ new EventColumnNameConverter();
EventColumnName eventColName =
- EventColumnNameConverter.getInstance().decode(byteEventColName);
+ eventColumnNameConverter.decode(byteEventColName);
assertEquals(eventId, eventColName.getId());
assertEquals(eventTs, eventColName.getTimestamp());
assertNull(eventColName.getInfoKey());
String infoKey = "f=oo_event_in=fo=_key";
- byteEventColName = EventColumnNameConverter.getInstance().encode(
- new EventColumnName(eventId, eventTs, infoKey));
- eventColName =
- EventColumnNameConverter.getInstance().decode(byteEventColName);
+ byteEventColName =
+ new EventColumnName(eventId, eventTs, infoKey).getColumnQualifier();
+ eventColName = eventColumnNameConverter.decode(byteEventColName);
assertEquals(eventId, eventColName.getId());
assertEquals(eventTs, eventColName.getTimestamp());
assertEquals(infoKey, eventColName.getInfoKey());
}
+
+ @Test
+ public void testLongKeyConverter() {
+ LongKeyConverter longKeyConverter = new LongKeyConverter();
+ confirmLongKeyConverter(longKeyConverter, Long.MIN_VALUE);
+ confirmLongKeyConverter(longKeyConverter, -1234567890L);
+ confirmLongKeyConverter(longKeyConverter, -128L);
+ confirmLongKeyConverter(longKeyConverter, -127L);
+ confirmLongKeyConverter(longKeyConverter, -1L);
+ confirmLongKeyConverter(longKeyConverter, 0L);
+ confirmLongKeyConverter(longKeyConverter, 1L);
+ confirmLongKeyConverter(longKeyConverter, 127L);
+ confirmLongKeyConverter(longKeyConverter, 128L);
+ confirmLongKeyConverter(longKeyConverter, 1234567890L);
+ confirmLongKeyConverter(longKeyConverter, Long.MAX_VALUE);
+ }
+
+ private void confirmLongKeyConverter(LongKeyConverter longKeyConverter,
+ Long testValue) {
+ Long decoded = longKeyConverter.decode(longKeyConverter.encode(testValue));
+ assertEquals(testValue, decoded);
+ }
+
+ @Test
+ public void testStringKeyConverter() {
+ StringKeyConverter stringKeyConverter = new StringKeyConverter();
+ String phrase = "QuackAttack now!";
+
+ for (int i = 0; i < phrase.length(); i++) {
+ String sub = phrase.substring(i, phrase.length());
+ confirmStrignKeyConverter(stringKeyConverter, sub);
+ confirmStrignKeyConverter(stringKeyConverter, sub + sub);
+ }
+ }
+
+ private void confirmStrignKeyConverter(StringKeyConverter stringKeyConverter,
+ String testValue) {
+ String decoded =
+ stringKeyConverter.decode(stringKeyConverter.encode(testValue));
+ assertEquals(testValue, decoded);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org