Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/02/23 19:44:56 UTC
[15/50] [abbrv] hadoop git commit: YARN-7919. Refactor
timelineservice-hbase module into submodules. Contributed by Haibo Chen.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java
new file mode 100644
index 0000000..faed348
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java
@@ -0,0 +1,489 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Query;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKeyPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTableRW;
+import org.apache.hadoop.yarn.webapp.BadRequestException;
+
+import com.google.common.base.Preconditions;
+
+class SubApplicationEntityReader extends GenericEntityReader {
+ private static final SubApplicationTableRW SUB_APPLICATION_TABLE =
+ new SubApplicationTableRW();
+
+ SubApplicationEntityReader(TimelineReaderContext ctxt,
+ TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
+ super(ctxt, entityFilters, toRetrieve);
+ }
+
+ SubApplicationEntityReader(TimelineReaderContext ctxt,
+ TimelineDataToRetrieve toRetrieve) {
+ super(ctxt, toRetrieve);
+ }
+
+ /**
+ * Uses the {@link SubApplicationTableRW}.
+ */
+ protected BaseTableRW<?> getTable() {
+ return SUB_APPLICATION_TABLE;
+ }
+
+ @Override
+ protected FilterList constructFilterListBasedOnFilters() throws IOException {
+ // Filters here cannot be null for multiple entity reads as they are set in
+ // augmentParams if null.
+ FilterList listBasedOnFilters = new FilterList();
+ TimelineEntityFilters filters = getFilters();
+ // Create filter list based on created time range and add it to
+ // listBasedOnFilters.
+ long createdTimeBegin = filters.getCreatedTimeBegin();
+ long createdTimeEnd = filters.getCreatedTimeEnd();
+ if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
+ listBasedOnFilters.addFilter(TimelineFilterUtils
+ .createSingleColValueFiltersByRange(SubApplicationColumn.CREATED_TIME,
+ createdTimeBegin, createdTimeEnd));
+ }
+ // Create filter list based on metric filters and add it to
+ // listBasedOnFilters.
+ TimelineFilterList metricFilters = filters.getMetricFilters();
+ if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
+ listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
+ SubApplicationColumnPrefix.METRIC, metricFilters));
+ }
+ // Create filter list based on config filters and add it to
+ // listBasedOnFilters.
+ TimelineFilterList configFilters = filters.getConfigFilters();
+ if (configFilters != null && !configFilters.getFilterList().isEmpty()) {
+ listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
+ SubApplicationColumnPrefix.CONFIG, configFilters));
+ }
+ // Create filter list based on info filters and add it to listBasedOnFilters
+ TimelineFilterList infoFilters = filters.getInfoFilters();
+ if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) {
+ listBasedOnFilters.addFilter(TimelineFilterUtils
+ .createHBaseFilterList(SubApplicationColumnPrefix.INFO, infoFilters));
+ }
+ return listBasedOnFilters;
+ }
+
+ /**
+ * Add {@link QualifierFilter} filters to the filter list for each column
+ * of the sub-application table.
+ *
+ * @param list filter list to which qualifier filters have to be added.
+ */
+ protected void updateFixedColumns(FilterList list) {
+ for (SubApplicationColumn column : SubApplicationColumn.values()) {
+ list.addFilter(new QualifierFilter(CompareOp.EQUAL,
+ new BinaryComparator(column.getColumnQualifierBytes())));
+ }
+ }
+
+ /**
+ * Creates a filter list which indicates that only some of the column
+ * qualifiers in the info column family will be returned in result. The
+ * qualifiers included depend on the fields to retrieve and on any
+ * relatesTo, isRelatedTo and event filters specified in the query.
+ *
+ * @return filter list.
+ * @throws IOException if any problem occurs while creating filter list.
+ */
+ private FilterList createFilterListForColsOfInfoFamily() throws IOException {
+ FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE);
+ // Add filters for each column in the sub-application table.
+ updateFixedColumns(infoFamilyColsFilter);
+ EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
+ // If INFO field has to be retrieved, add a filter for fetching columns
+ // with INFO column prefix.
+ if (hasField(fieldsToRetrieve, Field.INFO)) {
+ infoFamilyColsFilter.addFilter(
+ TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.EQUAL,
+ SubApplicationColumnPrefix.INFO));
+ }
+ TimelineFilterList relatesTo = getFilters().getRelatesTo();
+ if (hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+ // If RELATES_TO field has to be retrieved, add a filter for fetching
+ // columns with RELATES_TO column prefix.
+ infoFamilyColsFilter.addFilter(
+ TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.EQUAL,
+ SubApplicationColumnPrefix.RELATES_TO));
+ } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) {
+ // Even if fields to retrieve does not contain RELATES_TO, we still
+ // need to have a filter to fetch some of the column qualifiers if
+ // relatesTo filters are specified. relatesTo filters will then be
+ // matched after fetching rows from HBase.
+ Set<String> relatesToCols =
+ TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
+ infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+ SubApplicationColumnPrefix.RELATES_TO, relatesToCols));
+ }
+ TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
+ if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+ // If IS_RELATED_TO field has to be retrieved, add a filter for fetching
+ // columns with IS_RELATED_TO column prefix.
+ infoFamilyColsFilter.addFilter(
+ TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.EQUAL,
+ SubApplicationColumnPrefix.IS_RELATED_TO));
+ } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) {
+ // Even if fields to retrieve does not contain IS_RELATED_TO, we still
+ // need to have a filter to fetch some of the column qualifiers if
+ // isRelatedTo filters are specified. isRelatedTo filters will then be
+ // matched after fetching rows from HBase.
+ Set<String> isRelatedToCols =
+ TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
+ infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+ SubApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols));
+ }
+ TimelineFilterList eventFilters = getFilters().getEventFilters();
+ if (hasField(fieldsToRetrieve, Field.EVENTS)) {
+ // If EVENTS field has to be retrieved, add a filter for fetching columns
+ // with EVENT column prefix.
+ infoFamilyColsFilter.addFilter(
+ TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.EQUAL,
+ SubApplicationColumnPrefix.EVENT));
+ } else if (eventFilters != null
+ && !eventFilters.getFilterList().isEmpty()) {
+ // Even if fields to retrieve does not contain EVENTS, we still need to
+ // have a filter to fetch some of the column qualifiers on the basis of
+ // event filters specified. Event filters will then be matched after
+ // fetching rows from HBase.
+ Set<String> eventCols =
+ TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
+ infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+ SubApplicationColumnPrefix.EVENT, eventCols));
+ }
+ return infoFamilyColsFilter;
+ }
+
+ /**
+ * Excludes, via filters, the column prefixes which are not required (based
+ * on fields to retrieve) from the info column family. These filters are
+ * added to a filter list which contains a filter for getting the info
+ * column family.
+ *
+ * @param infoColFamilyList filter list for info column family.
+ */
+ private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
+ EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
+ // Events not required.
+ if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
+ infoColFamilyList.addFilter(
+ TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
+ SubApplicationColumnPrefix.EVENT));
+ }
+ // info not required.
+ if (!hasField(fieldsToRetrieve, Field.INFO)) {
+ infoColFamilyList.addFilter(
+ TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
+ SubApplicationColumnPrefix.INFO));
+ }
+ // is related to not required.
+ if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+ infoColFamilyList.addFilter(
+ TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
+ SubApplicationColumnPrefix.IS_RELATED_TO));
+ }
+ // relates to not required.
+ if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+ infoColFamilyList.addFilter(
+ TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
+ SubApplicationColumnPrefix.RELATES_TO));
+ }
+ }
+
+ /**
+ * Updates filter list based on fields for confs and metrics to retrieve.
+ *
+ * @param listBasedOnFields filter list based on fields.
+ * @throws IOException if any problem occurs while updating filter list.
+ */
+ private void updateFilterForConfsAndMetricsToRetrieve(
+ FilterList listBasedOnFields) throws IOException {
+ TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
+ // Please note that if confsToRetrieve is specified, we would have added
+ // CONFIGS to fields to retrieve in augmentParams() even if not specified.
+ if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) {
+ // Create a filter list for configs.
+ listBasedOnFields.addFilter(
+ TimelineFilterUtils.createFilterForConfsOrMetricsToRetrieve(
+ dataToRetrieve.getConfsToRetrieve(),
+ SubApplicationColumnFamily.CONFIGS,
+ SubApplicationColumnPrefix.CONFIG));
+ }
+
+ // Please note that if metricsToRetrieve is specified, we would have added
+ // METRICS to fields to retrieve in augmentParams() even if not specified.
+ if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) {
+ // Create a filter list for metrics.
+ listBasedOnFields.addFilter(
+ TimelineFilterUtils.createFilterForConfsOrMetricsToRetrieve(
+ dataToRetrieve.getMetricsToRetrieve(),
+ SubApplicationColumnFamily.METRICS,
+ SubApplicationColumnPrefix.METRIC));
+ }
+ }
+
+ @Override
+ protected FilterList constructFilterListBasedOnFields() throws IOException {
+ if (!needCreateFilterListBasedOnFields()) {
+ // Fetch all the columns. No need of a filter.
+ return null;
+ }
+ FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE);
+ FilterList infoColFamilyList = new FilterList();
+ // By default fetch everything in INFO column family.
+ FamilyFilter infoColumnFamily = new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(SubApplicationColumnFamily.INFO.getBytes()));
+ infoColFamilyList.addFilter(infoColumnFamily);
+ if (fetchPartialColsFromInfoFamily()) {
+ // We can fetch only some of the columns from info family.
+ infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily());
+ } else {
+ // Exclude column prefixes in info column family which are not required
+ // based on fields to retrieve.
+ excludeFieldsFromInfoColFamily(infoColFamilyList);
+ }
+ listBasedOnFields.addFilter(infoColFamilyList);
+ updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields);
+ return listBasedOnFields;
+ }
+
+ @Override
+ protected void validateParams() {
+ Preconditions.checkNotNull(getContext(), "context shouldn't be null");
+ Preconditions.checkNotNull(getDataToRetrieve(),
+ "data to retrieve shouldn't be null");
+ Preconditions.checkNotNull(getContext().getClusterId(),
+ "clusterId shouldn't be null");
+ Preconditions.checkNotNull(getContext().getDoAsUser(),
+ "DoAsUser shouldn't be null");
+ Preconditions.checkNotNull(getContext().getEntityType(),
+ "entityType shouldn't be null");
+ }
+
+ @Override
+ protected void augmentParams(Configuration hbaseConf, Connection conn)
+ throws IOException {
+ getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
+ createFiltersIfNull();
+ }
+
+ private void setMetricsTimeRange(Query query) {
+ // Set time range for metric values.
+ HBaseTimelineStorageUtils.setMetricsTimeRange(query,
+ SubApplicationColumnFamily.METRICS.getBytes(),
+ getDataToRetrieve().getMetricsTimeBegin(),
+ getDataToRetrieve().getMetricsTimeEnd());
+ }
+
+ @Override
+ protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
+ FilterList filterList) throws IOException {
+
+ // Scan through part of the table to find the entities that belong to one
+ // user and one entity type.
+ Scan scan = new Scan();
+ TimelineReaderContext context = getContext();
+ if (context.getDoAsUser() == null) {
+ throw new BadRequestException("Invalid user!");
+ }
+
+ RowKeyPrefix<SubApplicationRowKey> subApplicationRowKeyPrefix = null;
+ // Default mode: always scan from the beginning of the entity type.
+ if (getFilters() == null || getFilters().getFromId() == null) {
+ subApplicationRowKeyPrefix = new SubApplicationRowKeyPrefix(
+ context.getDoAsUser(), context.getClusterId(),
+ context.getEntityType(), null, null, null);
+ scan.setRowPrefixFilter(subApplicationRowKeyPrefix.getRowKeyPrefix());
+ } else { // pagination mode, will scan from the given entityIdPrefix!entityId
+
+ SubApplicationRowKey entityRowKey = null;
+ try {
+ entityRowKey = SubApplicationRowKey
+ .parseRowKeyFromString(getFilters().getFromId());
+ } catch (IllegalArgumentException e) {
+ throw new BadRequestException("Invalid filter fromid is provided.");
+ }
+ if (!context.getClusterId().equals(entityRowKey.getClusterId())) {
+ throw new BadRequestException(
+ "fromid doesn't belong to clusterId=" + context.getClusterId());
+ }
+
+ // set start row
+ scan.setStartRow(entityRowKey.getRowKey());
+
+ // get the bytes for stop row
+ subApplicationRowKeyPrefix = new SubApplicationRowKeyPrefix(
+ context.getDoAsUser(), context.getClusterId(),
+ context.getEntityType(), null, null, null);
+
+ // set stop row
+ scan.setStopRow(
+ HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
+ subApplicationRowKeyPrefix.getRowKeyPrefix()));
+
+ // Set page filter to the limit. This filter has to be set only in
+ // pagination mode.
+ filterList.addFilter(new PageFilter(getFilters().getLimit()));
+ }
+ setMetricsTimeRange(scan);
+ scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
+ if (filterList != null && !filterList.getFilters().isEmpty()) {
+ scan.setFilter(filterList);
+ }
+ return getTable().getResultScanner(hbaseConf, conn, scan);
+ }
+
+ @Override
+ protected Result getResult(Configuration hbaseConf, Connection conn,
+ FilterList filterList) throws IOException {
+ throw new UnsupportedOperationException(
+ "we don't support a single entity query");
+ }
+
+ @Override
+ protected TimelineEntity parseEntity(Result result) throws IOException {
+ if (result == null || result.isEmpty()) {
+ return null;
+ }
+ TimelineEntity entity = new TimelineEntity();
+ SubApplicationRowKey parseRowKey =
+ SubApplicationRowKey.parseRowKey(result.getRow());
+ entity.setType(parseRowKey.getEntityType());
+ entity.setId(parseRowKey.getEntityId());
+ entity.setIdPrefix(parseRowKey.getEntityIdPrefix().longValue());
+
+ TimelineEntityFilters filters = getFilters();
+ // fetch created time
+ Long createdTime = (Long) ColumnRWHelper.readResult(result,
+ SubApplicationColumn.CREATED_TIME);
+ entity.setCreatedTime(createdTime);
+
+ EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
+ // Fetch isRelatedTo entities and match the isRelatedTo filter. If the
+ // isRelatedTo filters do not match, the entity is dropped. We have to match
+ // filters locally as relevant HBase filters to filter out rows on the basis
+ // of isRelatedTo are not set in the HBase scan.
+ boolean checkIsRelatedTo =
+ filters.getIsRelatedTo() != null
+ && filters.getIsRelatedTo().getFilterList().size() > 0;
+ if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) {
+ readRelationship(entity, result, SubApplicationColumnPrefix.IS_RELATED_TO,
+ true);
+ if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity,
+ filters.getIsRelatedTo())) {
+ return null;
+ }
+ if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+ entity.getIsRelatedToEntities().clear();
+ }
+ }
+
+ // Fetch relatesTo entities and match the relatesTo filter. If the relatesTo
+ // filters do not match, the entity is dropped. We have to match filters
+ // locally as relevant HBase filters to filter out rows on the basis of
+ // relatesTo are not set in the HBase scan.
+ boolean checkRelatesTo =
+ !isSingleEntityRead() && filters.getRelatesTo() != null
+ && filters.getRelatesTo().getFilterList().size() > 0;
+ if (hasField(fieldsToRetrieve, Field.RELATES_TO) || checkRelatesTo) {
+ readRelationship(entity, result, SubApplicationColumnPrefix.RELATES_TO,
+ false);
+ if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity,
+ filters.getRelatesTo())) {
+ return null;
+ }
+ if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+ entity.getRelatesToEntities().clear();
+ }
+ }
+
+ // fetch info if fieldsToRetrieve contains INFO or ALL.
+ if (hasField(fieldsToRetrieve, Field.INFO)) {
+ readKeyValuePairs(entity, result, SubApplicationColumnPrefix.INFO, false);
+ }
+
+ // fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
+ if (hasField(fieldsToRetrieve, Field.CONFIGS)) {
+ readKeyValuePairs(entity, result, SubApplicationColumnPrefix.CONFIG,
+ true);
+ }
+
+ // Fetch events and match event filters if they exist. If the event filters
+ // do not match, the entity is dropped. We have to match filters locally as
+ // relevant HBase filters to filter out rows on the basis of events are not
+ // set in the HBase scan.
+ boolean checkEvents =
+ !isSingleEntityRead() && filters.getEventFilters() != null
+ && filters.getEventFilters().getFilterList().size() > 0;
+ if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) {
+ readEvents(entity, result, SubApplicationColumnPrefix.EVENT);
+ if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity,
+ filters.getEventFilters())) {
+ return null;
+ }
+ if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
+ entity.getEvents().clear();
+ }
+ }
+
+ // fetch metrics if fieldsToRetrieve contains METRICS or ALL.
+ if (hasField(fieldsToRetrieve, Field.METRICS)) {
+ readMetrics(entity, result, SubApplicationColumnPrefix.METRIC);
+ }
+
+ entity.getInfo().put(TimelineReaderUtils.FROMID_KEY,
+ parseRowKey.getRowKeyAsString());
+ return entity;
+ }
+
+}
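
[Editor's note, not part of the patch above: a minimal usage sketch of the multi-entity read path that SubApplicationEntityReader implements. It assumes a TimelineReaderContext "context" already populated with the cluster id, doAs user and entity type that validateParams() checks, plus an existing HBase Connection "conn" and Configuration "hbaseConf"; all variable names are illustrative only.]

    // Runs wherever IOException can be handled or propagated.
    TimelineEntityFilters filters =
        new TimelineEntityFilters.Builder().build();       // default filters
    TimelineDataToRetrieve toRetrieve =
        new TimelineDataToRetrieve();                      // default view; no-arg constructor assumed
    // Because the context carries a doAs user, the factory added later in this
    // patch returns a SubApplicationEntityReader; its getResults() drives the
    // row-key-prefix scan shown above.
    TimelineEntityReader reader =
        TimelineEntityReaderFactory.createMultipleEntitiesReader(
            context, filters, toRetrieve);
    Set<TimelineEntity> entities = reader.readEntities(hbaseConf, conn);
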
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
new file mode 100644
index 0000000..3168163
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
@@ -0,0 +1,464 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The base class for reading and deserializing timeline entities from the
+ * HBase storage. Different subclasses can be defined for the different types
+ * of entities that are being requested.
+ */
+public abstract class TimelineEntityReader extends
+ AbstractTimelineStorageReader {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TimelineEntityReader.class);
+
+ private final boolean singleEntityRead;
+ private TimelineDataToRetrieve dataToRetrieve;
+ // used only for multiple entity read mode
+ private TimelineEntityFilters filters;
+
+ /**
+ * Main table the entity reader uses.
+ */
+ private BaseTableRW<?> table;
+
+ /**
+ * Used to convert string key components to and from the storage format.
+ */
+ private final KeyConverter<String> stringKeyConverter =
+ new StringKeyConverter();
+
+ /**
+ * Instantiates a reader for multiple-entity reads.
+ *
+ * @param ctxt Reader context which defines the scope in which query has to be
+ * made.
+ * @param entityFilters Filters which limit the entities returned.
+ * @param toRetrieve Data to retrieve for each entity.
+ */
+ protected TimelineEntityReader(TimelineReaderContext ctxt,
+ TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
+ super(ctxt);
+ this.singleEntityRead = false;
+ this.dataToRetrieve = toRetrieve;
+ this.filters = entityFilters;
+
+ this.setTable(getTable());
+ }
+
+ /**
+ * Instantiates a reader for single-entity reads.
+ *
+ * @param ctxt Reader context which defines the scope in which query has to be
+ * made.
+ * @param toRetrieve Data to retrieve for each entity.
+ */
+ protected TimelineEntityReader(TimelineReaderContext ctxt,
+ TimelineDataToRetrieve toRetrieve) {
+ super(ctxt);
+ this.singleEntityRead = true;
+ this.dataToRetrieve = toRetrieve;
+
+ this.setTable(getTable());
+ }
+
+ /**
+ * Creates a {@link FilterList} based on fields, confs and metrics to
+ * retrieve. This filter list will be set in Scan/Get objects to trim down
+ * results fetched from HBase back-end storage. This is called for both
+ * single and multiple entity reads.
+ *
+ * @return a {@link FilterList} object.
+ * @throws IOException if any problem occurs while creating filter list.
+ */
+ protected abstract FilterList constructFilterListBasedOnFields()
+ throws IOException;
+
+ /**
+ * Creates a {@link FilterList} based on info, config and metric filters. This
+ * filter list will be set in the HBase Scan to trim down results fetched from
+ * HBase back-end storage. This is called only for multiple entity reads.
+ *
+ * @return a {@link FilterList} object.
+ * @throws IOException if any problem occurs while creating filter list.
+ */
+ protected abstract FilterList constructFilterListBasedOnFilters()
+ throws IOException;
+
+ /**
+ * Combines filter lists created based on fields and based on filters.
+ *
+ * @return a {@link FilterList} object if it can be constructed. Returns null
+ * if a filter list can be created neither on the basis of filters nor on the
+ * basis of fields.
+ * @throws IOException if any problem occurs while creating filter list.
+ */
+ private FilterList createFilterList() throws IOException {
+ FilterList listBasedOnFilters = constructFilterListBasedOnFilters();
+ boolean hasListBasedOnFilters = listBasedOnFilters != null &&
+ !listBasedOnFilters.getFilters().isEmpty();
+ FilterList listBasedOnFields = constructFilterListBasedOnFields();
+ boolean hasListBasedOnFields = listBasedOnFields != null &&
+ !listBasedOnFields.getFilters().isEmpty();
+ // If filter lists based on both filters and fields can be created,
+ // combine them in a new filter list and return it.
+ // If either one of them has been created, return that filter list.
+ // Return null, if none of the filter lists can be created. This indicates
+ // that no filter list needs to be added to HBase Scan as filters are not
+ // specified for the query or only the default view of entity needs to be
+ // returned.
+ if (hasListBasedOnFilters && hasListBasedOnFields) {
+ FilterList list = new FilterList();
+ list.addFilter(listBasedOnFilters);
+ list.addFilter(listBasedOnFields);
+ return list;
+ } else if (hasListBasedOnFilters) {
+ return listBasedOnFilters;
+ } else if (hasListBasedOnFields) {
+ return listBasedOnFields;
+ }
+ return null;
+ }
+
+ protected TimelineDataToRetrieve getDataToRetrieve() {
+ return dataToRetrieve;
+ }
+
+ protected TimelineEntityFilters getFilters() {
+ return filters;
+ }
+
+ /**
+ * Create a {@link TimelineEntityFilters} object with default values for
+ * filters.
+ */
+ protected void createFiltersIfNull() {
+ if (filters == null) {
+ filters = new TimelineEntityFilters.Builder().build();
+ }
+ }
+
+ /**
+ * Reads and deserializes a single timeline entity from the HBase storage.
+ *
+ * @param hbaseConf HBase Configuration.
+ * @param conn HBase Connection.
+ * @return A <cite>TimelineEntity</cite> object.
+ * @throws IOException if there is any exception encountered while reading
+ * entity.
+ */
+ public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
+ throws IOException {
+ validateParams();
+ augmentParams(hbaseConf, conn);
+
+ FilterList filterList = constructFilterListBasedOnFields();
+ if (LOG.isDebugEnabled() && filterList != null) {
+ LOG.debug("FilterList created for get is - " + filterList);
+ }
+ Result result = getResult(hbaseConf, conn, filterList);
+ if (result == null || result.isEmpty()) {
+ // Could not find a matching row.
+ LOG.info("Cannot find matching entity of type " +
+ getContext().getEntityType());
+ return null;
+ }
+ return parseEntity(result);
+ }
+
+ /**
+ * Reads and deserializes a set of timeline entities from the HBase storage.
+ * It goes through all the results available, and returns up to the number
+ * of entries specified by the limit, in the entities' natural sort order.
+ *
+ * @param hbaseConf HBase Configuration.
+ * @param conn HBase Connection.
+ * @return a set of <cite>TimelineEntity</cite> objects.
+ * @throws IOException if any exception is encountered while reading entities.
+ */
+ public Set<TimelineEntity> readEntities(Configuration hbaseConf,
+ Connection conn) throws IOException {
+ validateParams();
+ augmentParams(hbaseConf, conn);
+
+ Set<TimelineEntity> entities = new LinkedHashSet<>();
+ FilterList filterList = createFilterList();
+ if (LOG.isDebugEnabled() && filterList != null) {
+ LOG.debug("FilterList created for scan is - " + filterList);
+ }
+ ResultScanner results = getResults(hbaseConf, conn, filterList);
+ try {
+ for (Result result : results) {
+ TimelineEntity entity = parseEntity(result);
+ if (entity == null) {
+ continue;
+ }
+ entities.add(entity);
+ if (entities.size() == filters.getLimit()) {
+ break;
+ }
+ }
+ return entities;
+ } finally {
+ results.close();
+ }
+ }
+
+ /**
+ * Returns the main table to be used by the entity reader.
+ *
+ * @return A reference to the table.
+ */
+ protected BaseTableRW<?> getTable() {
+ return table;
+ }
+
+ /**
+ * Fetches a {@link Result} instance for a single-entity read.
+ *
+ * @param hbaseConf HBase Configuration.
+ * @param conn HBase Connection.
+ * @param filterList filter list which will be applied to HBase Get.
+ * @return the {@link Result} instance or null if no such record is found.
+ * @throws IOException if any exception is encountered while getting result.
+ */
+ protected abstract Result getResult(Configuration hbaseConf, Connection conn,
+ FilterList filterList) throws IOException;
+
+ /**
+ * Fetches a {@link ResultScanner} for a multi-entity read.
+ *
+ * @param hbaseConf HBase Configuration.
+ * @param conn HBase Connection.
+ * @param filterList filter list which will be applied to HBase Scan.
+ * @return the {@link ResultScanner} instance.
+ * @throws IOException if any exception is encountered while getting results.
+ */
+ protected abstract ResultScanner getResults(Configuration hbaseConf,
+ Connection conn, FilterList filterList) throws IOException;
+
+ /**
+ * Parses the result retrieved from HBase backend and convert it into a
+ * {@link TimelineEntity} object.
+ *
+ * @param result Single row result of a Get/Scan.
+ * @return the <cite>TimelineEntity</cite> instance or null if the entity is
+ * filtered.
+ * @throws IOException if any exception is encountered while parsing entity.
+ */
+ protected abstract TimelineEntity parseEntity(Result result)
+ throws IOException;
+
+ /**
+ * Helper method for reading and deserializing {@link TimelineMetric} objects
+ * using the specified column prefix. The timeline metrics then are added to
+ * the given timeline entity.
+ *
+ * @param entity {@link TimelineEntity} object.
+ * @param result {@link Result} object retrieved from backend.
+ * @param columnPrefix Metric column prefix
+ * @throws IOException if any exception is encountered while reading metrics.
+ */
+ protected void readMetrics(TimelineEntity entity, Result result,
+ ColumnPrefix<?> columnPrefix) throws IOException {
+ NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+ ColumnRWHelper.readResultsWithTimestamps(
+ result, columnPrefix, stringKeyConverter);
+ for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
+ metricsResult.entrySet()) {
+ TimelineMetric metric = new TimelineMetric();
+ metric.setId(metricResult.getKey());
+ // Simply assume that if the value set contains more than one element, the
+ // metric is a TIME_SERIES metric; otherwise, it's a SINGLE_VALUE metric.
+ TimelineMetric.Type metricType = metricResult.getValue().size() > 1 ?
+ TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE;
+ metric.setType(metricType);
+ metric.addValues(metricResult.getValue());
+ entity.addMetric(metric);
+ }
+ }
+
+ /**
+ * Checks whether the reader has been created to fetch single entity or
+ * multiple entities.
+ *
+ * @return true, if query is for single entity, false otherwise.
+ */
+ public boolean isSingleEntityRead() {
+ return singleEntityRead;
+ }
+
+ protected void setTable(BaseTableRW<?> baseTable) {
+ this.table = baseTable;
+ }
+
+ /**
+ * Check if we have a certain field amongst fields to retrieve. This method
+ * checks against {@link Field#ALL} as well, because ALL implies that the
+ * passed field also has to be retrieved.
+ *
+ * @param fieldsToRetrieve fields to be retrieved.
+ * @param requiredField fields to be checked in fieldsToRetrieve.
+ * @return true if has the required field, false otherwise.
+ */
+ protected boolean hasField(EnumSet<Field> fieldsToRetrieve,
+ Field requiredField) {
+ return fieldsToRetrieve.contains(Field.ALL) ||
+ fieldsToRetrieve.contains(requiredField);
+ }
+
+ /**
+ * Create a filter list of qualifier filters based on the passed set of columns.
+ *
+ * @param <T> Describes the type of column prefix.
+ * @param colPrefix Column Prefix.
+ * @param columns set of column qualifiers.
+ * @return filter list.
+ */
+ protected <T extends BaseTable<T>> FilterList
+ createFiltersFromColumnQualifiers(
+ ColumnPrefix<T> colPrefix, Set<String> columns) {
+ FilterList list = new FilterList(Operator.MUST_PASS_ONE);
+ for (String column : columns) {
+ // For columns which have compound column qualifiers (e.g. events), we need
+ // to include the required separator.
+ byte[] compoundColQual = createColQualifierPrefix(colPrefix, column);
+ list.addFilter(new QualifierFilter(CompareOp.EQUAL,
+ new BinaryPrefixComparator(colPrefix
+ .getColumnPrefixBytes(compoundColQual))));
+ }
+ return list;
+ }
+
+ protected <T extends BaseTable<T>> byte[] createColQualifierPrefix(
+ ColumnPrefix<T> colPrefix, String column) {
+ if (colPrefix == ApplicationColumnPrefix.EVENT
+ || colPrefix == EntityColumnPrefix.EVENT) {
+ return new EventColumnName(column, null, null).getColumnQualifier();
+ } else {
+ return stringKeyConverter.encode(column);
+ }
+ }
+
+ /**
+ * Helper method for reading relationships.
+ *
+ * @param <T> Describes the type of column prefix.
+ * @param entity entity to fill.
+ * @param result result from HBase.
+ * @param prefix column prefix.
+ * @param isRelatedTo if true, the relationship is added to isRelatedTo,
+ * otherwise it is added to relatesTo.
+ * @throws IOException if any problem is encountered while reading result.
+ */
+ protected <T extends BaseTable<T>> void readRelationship(
+ TimelineEntity entity, Result result,
+ ColumnPrefix<T> prefix, boolean isRelatedTo) throws IOException {
+ // isRelatedTo and relatesTo are of type Map<String, Set<String>>
+ Map<String, Object> columns = ColumnRWHelper.readResults(
+ result, prefix, stringKeyConverter);
+ for (Map.Entry<String, Object> column : columns.entrySet()) {
+ for (String id : Separator.VALUES.splitEncoded(column.getValue()
+ .toString())) {
+ if (isRelatedTo) {
+ entity.addIsRelatedToEntity(column.getKey(), id);
+ } else {
+ entity.addRelatesToEntity(column.getKey(), id);
+ }
+ }
+ }
+ }
+
+ /**
+ * Read events from the entity table or the application table. The column name
+ * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
+ * if there is no info associated with the event.
+ *
+ * @param <T> Describes the type of column prefix.
+ * @param entity entity to fill.
+ * @param result HBase Result.
+ * @param prefix column prefix.
+ * @throws IOException if any problem is encountered while reading result.
+ */
+ protected static <T extends BaseTable<T>> void readEvents(
+ TimelineEntity entity, Result result,
+ ColumnPrefix<T> prefix) throws IOException {
+ Map<String, TimelineEvent> eventsMap = new HashMap<>();
+ Map<EventColumnName, Object> eventsResult = ColumnRWHelper.readResults(
+ result, prefix, new EventColumnNameConverter());
+ for (Map.Entry<EventColumnName, Object>
+ eventResult : eventsResult.entrySet()) {
+ EventColumnName eventColumnName = eventResult.getKey();
+ String key = eventColumnName.getId() +
+ Long.toString(eventColumnName.getTimestamp());
+ // Retrieve previously seen event to add to it
+ TimelineEvent event = eventsMap.get(key);
+ if (event == null) {
+ // First time we're seeing this event, add it to the eventsMap
+ event = new TimelineEvent();
+ event.setId(eventColumnName.getId());
+ event.setTimestamp(eventColumnName.getTimestamp());
+ eventsMap.put(key, event);
+ }
+ if (eventColumnName.getInfoKey() != null) {
+ event.addInfo(eventColumnName.getInfoKey(), eventResult.getValue());
+ }
+ }
+ Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
+ entity.addEvents(eventsSet);
+ }
+}
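
[Editor's note, not part of the patch: a companion sketch of the single-entity path exposed by readEntity() above, again assuming a populated TimelineReaderContext "context", an HBase Connection "conn" and a Configuration "hbaseConf".]

    // Runs wherever IOException can be handled or propagated.
    TimelineDataToRetrieve toRetrieve =
        new TimelineDataToRetrieve();                      // no-arg constructor assumed
    TimelineEntityReader reader =
        TimelineEntityReaderFactory.createSingleEntityReader(context, toRetrieve);
    TimelineEntity entity = reader.readEntity(hbaseConf, conn);
    if (entity == null) {
      // No matching row was found, or the row was filtered out by parseEntity().
    }
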
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java
new file mode 100644
index 0000000..fa16077
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+
+/**
+ * Factory methods for instantiating a timeline entity reader.
+ */
+public final class TimelineEntityReaderFactory {
+ private TimelineEntityReaderFactory() {
+ }
+
+ /**
+ * Creates a timeline entity reader instance for reading a single entity with
+ * the specified input.
+ *
+ * @param context Reader context which defines the scope in which query has to
+ * be made.
+ * @param dataToRetrieve Data to retrieve for each entity.
+ * @return An implementation of <cite>TimelineEntityReader</cite> object
+ * depending on entity type.
+ */
+ public static TimelineEntityReader createSingleEntityReader(
+ TimelineReaderContext context, TimelineDataToRetrieve dataToRetrieve) {
+ // currently the types that are handled separately from the generic entity
+ // table are application, flow run, and flow activity entities
+ if (TimelineEntityType.YARN_APPLICATION.matches(context.getEntityType())) {
+ return new ApplicationEntityReader(context, dataToRetrieve);
+ } else if (TimelineEntityType.
+ YARN_FLOW_RUN.matches(context.getEntityType())) {
+ return new FlowRunEntityReader(context, dataToRetrieve);
+ } else if (TimelineEntityType.
+ YARN_FLOW_ACTIVITY.matches(context.getEntityType())) {
+ return new FlowActivityEntityReader(context, dataToRetrieve);
+ } else {
+ // assume we're dealing with a generic entity read
+ return new GenericEntityReader(context, dataToRetrieve);
+ }
+ }
+
+ /**
+ * Creates a timeline entity reader instance for reading set of entities with
+ * the specified input and predicates.
+ *
+ * @param context Reader context which defines the scope in which query has to
+ * be made.
+ * @param filters Filters which limit the entities returned.
+ * @param dataToRetrieve Data to retrieve for each entity.
+ * @return An implementation of <cite>TimelineEntityReader</cite> object
+ * depending on entity type.
+ */
+ public static TimelineEntityReader createMultipleEntitiesReader(
+ TimelineReaderContext context, TimelineEntityFilters filters,
+ TimelineDataToRetrieve dataToRetrieve) {
+ // currently the types that are handled separately from the generic entity
+ // table are application, flow run, and flow activity entities
+ if (TimelineEntityType.YARN_APPLICATION.matches(context.getEntityType())) {
+ return new ApplicationEntityReader(context, filters, dataToRetrieve);
+ } else if (TimelineEntityType.
+ YARN_FLOW_ACTIVITY.matches(context.getEntityType())) {
+ return new FlowActivityEntityReader(context, filters, dataToRetrieve);
+ } else if (TimelineEntityType.
+ YARN_FLOW_RUN.matches(context.getEntityType())) {
+ return new FlowRunEntityReader(context, filters, dataToRetrieve);
+ } else {
+ if (context.getDoAsUser() != null) {
+ return new SubApplicationEntityReader(context, filters, dataToRetrieve);
+ }
+ // assume we're dealing with a generic entity read
+ return new GenericEntityReader(context, filters, dataToRetrieve);
+ }
+ }
+
+ /**
+ * Creates a timeline entity type reader that will read all available entity
+ * types within the specified context.
+ *
+ * @param context Reader context which defines the scope in which query has to
+ * be made. Limited to application level only.
+ * @return an <cite>EntityTypeReader</cite> object
+ */
+ public static EntityTypeReader createEntityTypeReader(
+ TimelineReaderContext context) {
+ return new EntityTypeReader(context);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java
new file mode 100644
index 0000000..9814d6d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage.reader
+ * contains classes used to read entities from backend based on query type.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationTableRW.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationTableRW.java
new file mode 100644
index 0000000..256b24b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationTableRW.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.subapplication;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create, read and write to the SubApplication table.
+ */
+public class SubApplicationTableRW extends BaseTableRW<SubApplicationTable> {
+ /** sub app prefix. */
+ private static final String PREFIX =
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX + "subapplication";
+
+ /** config param name that specifies the subapplication table name. */
+ public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+ /**
+ * config param name that specifies the TTL for metrics column family in
+ * subapplication table.
+ */
+ private static final String METRICS_TTL_CONF_NAME = PREFIX
+ + ".table.metrics.ttl";
+
+ /**
+ * config param name that specifies max-versions for
+ * metrics column family in subapplication table.
+ */
+ private static final String METRICS_MAX_VERSIONS =
+ PREFIX + ".table.metrics.max-versions";
+
+ /** default value for subapplication table name. */
+ public static final String DEFAULT_TABLE_NAME =
+ "timelineservice.subapplication";
+
+ /** default TTL is 30 days for metrics timeseries. */
+ private static final int DEFAULT_METRICS_TTL = 2592000;
+
+ /** default max number of versions. */
+ private static final int DEFAULT_METRICS_MAX_VERSIONS = 10000;
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ SubApplicationTableRW.class);
+
+ public SubApplicationTableRW() {
+ super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW#
+ * createTable(org.apache.hadoop.hbase.client.Admin,
+ * org.apache.hadoop.conf.Configuration)
+ */
+ public void createTable(Admin admin, Configuration hbaseConf)
+ throws IOException {
+
+ TableName table = getTableName(hbaseConf);
+ if (admin.tableExists(table)) {
+ // do not disable / delete existing table
+ // similar to the approach taken by map-reduce jobs when
+ // output directory exists
+ throw new IOException("Table " + table.getNameAsString()
+ + " already exists.");
+ }
+
+ HTableDescriptor subAppTableDescp = new HTableDescriptor(table);
+ HColumnDescriptor infoCF =
+ new HColumnDescriptor(SubApplicationColumnFamily.INFO.getBytes());
+ infoCF.setBloomFilterType(BloomType.ROWCOL);
+ subAppTableDescp.addFamily(infoCF);
+
+ HColumnDescriptor configCF =
+ new HColumnDescriptor(SubApplicationColumnFamily.CONFIGS.getBytes());
+ configCF.setBloomFilterType(BloomType.ROWCOL);
+ configCF.setBlockCacheEnabled(true);
+ subAppTableDescp.addFamily(configCF);
+
+ HColumnDescriptor metricsCF =
+ new HColumnDescriptor(SubApplicationColumnFamily.METRICS.getBytes());
+ subAppTableDescp.addFamily(metricsCF);
+ metricsCF.setBlockCacheEnabled(true);
+ // always keep 1 version (the latest)
+ metricsCF.setMinVersions(1);
+ metricsCF.setMaxVersions(
+ hbaseConf.getInt(METRICS_MAX_VERSIONS, DEFAULT_METRICS_MAX_VERSIONS));
+ metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME,
+ DEFAULT_METRICS_TTL));
+ subAppTableDescp.setRegionSplitPolicyClassName(
+ "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
+ subAppTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
+ TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+ admin.createTable(subAppTableDescp,
+ TimelineHBaseSchemaConstants.getUsernameSplits());
+ LOG.info("Status of table creation for " + table.getNameAsString() + "="
+ + admin.tableExists(table));
+ }
+
+ /**
+ * @param metricsTTL time to live parameter for the metrics in this table.
+ * @param hbaseConf configuration in which to set the metrics TTL config
+ * variable.
+ */
+ public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) {
+ hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL);
+ }
+
+}
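
[Editor's note, not part of the patch: a minimal sketch of creating the sub-application table with a non-default metrics TTL. The connection handling and the 7-day TTL are assumptions for illustration; createTable() above throws an IOException if the table already exists.]

    // Runs wherever IOException can be handled or propagated.
    Configuration hbaseConf = HBaseConfiguration.create();
    SubApplicationTableRW subAppTable = new SubApplicationTableRW();
    // 7 days instead of the 30-day default for the metrics column family.
    subAppTable.setMetricsTTL(7 * 24 * 60 * 60, hbaseConf);
    try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
         Admin admin = conn.getAdmin()) {
      subAppTable.createTable(admin, hbaseConf);
    }
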
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/package-info.java
new file mode 100644
index 0000000..52cc399
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage.subapplication
+ * contains classes related to the implementation of the sub-application table.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.subapplication;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestHBaseTimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestHBaseTimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestHBaseTimelineStorageUtils.java
new file mode 100644
index 0000000..402a89b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestHBaseTimelineStorageUtils.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for HBaseTimelineStorageUtils static methods.
+ */
+public class TestHBaseTimelineStorageUtils {
+
+ @Test(expected=NullPointerException.class)
+ public void testGetTimelineServiceHBaseConfNullArgument() throws Exception {
+ HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/pom.xml
new file mode 100644
index 0000000..cb0d6e2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/pom.xml
@@ -0,0 +1,132 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>hadoop-yarn-server-timelineservice-hbase</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <version>3.2.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>hadoop-yarn-server-timelineservice-hbase-common</artifactId>
+ <name>Apache Hadoop YARN TimelineService HBase Common</name>
+ <version>3.2.0-SNAPSHOT</version>
+
+ <properties>
+ <!-- Needed for generating FindBugs warnings using parent pom -->
+ <yarn.basedir>${project.parent.parent.parent.basedir}</yarn.basedir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+ </dependency>
+
+ <!-- This is needed for GenericObjectMapper in GenericConverter -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- 'mvn dependency:analyze' fails to detect use of this direct
+ dependency -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <phase>test-compile</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <includeScope>runtime</includeScope>
+ <excludeGroupIds>org.slf4j,org.apache.hadoop,com.github.stephenc.findbugs</excludeGroupIds>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
new file mode 100644
index 0000000..c3d6a52
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+
+/**
+ * Identifies fully qualified columns for the {@link ApplicationTable}.
+ */
+public enum ApplicationColumn implements Column<ApplicationTable> {
+
+ /**
+ * App id.
+ */
+ ID(ApplicationColumnFamily.INFO, "id"),
+
+ /**
+ * When the application was created.
+ */
+ CREATED_TIME(ApplicationColumnFamily.INFO, "created_time",
+ new LongConverter()),
+
+ /**
+ * The version of the flow that this app belongs to.
+ */
+ FLOW_VERSION(ApplicationColumnFamily.INFO, "flow_version");
+
+ private final ColumnFamily<ApplicationTable> columnFamily;
+ private final String columnQualifier;
+ private final byte[] columnQualifierBytes;
+ private final ValueConverter valueConverter;
+
+ private ApplicationColumn(ColumnFamily<ApplicationTable> columnFamily,
+ String columnQualifier) {
+ this(columnFamily, columnQualifier, GenericConverter.getInstance());
+ }
+
+ private ApplicationColumn(ColumnFamily<ApplicationTable> columnFamily,
+ String columnQualifier, ValueConverter converter) {
+ this.columnFamily = columnFamily;
+ this.columnQualifier = columnQualifier;
+ // Future-proof by ensuring the right column prefix hygiene.
+ this.columnQualifierBytes =
+ Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
+ this.valueConverter = converter;
+ }
+
+ /**
+ * @return the column name value
+ */
+ private String getColumnQualifier() {
+ return columnQualifier;
+ }
+
+ @Override
+ public byte[] getColumnQualifierBytes() {
+ return columnQualifierBytes.clone();
+ }
+
+ @Override
+ public byte[] getColumnFamilyBytes() {
+ return columnFamily.getBytes();
+ }
+
+ @Override
+ public ValueConverter getValueConverter() {
+ return valueConverter;
+ }
+
+ @Override
+ public Attribute[] getCombinedAttrsWithAggr(Attribute... attributes) {
+ return attributes;
+ }
+
+ @Override
+ public boolean supplementCellTimestamp() {
+ return false;
+ }
+}
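
A quick sketch (not part of this patch) of how these enum constants are typically consumed when reading a cell back: the family/qualifier byte pair selects the cell from an HBase Result, and the per-column ValueConverter decodes it. The helper class and method names are made up; only the enum methods defined above and the standard Result#getValue API are used.

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;

    public final class ApplicationColumnSketch {
      static Long readCreatedTime(Result result) throws IOException {
        byte[] family = ApplicationColumn.CREATED_TIME.getColumnFamilyBytes();
        byte[] qualifier = ApplicationColumn.CREATED_TIME.getColumnQualifierBytes();
        byte[] raw = result.getValue(family, qualifier);
        // CREATED_TIME is declared with a LongConverter, so the decoded value is a Long
        return raw == null ? null
            : (Long) ApplicationColumn.CREATED_TIME.getValueConverter().decodeValue(raw);
      }
    }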
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java
new file mode 100644
index 0000000..97e5f7b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the application table column families.
+ */
+public enum ApplicationColumnFamily implements ColumnFamily<ApplicationTable> {
+
+ /**
+ * Info column family houses known columns, specifically ones included in
+ * column family filters.
+ */
+ INFO("i"),
+
+ /**
+ * Configurations are in a separate column family for two reasons: a) the size
+ * of the config values can be very large and b) we expect that config values
+ * are often separately accessed from other metrics and info columns.
+ */
+ CONFIGS("c"),
+
+ /**
+ * Metrics have a separate column family, because they have a separate TTL.
+ */
+ METRICS("m");
+
+ /**
+ * Byte representation of this column family.
+ */
+ private final byte[] bytes;
+
+ /**
+ * @param value create a column family with this name. Must be lower case and
+ * without spaces.
+ */
+ private ApplicationColumnFamily(String value) {
+ // column families should be lower case and not contain any spaces.
+ this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+ }
+
+ public byte[] getBytes() {
+ return Bytes.copy(bytes);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
new file mode 100644
index 0000000..89412f4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+
+/**
+ * Identifies partially qualified columns for the application table.
+ */
+public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
+
+ /**
+ * To store TimelineEntity getIsRelatedToEntities values.
+ */
+ IS_RELATED_TO(ApplicationColumnFamily.INFO, "s"),
+
+ /**
+ * To store TimelineEntity getRelatesToEntities values.
+ */
+ RELATES_TO(ApplicationColumnFamily.INFO, "r"),
+
+ /**
+ * To store TimelineEntity info values.
+ */
+ INFO(ApplicationColumnFamily.INFO, "i"),
+
+ /**
+ * Lifecycle events for an application.
+ */
+ EVENT(ApplicationColumnFamily.INFO, "e"),
+
+ /**
+ * Config column stores configuration with config key as the column name.
+ */
+ CONFIG(ApplicationColumnFamily.CONFIGS, null),
+
+ /**
+ * Metrics are stored with the metric name as the column name.
+ */
+ METRIC(ApplicationColumnFamily.METRICS, null, new LongConverter());
+
+ private final ColumnFamily<ApplicationTable> columnFamily;
+
+ /**
+ * Can be null for those cases where the provided column qualifier is the
+ * entire column name.
+ */
+ private final String columnPrefix;
+ private final byte[] columnPrefixBytes;
+ private final ValueConverter valueConverter;
+
+ /**
+ * Private constructor, meant to be used by the enum definition.
+ *
+ * @param columnFamily that this column is stored in.
+ * @param columnPrefix for this column.
+ */
+ private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily,
+ String columnPrefix) {
+ this(columnFamily, columnPrefix, GenericConverter.getInstance());
+ }
+
+ /**
+ * Private constructor, meant to be used by the enum definition.
+ *
+ * @param columnFamily that this column is stored in.
+ * @param columnPrefix for this column.
+ * @param converter used to encode/decode values to be stored in HBase for
+ * this column prefix.
+ */
+ private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily,
+ String columnPrefix, ValueConverter converter) {
+ this.valueConverter = converter;
+ this.columnFamily = columnFamily;
+ this.columnPrefix = columnPrefix;
+ if (columnPrefix == null) {
+ this.columnPrefixBytes = null;
+ } else {
+ // Future-proof by ensuring the right column prefix hygiene.
+ this.columnPrefixBytes =
+ Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
+ }
+ }
+
+ /**
+ * @return the column name value
+ */
+ private String getColumnPrefix() {
+ return columnPrefix;
+ }
+
+ @Override
+ public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
+ return ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifierPrefix);
+ }
+
+ @Override
+ public byte[] getColumnPrefixBytes(String qualifierPrefix) {
+ return ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifierPrefix);
+ }
+
+ @Override
+ public byte[] getColumnFamilyBytes() {
+ return columnFamily.getBytes();
+ }
+
+ @Override
+ public byte[] getColumnPrefixInBytes() {
+ return columnPrefixBytes != null ? columnPrefixBytes.clone() : null;
+ }
+
+ @Override
+ public Attribute[] getCombinedAttrsWithAggr(Attribute... attributes) {
+ return attributes;
+ }
+
+ @Override
+ public boolean supplementCellTimeStamp() {
+ return false;
+ }
+
+ public ValueConverter getValueConverter() {
+ return valueConverter;
+ }
+}
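
A small sketch (not part of this patch) showing how a fully qualified metric cell might be assembled from this enum: the METRIC constant supplies the column family, the qualifier built from the metric name, and the LongConverter used to encode the value. The helper class, method, and any metric name passed by a caller are all made up.

    import java.io.IOException;
    import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;

    public final class ApplicationMetricCellSketch {
      /** Returns {family, qualifier, encodedValue} for one metric cell. */
      static byte[][] metricCell(String metricName, long value) throws IOException {
        byte[] family = ApplicationColumnPrefix.METRIC.getColumnFamilyBytes();
        // METRIC has a null prefix, so the qualifier is just the encoded metric name
        byte[] qualifier =
            ApplicationColumnPrefix.METRIC.getColumnPrefixBytes(metricName);
        // metric values go through the LongConverter declared on the enum constant
        byte[] encoded =
            ApplicationColumnPrefix.METRIC.getValueConverter().encodeValue(value);
        return new byte[][] {family, qualifier, encoded};
      }
    }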
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org