You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ro...@apache.org on 2018/02/18 08:09:49 UTC
[17/18] hadoop git commit: YARN-7919. Refactor timelineservice-hbase
module into submodules. Contributed by Haibo Chen.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
new file mode 100644
index 0000000..2b98eec
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Set of utility methods used by timeline filter classes.
+ */
+public final class TimelineFilterUtils {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TimelineFilterUtils.class);
+
+ // Static utility holder: the private constructor prevents instantiation.
+ private TimelineFilterUtils() {
+ }
+
+ /**
+ * Returns the equivalent HBase filter list's {@link Operator}.
+ *
+ * @param op timeline filter list operator.
+ * @return HBase filter list's Operator.
+ */
+ private static Operator getHBaseOperator(TimelineFilterList.Operator op) {
+ switch (op) {
+ case AND:
+ return Operator.MUST_PASS_ALL;
+ case OR:
+ return Operator.MUST_PASS_ONE;
+ default:
+ throw new IllegalArgumentException("Invalid operator");
+ }
+ }
+
+ /**
+ * Returns the equivalent HBase compare filter's {@link CompareOp}.
+ *
+ * @param op timeline compare op.
+ * @return HBase compare filter's CompareOp.
+ */
+ private static CompareOp getHBaseCompareOp(
+ TimelineCompareOp op) {
+ switch (op) {
+ case LESS_THAN:
+ return CompareOp.LESS;
+ case LESS_OR_EQUAL:
+ return CompareOp.LESS_OR_EQUAL;
+ case EQUAL:
+ return CompareOp.EQUAL;
+ case NOT_EQUAL:
+ return CompareOp.NOT_EQUAL;
+ case GREATER_OR_EQUAL:
+ return CompareOp.GREATER_OR_EQUAL;
+ case GREATER_THAN:
+ return CompareOp.GREATER;
+ default:
+ throw new IllegalArgumentException("Invalid compare operator");
+ }
+ }
+
+ /**
+  * Converts a {@link TimelinePrefixFilter} to an equivalent HBase
+  * {@link QualifierFilter} that compares column qualifiers against a binary
+  * prefix.
+  *
+  * @param <T> Describes the type of column prefix.
+  * @param colPrefix column prefix used to build the qualifier prefix bytes.
+  * @param filter timeline prefix filter supplying the compare op and prefix.
+  * @return a {@link QualifierFilter} object
+  */
+ private static <T extends BaseTable<T>> Filter createHBaseColQualPrefixFilter(
+     ColumnPrefix<T> colPrefix, TimelinePrefixFilter filter) {
+   CompareOp hbaseCompareOp = getHBaseCompareOp(filter.getCompareOp());
+   byte[] qualifierPrefix =
+       colPrefix.getColumnPrefixBytes(filter.getPrefix());
+   return new QualifierFilter(hbaseCompareOp,
+       new BinaryPrefixComparator(qualifierPrefix));
+ }
+
+ /**
+  * Builds an HBase {@link QualifierFilter} for the given compare op whose
+  * comparator is a binary prefix match on the column prefix bytes obtained
+  * for an empty qualifier.
+  *
+  * @param <T> Describes the type of column prefix.
+  * @param compareOp compare op.
+  * @param columnPrefix column prefix.
+  * @return a column qualifier filter.
+  */
+ public static <T extends BaseTable<T>> Filter createHBaseQualifierFilter(
+     CompareOp compareOp, ColumnPrefix<T> columnPrefix) {
+   byte[] prefixBytes = columnPrefix.getColumnPrefixBytes("");
+   BinaryPrefixComparator prefixComparator =
+       new BinaryPrefixComparator(prefixBytes);
+   return new QualifierFilter(compareOp, prefixComparator);
+ }
+
+ /**
+  * Builds the filter used to retrieve confs or metrics. The result is always
+  * restricted to the given column family; when specific confs/metrics are
+  * requested, the family filter is combined with the filters derived from
+  * them.
+  *
+  * @param <T> Describes the type of column prefix.
+  * @param confsOrMetricToRetrieve configs/metrics to retrieve, may be null
+  *        or empty.
+  * @param columnFamily config or metric column family.
+  * @param columnPrefix config or metric column prefix.
+  * @return the family filter alone, or a filter list wrapping the family
+  *         filter and the converted confs/metrics filters.
+  * @throws IOException if any problem occurs while creating the filters.
+  */
+ public static <T extends BaseTable<T>> Filter
+     createFilterForConfsOrMetricsToRetrieve(
+     TimelineFilterList confsOrMetricToRetrieve, ColumnFamily<T> columnFamily,
+     ColumnPrefix<T> columnPrefix) throws IOException {
+   Filter familyFilter = new FamilyFilter(CompareOp.EQUAL,
+       new BinaryComparator(columnFamily.getBytes()));
+   boolean nothingRequested = confsOrMetricToRetrieve == null
+       || confsOrMetricToRetrieve.getFilterList().isEmpty();
+   if (nothingRequested) {
+     // Only the family filter needs to be applied.
+     return familyFilter;
+   }
+   // confsOrMetricToRetrieve are specified: combine the family filter with
+   // the filter list derived from them.
+   FilterList combined = new FilterList(familyFilter);
+   combined.addFilter(
+       createHBaseFilterList(columnPrefix, confsOrMetricToRetrieve));
+   return combined;
+ }
+
+ /**
+  * Builds a filter list holding two HBase {@link SingleColumnValueFilter}
+  * filters that bound the column value from below (greater or equal to the
+  * start value) and from above (less or equal to the end value). Start and
+  * end value should not be null.
+  *
+  * @param <T> Describes the type of column prefix.
+  * @param column Column for which single column value filter is to be created.
+  * @param startValue Start value.
+  * @param endValue End value.
+  * @return 2 single column value filters wrapped in a filter list.
+  * @throws IOException if any problem is encountered while encoding value.
+  */
+ public static <T extends BaseTable<T>> FilterList
+     createSingleColValueFiltersByRange(Column<T> column,
+     Object startValue, Object endValue) throws IOException {
+   FilterList rangeFilters = new FilterList();
+   // Lower bound first, then upper bound; both go through the column-based
+   // overload, which encodes the value and filters rows missing the column.
+   rangeFilters.addFilter(createHBaseSingleColValueFilter(
+       column, startValue, CompareOp.GREATER_OR_EQUAL));
+   rangeFilters.addFilter(createHBaseSingleColValueFilter(
+       column, endValue, CompareOp.LESS_OR_EQUAL));
+   return rangeFilters;
+ }
+
+ /**
+  * Creates a HBase {@link SingleColumnValueFilter} for the specified column,
+  * encoding the value with the column's value converter.
+  *
+  * @param <T> Describes the type of column prefix.
+  * @param column Column which value to be filtered.
+  * @param value Value to be filtered.
+  * @param op Compare operator
+  * @return a SingleColumnValue Filter
+  * @throws IOException if any exception.
+  */
+ public static <T extends BaseTable<T>> Filter
+     createHBaseSingleColValueFilter(Column<T> column,
+     Object value, CompareOp op) throws IOException {
+   byte[] familyBytes = column.getColumnFamilyBytes();
+   byte[] qualifierBytes = column.getColumnQualifierBytes();
+   byte[] encodedValue = column.getValueConverter().encodeValue(value);
+   // This overload always asks for rows missing the column to be filtered.
+   return createHBaseSingleColValueFilter(
+       familyBytes, qualifierBytes, encodedValue, op, true);
+ }
+
+ /**
+  * Creates a HBase {@link SingleColumnValueFilter} that compares the given
+  * column against a binary value and only considers the latest version.
+  *
+  * @param columnFamily Column Family represented as bytes.
+  * @param columnQualifier Column Qualifier represented as bytes.
+  * @param value Value.
+  * @param compareOp Compare operator.
+  * @param filterIfMissing This flag decides if we should filter the row if the
+  *        specified column is missing. This is based on the filter's
+  *        keyMustExist field.
+  * @return a {@link SingleColumnValueFilter} object
+  * @throws IOException if any problem occurs while creating the filter.
+  */
+ private static SingleColumnValueFilter createHBaseSingleColValueFilter(
+     byte[] columnFamily, byte[] columnQualifier, byte[] value,
+     CompareOp compareOp, boolean filterIfMissing) throws IOException {
+   BinaryComparator valueComparator = new BinaryComparator(value);
+   SingleColumnValueFilter columnValueFilter = new SingleColumnValueFilter(
+       columnFamily, columnQualifier, compareOp, valueComparator);
+   columnValueFilter.setLatestVersionOnly(true);
+   columnValueFilter.setFilterIfMissing(filterIfMissing);
+   return columnValueFilter;
+ }
+
+ /**
+  * Fetch columns from filter list containing exists and multivalue equality
+  * filters. This is done to fetch only required columns from back-end and
+  * then match event filters or relationships in reader.
+  * Nested filter lists are walked recursively; filters of any other type are
+  * logged and contribute no column.
+  *
+  * @param filterList filter list.
+  * @return set of columns.
+  */
+ public static Set<String> fetchColumnsFromFilterList(
+     TimelineFilterList filterList) {
+   Set<String> strSet = new HashSet<String>();
+   for (TimelineFilter filter : filterList.getFilterList()) {
+     switch(filter.getFilterType()) {
+     case LIST:
+       // Flatten nested lists into the same result set.
+       strSet.addAll(fetchColumnsFromFilterList((TimelineFilterList)filter));
+       break;
+     case KEY_VALUES:
+       strSet.add(((TimelineKeyValuesFilter)filter).getKey());
+       break;
+     case EXISTS:
+       strSet.add(((TimelineExistsFilter)filter).getValue());
+       break;
+     default:
+       // Parameterized message: no string is built unless INFO is enabled.
+       LOG.info("Unexpected filter type {}", filter.getFilterType());
+       break;
+     }
+   }
+   return strSet;
+ }
+
+ /**
+  * Creates equivalent HBase {@link FilterList} from {@link TimelineFilterList}
+  * while converting different timeline filters(of type {@link TimelineFilter})
+  * into their equivalent HBase filters. Nested lists are converted
+  * recursively; filter types not handled here are logged and left out of the
+  * resulting list.
+  *
+  * @param <T> Describes the type of column prefix.
+  * @param colPrefix column prefix which will be used for conversion.
+  * @param filterList timeline filter list which has to be converted.
+  * @return A {@link FilterList} object.
+  * @throws IOException if any problem occurs while creating the filter list.
+  */
+ public static <T extends BaseTable<T>> FilterList createHBaseFilterList(
+     ColumnPrefix<T> colPrefix,
+     TimelineFilterList filterList) throws IOException {
+   FilterList list =
+       new FilterList(getHBaseOperator(filterList.getOperator()));
+   for (TimelineFilter filter : filterList.getFilterList()) {
+     switch(filter.getFilterType()) {
+     case LIST:
+       list.addFilter(createHBaseFilterList(colPrefix,
+           (TimelineFilterList)filter));
+       break;
+     case PREFIX:
+       list.addFilter(createHBaseColQualPrefixFilter(colPrefix,
+           (TimelinePrefixFilter)filter));
+       break;
+     case COMPARE:
+       TimelineCompareFilter compareFilter = (TimelineCompareFilter)filter;
+       // The filter's keyMustExist flag decides whether rows lacking the
+       // column are filtered out.
+       list.addFilter(
+           createHBaseSingleColValueFilter(
+               colPrefix.getColumnFamilyBytes(),
+               colPrefix.getColumnPrefixBytes(compareFilter.getKey()),
+               colPrefix.getValueConverter().
+                   encodeValue(compareFilter.getValue()),
+               getHBaseCompareOp(compareFilter.getCompareOp()),
+               compareFilter.getKeyMustExist()));
+       break;
+     case KEY_VALUE:
+       TimelineKeyValueFilter kvFilter = (TimelineKeyValueFilter)filter;
+       list.addFilter(
+           createHBaseSingleColValueFilter(
+               colPrefix.getColumnFamilyBytes(),
+               colPrefix.getColumnPrefixBytes(kvFilter.getKey()),
+               colPrefix.getValueConverter().encodeValue(kvFilter.getValue()),
+               getHBaseCompareOp(kvFilter.getCompareOp()),
+               kvFilter.getKeyMustExist()));
+       break;
+     default:
+       // Parameterized message: no string is built unless INFO is enabled.
+       LOG.info("Unexpected filter type {}", filter.getFilterType());
+       break;
+     }
+   }
+   return list;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
new file mode 100644
index 0000000..f7c0705
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.reader.filter stores
+ * timeline filter implementations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
new file mode 100644
index 0000000..1ebfab2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.EntityTypeReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HBase based implementation for {@link TimelineReader}. The service opens a
+ * single HBase {@link Connection} on init, hands it to the entity readers for
+ * every query and closes it on stop.
+ */
+public class HBaseTimelineReaderImpl
+    extends AbstractService implements TimelineReader {
+
+ private static final Logger LOG = LoggerFactory
+     .getLogger(HBaseTimelineReaderImpl.class);
+
+ // Both fields are assigned in serviceInit and stay null until then.
+ private Configuration hbaseConf;
+ private Connection conn;
+
+ public HBaseTimelineReaderImpl() {
+   super(HBaseTimelineReaderImpl.class.getName());
+ }
+
+ /**
+  * Derives the timeline service HBase configuration from the service
+  * configuration and opens the HBase connection.
+  *
+  * @param conf service configuration.
+  * @throws Exception if the parent init or the connection creation fails.
+  */
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+   super.serviceInit(conf);
+   hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
+   conn = ConnectionFactory.createConnection(hbaseConf);
+ }
+
+ /**
+  * Closes the HBase connection, if one was opened, and then stops the parent
+  * service.
+  *
+  * @throws Exception if closing the connection or the parent stop fails.
+  */
+ @Override
+ protected void serviceStop() throws Exception {
+   try {
+     if (conn != null) {
+       LOG.info("closing the hbase Connection");
+       conn.close();
+     }
+   } finally {
+     // Run the parent stop even when closing the connection throws.
+     super.serviceStop();
+   }
+ }
+
+ /**
+  * Reads a single entity through a reader created by
+  * {@link TimelineEntityReaderFactory}.
+  *
+  * @param context reader context identifying the entity.
+  * @param dataToRetrieve which parts of the entity should be retrieved.
+  * @return the entity returned by the reader.
+  * @throws IOException if the read fails.
+  */
+ @Override
+ public TimelineEntity getEntity(TimelineReaderContext context,
+     TimelineDataToRetrieve dataToRetrieve) throws IOException {
+   TimelineEntityReader reader =
+       TimelineEntityReaderFactory.createSingleEntityReader(context,
+           dataToRetrieve);
+   return reader.readEntity(hbaseConf, conn);
+ }
+
+ /**
+  * Reads a set of entities through a reader created by
+  * {@link TimelineEntityReaderFactory}.
+  *
+  * @param context reader context.
+  * @param filters filters the entities have to match.
+  * @param dataToRetrieve which parts of each entity should be retrieved.
+  * @return the entities returned by the reader.
+  * @throws IOException if the read fails.
+  */
+ @Override
+ public Set<TimelineEntity> getEntities(TimelineReaderContext context,
+     TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
+     throws IOException {
+   TimelineEntityReader reader =
+       TimelineEntityReaderFactory.createMultipleEntitiesReader(context,
+           filters, dataToRetrieve);
+   return reader.readEntities(hbaseConf, conn);
+ }
+
+ /**
+  * Reads the entity types for the given context using an
+  * {@link EntityTypeReader}.
+  *
+  * @param context reader context.
+  * @return the entity types returned by the reader.
+  * @throws IOException if the read fails.
+  */
+ @Override
+ public Set<String> getEntityTypes(TimelineReaderContext context)
+     throws IOException {
+   EntityTypeReader reader = new EntityTypeReader(context);
+   return reader.readEntityTypes(hbaseConf, conn);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
new file mode 100644
index 0000000..027505b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -0,0 +1,611 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTableRW;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This implements a hbase based backend for storing the timeline entity
+ * information.
+ * It writes to multiple tables at the backend
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HBaseTimelineWriterImpl extends AbstractService implements
+ TimelineWriter {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(HBaseTimelineWriterImpl.class);
+
+ private Connection conn;
+ private TypedBufferedMutator<EntityTable> entityTable;
+ private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
+ private TypedBufferedMutator<ApplicationTable> applicationTable;
+ private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
+ private TypedBufferedMutator<FlowRunTable> flowRunTable;
+ private TypedBufferedMutator<SubApplicationTable> subApplicationTable;
+
+ /**
+ * Used to convert strings key components to and from storage format.
+ */
+ private final KeyConverter<String> stringKeyConverter =
+ new StringKeyConverter();
+
+ /**
+ * Used to convert Long key components to and from storage format.
+ */
+ private final KeyConverter<Long> longKeyConverter = new LongKeyConverter();
+
+ /** Destination tables that can be selected when storing an entity row. */
+ private enum Tables {
+   APPLICATION_TABLE,
+   ENTITY_TABLE,
+   SUBAPPLICATION_TABLE
+ }
+
+ /**
+  * Creates the writer service, passing this class's name to the
+  * {@link AbstractService} constructor.
+  */
+ public HBaseTimelineWriterImpl() {
+ super(HBaseTimelineWriterImpl.class.getName());
+ }
+
+ /**
+  * Opens the HBase connection and creates the buffered mutators for the
+  * entity, app-to-flow, application, flow run, flow activity and
+  * sub-application tables.
+  *
+  * @param conf service configuration from which the timeline service HBase
+  *        configuration is derived.
+  * @throws Exception if the connection or any table mutator cannot be set up.
+  */
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+   super.serviceInit(conf);
+   Configuration hbaseConf =
+       HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
+   conn = ConnectionFactory.createConnection(hbaseConf);
+   entityTable = new EntityTableRW().getTableMutator(hbaseConf, conn);
+   appToFlowTable = new AppToFlowTableRW().getTableMutator(hbaseConf, conn);
+   applicationTable =
+       new ApplicationTableRW().getTableMutator(hbaseConf, conn);
+   flowRunTable = new FlowRunTableRW().getTableMutator(hbaseConf, conn);
+   flowActivityTable =
+       new FlowActivityTableRW().getTableMutator(hbaseConf, conn);
+   subApplicationTable =
+       new SubApplicationTableRW().getTableMutator(hbaseConf, conn);
+
+   // The UGI is looked up for logging only: the login user when security is
+   // enabled, otherwise the current user.
+   UserGroupInformation ugi = UserGroupInformation.isSecurityEnabled() ?
+       UserGroupInformation.getLoginUser() :
+       UserGroupInformation.getCurrentUser();
+   LOG.info("Initialized HBaseTimelineWriterImpl UGI to {}", ugi);
+ }
+
+ /**
+  * Stores the entire information in TimelineEntities to the timeline store.
+  * Application entities are stored in the application table and then run
+  * through the application created / running / finished handling; all other
+  * entities are stored in the entity table and, when the caller's short user
+  * name differs from the context user, in the sub application table too.
+  *
+  * @param context collector context supplying cluster, user, flow and app
+  *        information.
+  * @param data entities to be written.
+  * @param callerUgi caller whose short user name is used as the sub
+  *        application user.
+  * @return the write response created at the start of the call; it is
+  *         returned as-is, also when the write is skipped because a required
+  *         context value is null.
+  * @throws IOException if storing any entity fails.
+  */
+ @Override
+ public TimelineWriteResponse write(TimelineCollectorContext context,
+     TimelineEntities data, UserGroupInformation callerUgi)
+     throws IOException {
+
+   TimelineWriteResponse response = new TimelineWriteResponse();
+
+   String clusterId = context.getClusterId();
+   String userId = context.getUserId();
+   String flowName = context.getFlowName();
+   String flowVersion = context.getFlowVersion();
+   long flowRunId = context.getFlowRunId();
+   String appId = context.getAppId();
+   String subApplicationUser = callerUgi.getShortUserName();
+
+   // defensive coding to avoid NPE during row key construction
+   boolean contextIncomplete = flowName == null || appId == null
+       || clusterId == null || userId == null;
+   if (contextIncomplete) {
+     LOG.warn("Found null for one of: flowName=" + flowName + " appId=" + appId
+         + " userId=" + userId + " clusterId=" + clusterId
+         + " . Not proceeding with writing to hbase");
+     return response;
+   }
+
+   for (TimelineEntity entity : data.getEntities()) {
+
+     // a set can have at most 1 null
+     if (entity == null) {
+       continue;
+     }
+
+     if (ApplicationEntity.isApplicationEntity(entity)) {
+       // application entities are destined for the application table
+       byte[] applicationKey = new ApplicationRowKey(clusterId, userId,
+           flowName, flowRunId, appId).getRowKey();
+       store(applicationKey, entity, flowVersion, Tables.APPLICATION_TABLE);
+
+       TimelineEvent createdEvent =
+           ApplicationEntity.getApplicationEvent(entity,
+               ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+       FlowRunRowKey flowRunRowKey =
+           new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
+       if (createdEvent != null) {
+         onApplicationCreated(flowRunRowKey, clusterId, appId, userId,
+             flowVersion, entity, createdEvent.getTimestamp());
+       }
+       // it's an application entity, so store metrics
+       storeFlowMetricsAppRunning(flowRunRowKey, appId, entity);
+       // if application has finished, store its finish time and write final
+       // values of all metrics
+       TimelineEvent finishedEvent =
+           ApplicationEntity.getApplicationEvent(entity,
+               ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+       if (finishedEvent != null) {
+         onApplicationFinished(flowRunRowKey, flowVersion, appId, entity,
+             finishedEvent.getTimestamp());
+       }
+     } else {
+       byte[] entityKey = new EntityRowKey(clusterId, userId, flowName,
+           flowRunId, appId, entity.getType(), entity.getIdPrefix(),
+           entity.getId()).getRowKey();
+       store(entityKey, entity, flowVersion, Tables.ENTITY_TABLE);
+
+       // entities written by a different user also go to the sub
+       // application table
+       if (!userId.equals(subApplicationUser)) {
+         byte[] subApplicationKey = new SubApplicationRowKey(
+             subApplicationUser, clusterId, entity.getType(),
+             entity.getIdPrefix(), entity.getId(), userId).getRowKey();
+         store(subApplicationKey, entity, flowVersion,
+             Tables.SUBAPPLICATION_TABLE);
+       }
+     }
+   }
+   return response;
+ }
+
+ private void onApplicationCreated(FlowRunRowKey flowRunRowKey,
+ String clusterId, String appId, String userId, String flowVersion,
+ TimelineEntity te, long appCreatedTimeStamp)
+ throws IOException {
+
+ String flowName = flowRunRowKey.getFlowName();
+ Long flowRunId = flowRunRowKey.getFlowRunId();
+
+ // store in App to flow table
+ AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(appId);
+ byte[] rowKey = appToFlowRowKey.getRowKey();
+ ColumnRWHelper.store(rowKey, appToFlowTable,
+ AppToFlowColumnPrefix.FLOW_NAME, clusterId, null, flowName);
+ ColumnRWHelper.store(rowKey, appToFlowTable,
+ AppToFlowColumnPrefix.FLOW_RUN_ID, clusterId, null, flowRunId);
+ ColumnRWHelper.store(rowKey, appToFlowTable, AppToFlowColumnPrefix.USER_ID,
+ clusterId, null, userId);
+
+ // store in flow run table
+ storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te);
+
+ // store in flow activity table
+ byte[] flowActivityRowKeyBytes =
+ new FlowActivityRowKey(flowRunRowKey.getClusterId(),
+ appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName)
+ .getRowKey();
+ byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
+ ColumnRWHelper.store(flowActivityRowKeyBytes, flowActivityTable,
+ FlowActivityColumnPrefix.RUN_ID, qualifier, null, flowVersion,
+ AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+ }
+
+ /*
+ * updates the {@link FlowRunTable} with Application Created information
+ */
+ private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey,
+ String appId, TimelineEntity te) throws IOException {
+ byte[] rowKey = flowRunRowKey.getRowKey();
+ ColumnRWHelper.store(rowKey, flowRunTable, FlowRunColumn.MIN_START_TIME,
+ null, te.getCreatedTime(),
+ AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+ }
+
+
+ /*
+ * updates the {@link FlowRunTable} and {@link FlowActivityTable} when an
+ * application has finished
+ */
+ private void onApplicationFinished(FlowRunRowKey flowRunRowKey,
+ String flowVersion, String appId, TimelineEntity te,
+ long appFinishedTimeStamp) throws IOException {
+ // store in flow run table
+ storeAppFinishedInFlowRunTable(flowRunRowKey, appId, te,
+ appFinishedTimeStamp);
+
+ // indicate in the flow activity table that the app has finished
+ byte[] rowKey =
+ new FlowActivityRowKey(flowRunRowKey.getClusterId(),
+ appFinishedTimeStamp, flowRunRowKey.getUserId(),
+ flowRunRowKey.getFlowName()).getRowKey();
+ byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
+ ColumnRWHelper.store(rowKey, flowActivityTable,
+ FlowActivityColumnPrefix.RUN_ID, qualifier, null, flowVersion,
+ AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+ }
+
+ /*
+ * Update the {@link FlowRunTable} with Application Finished information
+ */
+ private void storeAppFinishedInFlowRunTable(FlowRunRowKey flowRunRowKey,
+ String appId, TimelineEntity te, long appFinishedTimeStamp)
+ throws IOException {
+ byte[] rowKey = flowRunRowKey.getRowKey();
+ Attribute attributeAppId =
+ AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId);
+ ColumnRWHelper.store(rowKey, flowRunTable, FlowRunColumn.MAX_END_TIME,
+ null, appFinishedTimeStamp, attributeAppId);
+
+ // store the final value of metrics since application has finished
+ Set<TimelineMetric> metrics = te.getMetrics();
+ if (metrics != null) {
+ storeFlowMetrics(rowKey, metrics, attributeAppId,
+ AggregationOperation.SUM_FINAL.getAttribute());
+ }
+ }
+
+ /*
+ * Updates the {@link FlowRunTable} with Application Metrics
+ */
+ private void storeFlowMetricsAppRunning(FlowRunRowKey flowRunRowKey,
+ String appId, TimelineEntity te) throws IOException {
+ Set<TimelineMetric> metrics = te.getMetrics();
+ if (metrics != null) {
+ byte[] rowKey = flowRunRowKey.getRowKey();
+ storeFlowMetrics(rowKey, metrics,
+ AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId),
+ AggregationOperation.SUM.getAttribute());
+ }
+ }
+
+ private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
+ Attribute... attributes) throws IOException {
+ for (TimelineMetric metric : metrics) {
+ byte[] metricColumnQualifier = stringKeyConverter.encode(metric.getId());
+ Map<Long, Number> timeseries = metric.getValues();
+ for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
+ Long timestamp = timeseriesEntry.getKey();
+ ColumnRWHelper.store(rowKey, flowRunTable, FlowRunColumnPrefix.METRIC,
+ metricColumnQualifier, timestamp, timeseriesEntry.getValue(),
+ attributes);
+ }
+ }
+ }
+
+ /**
+ * Stores the Relations from the {@linkplain TimelineEntity} object.
+ */
+ private <T extends BaseTable<T>> void storeRelations(byte[] rowKey,
+ Map<String, Set<String>> connectedEntities, ColumnPrefix<T> columnPrefix,
+ TypedBufferedMutator<T> table) throws IOException {
+ if (connectedEntities != null) {
+ for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
+ .entrySet()) {
+ // id3?id4?id5
+ String compoundValue =
+ Separator.VALUES.joinEncoded(connectedEntity.getValue());
+ ColumnRWHelper.store(rowKey, table, columnPrefix,
+ stringKeyConverter.encode(connectedEntity.getKey()),
+ null, compoundValue);
+ }
+ }
+ }
+
+ /**
+  * Stores information from the {@linkplain TimelineEntity} object into the
+  * table selected by {@code table}.
+  *
+  * @param rowKey row under which the entity is written.
+  * @param te entity to persist.
+  * @param flowVersion flow version stored alongside the entity.
+  * @param table destination table; a value not handled below is only logged.
+  * @throws IOException if a write fails.
+  */
+ private void store(byte[] rowKey, TimelineEntity te,
+     String flowVersion,
+     Tables table) throws IOException {
+   switch (table) {
+   case APPLICATION_TABLE:
+     storeInApplicationTable(rowKey, te, flowVersion);
+     break;
+   case ENTITY_TABLE:
+     storeInEntityTable(rowKey, te, flowVersion);
+     break;
+   case SUBAPPLICATION_TABLE:
+     storeInSubApplicationTable(rowKey, te, flowVersion);
+     break;
+   default:
+     LOG.info("Invalid table name provided.");
+     break;
+   }
+ }
+
+ /** Writes all columns of the entity into the application table. */
+ private void storeInApplicationTable(byte[] rowKey, TimelineEntity te,
+     String flowVersion) throws IOException {
+   ColumnRWHelper.store(rowKey, applicationTable,
+       ApplicationColumn.ID, null, te.getId());
+   ColumnRWHelper.store(rowKey, applicationTable,
+       ApplicationColumn.CREATED_TIME, null, te.getCreatedTime());
+   ColumnRWHelper.store(rowKey, applicationTable,
+       ApplicationColumn.FLOW_VERSION, null, flowVersion);
+   storeInfo(rowKey, te.getInfo(), flowVersion,
+       ApplicationColumnPrefix.INFO, applicationTable);
+   storeMetrics(rowKey, te.getMetrics(),
+       ApplicationColumnPrefix.METRIC, applicationTable);
+   storeEvents(rowKey, te.getEvents(),
+       ApplicationColumnPrefix.EVENT, applicationTable);
+   storeConfig(rowKey, te.getConfigs(),
+       ApplicationColumnPrefix.CONFIG, applicationTable);
+   storeRelations(rowKey, te.getIsRelatedToEntities(),
+       ApplicationColumnPrefix.IS_RELATED_TO, applicationTable);
+   storeRelations(rowKey, te.getRelatesToEntities(),
+       ApplicationColumnPrefix.RELATES_TO, applicationTable);
+ }
+
+ /** Writes all columns of the entity into the entity table. */
+ private void storeInEntityTable(byte[] rowKey, TimelineEntity te,
+     String flowVersion) throws IOException {
+   ColumnRWHelper.store(rowKey, entityTable,
+       EntityColumn.ID, null, te.getId());
+   ColumnRWHelper.store(rowKey, entityTable,
+       EntityColumn.TYPE, null, te.getType());
+   ColumnRWHelper.store(rowKey, entityTable,
+       EntityColumn.CREATED_TIME, null, te.getCreatedTime());
+   ColumnRWHelper.store(rowKey, entityTable,
+       EntityColumn.FLOW_VERSION, null, flowVersion);
+   storeInfo(rowKey, te.getInfo(), flowVersion,
+       EntityColumnPrefix.INFO, entityTable);
+   storeMetrics(rowKey, te.getMetrics(),
+       EntityColumnPrefix.METRIC, entityTable);
+   storeEvents(rowKey, te.getEvents(),
+       EntityColumnPrefix.EVENT, entityTable);
+   storeConfig(rowKey, te.getConfigs(),
+       EntityColumnPrefix.CONFIG, entityTable);
+   storeRelations(rowKey, te.getIsRelatedToEntities(),
+       EntityColumnPrefix.IS_RELATED_TO, entityTable);
+   storeRelations(rowKey, te.getRelatesToEntities(),
+       EntityColumnPrefix.RELATES_TO, entityTable);
+ }
+
+ /** Writes all columns of the entity into the sub-application table. */
+ private void storeInSubApplicationTable(byte[] rowKey, TimelineEntity te,
+     String flowVersion) throws IOException {
+   ColumnRWHelper.store(rowKey, subApplicationTable,
+       SubApplicationColumn.ID, null, te.getId());
+   ColumnRWHelper.store(rowKey, subApplicationTable,
+       SubApplicationColumn.TYPE, null, te.getType());
+   ColumnRWHelper.store(rowKey, subApplicationTable,
+       SubApplicationColumn.CREATED_TIME, null, te.getCreatedTime());
+   ColumnRWHelper.store(rowKey, subApplicationTable,
+       SubApplicationColumn.FLOW_VERSION, null, flowVersion);
+   storeInfo(rowKey, te.getInfo(), flowVersion,
+       SubApplicationColumnPrefix.INFO, subApplicationTable);
+   storeMetrics(rowKey, te.getMetrics(),
+       SubApplicationColumnPrefix.METRIC, subApplicationTable);
+   storeEvents(rowKey, te.getEvents(),
+       SubApplicationColumnPrefix.EVENT, subApplicationTable);
+   storeConfig(rowKey, te.getConfigs(),
+       SubApplicationColumnPrefix.CONFIG, subApplicationTable);
+   storeRelations(rowKey, te.getIsRelatedToEntities(),
+       SubApplicationColumnPrefix.IS_RELATED_TO, subApplicationTable);
+   storeRelations(rowKey, te.getRelatesToEntities(),
+       SubApplicationColumnPrefix.RELATES_TO, subApplicationTable);
+ }
+
+ /**
+ * stores the info information from {@linkplain TimelineEntity}.
+ */
+ private <T extends BaseTable<T>> void storeInfo(byte[] rowKey,
+ Map<String, Object> info, String flowVersion,
+ ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T > table)
+ throws IOException {
+ if (info != null) {
+ for (Map.Entry<String, Object> entry : info.entrySet()) {
+ ColumnRWHelper.store(rowKey, table, columnPrefix,
+ stringKeyConverter.encode(entry.getKey()), null, entry.getValue());
+ }
+ }
+ }
+
+ /**
+ * stores the config information from {@linkplain TimelineEntity}.
+ */
+ private <T extends BaseTable<T>> void storeConfig(
+ byte[] rowKey, Map<String, String> config,
+ ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
+ throws IOException {
+ if (config != null) {
+ for (Map.Entry<String, String> entry : config.entrySet()) {
+ byte[] configKey = stringKeyConverter.encode(entry.getKey());
+ ColumnRWHelper.store(rowKey, table, columnPrefix, configKey,
+ null, entry.getValue());
+ }
+ }
+ }
+
+ /**
+ * stores the {@linkplain TimelineMetric} information from the
+ * {@linkplain TimelineEvent} object.
+ */
+ private <T extends BaseTable<T>> void storeMetrics(
+ byte[] rowKey, Set<TimelineMetric> metrics,
+ ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
+ throws IOException {
+ if (metrics != null) {
+ for (TimelineMetric metric : metrics) {
+ byte[] metricColumnQualifier =
+ stringKeyConverter.encode(metric.getId());
+ Map<Long, Number> timeseries = metric.getValues();
+ for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
+ Long timestamp = timeseriesEntry.getKey();
+ ColumnRWHelper.store(rowKey, table, columnPrefix,
+ metricColumnQualifier, timestamp, timeseriesEntry.getValue());
+ }
+ }
+ }
+ }
+
+ /**
+ * Stores the events from the {@linkplain TimelineEvent} object.
+ */
+ private <T extends BaseTable<T>> void storeEvents(
+ byte[] rowKey, Set<TimelineEvent> events,
+ ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
+ throws IOException {
+ if (events != null) {
+ for (TimelineEvent event : events) {
+ if (event != null) {
+ String eventId = event.getId();
+ if (eventId != null) {
+ long eventTimestamp = event.getTimestamp();
+ // if the timestamp is not set, use the current timestamp
+ if (eventTimestamp == TimelineEvent.INVALID_TIMESTAMP) {
+ LOG.warn("timestamp is not set for event " + eventId +
+ "! Using the current timestamp");
+ eventTimestamp = System.currentTimeMillis();
+ }
+ Map<String, Object> eventInfo = event.getInfo();
+ if ((eventInfo == null) || (eventInfo.size() == 0)) {
+ byte[] columnQualifierBytes =
+ new EventColumnName(eventId, eventTimestamp, null)
+ .getColumnQualifier();
+ ColumnRWHelper.store(rowKey, table, columnPrefix,
+ columnQualifierBytes, null, Separator.EMPTY_BYTES);
+ } else {
+ for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
+ // eventId=infoKey
+ byte[] columnQualifierBytes =
+ new EventColumnName(eventId, eventTimestamp, info.getKey())
+ .getColumnQualifier();
+ ColumnRWHelper.store(rowKey, table, columnPrefix,
+ columnQualifierBytes, null, info.getValue());
+ } // for info: eventInfo
+ }
+ }
+ }
+ } // event : events
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * Aggregation is not implemented by this writer: both arguments are
+ * ignored and null is always returned, so callers must be prepared for a
+ * null response.
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage
+ * .TimelineWriter#aggregate
+ * (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity,
+ * org.apache
+ * .hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack)
+ */
+ @Override
+ public TimelineWriteResponse aggregate(TimelineEntity data,
+ TimelineAggregationTrack track) throws IOException {
+ return null;
+ }
+
+ /*
+  * (non-Javadoc)
+  *
+  * Flushes every buffered mutator, in the order entity, app-to-flow,
+  * application, flow run, flow activity, sub-application.
+  *
+  * @see
+  * org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter#flush
+  * ()
+  */
+ @Override
+ public void flush() throws IOException {
+   final TypedBufferedMutator<?>[] mutators = {
+       entityTable, appToFlowTable, applicationTable,
+       flowRunTable, flowActivityTable, subApplicationTable};
+   for (TypedBufferedMutator<?> mutator : mutators) {
+     mutator.flush();
+   }
+ }
+
+ /**
+  * Closes all buffered mutators and then the hbase connection. The close
+  * APIs perform flushing and release any resources held.
+  *
+  * Every resource is closed even when an earlier close fails, so that a
+  * failing mutator cannot leak the remaining mutators or the connection.
+  * The first failure is rethrown once everything has been attempted; later
+  * failures are attached to it as suppressed exceptions.
+  *
+  * @throws Exception the first failure raised while closing a resource or
+  *     while stopping the parent service.
+  */
+ @Override
+ protected void serviceStop() throws Exception {
+   Exception failure = null;
+   failure = closeAndTrack(entityTable,
+       "closing the entity table", failure);
+   failure = closeAndTrack(appToFlowTable,
+       "closing the app_flow table", failure);
+   failure = closeAndTrack(applicationTable,
+       "closing the application table", failure);
+   failure = closeAndTrack(flowRunTable,
+       "closing the flow run table", failure);
+   failure = closeAndTrack(flowActivityTable,
+       "closing the flowActivityTable table", failure);
+   failure = closeAndTrack(subApplicationTable,
+       "closing the sub application table", failure);
+   // the connection goes last, after every mutator created from it
+   failure = closeAndTrack(conn,
+       "closing the hbase Connection", failure);
+   try {
+     super.serviceStop();
+   } catch (Exception e) {
+     failure = trackFailure(failure, e);
+   }
+   if (failure != null) {
+     throw failure;
+   }
+ }
+
+ /**
+  * Closes one resource, if it is set, and folds any failure into the
+  * failure tracked so far.
+  *
+  * @param resource resource to close; null means it was never created.
+  * @param logMessage message logged before closing.
+  * @param earlier failure recorded by previous closes, or null.
+  * @return the failure to carry forward, or null if none occurred yet.
+  */
+ private Exception closeAndTrack(AutoCloseable resource, String logMessage,
+     Exception earlier) {
+   if (resource == null) {
+     return earlier;
+   }
+   LOG.info(logMessage);
+   try {
+     resource.close();
+   } catch (Exception e) {
+     return trackFailure(earlier, e);
+   }
+   return earlier;
+ }
+
+ /**
+  * Keeps the first failure as the primary one and records any later
+  * failure as suppressed on it.
+  */
+ private Exception trackFailure(Exception earlier, Exception latest) {
+   if (earlier == null) {
+     return latest;
+   }
+   earlier.addSuppressed(latest);
+   return earlier;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
new file mode 100644
index 0000000..e9e4770
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTableRW;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This creates the schema for a hbase based backend for storing application
+ * timeline information. It is a command line tool: with the create option
+ * it creates the entity, app-to-flow, application, flow run, flow activity
+ * and sub-application tables; otherwise it prints usage information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class TimelineSchemaCreator {
+  static final String NAME = TimelineSchemaCreator.class.getSimpleName();
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TimelineSchemaCreator.class);
+  private static final String SKIP_EXISTING_TABLE_OPTION_SHORT = "s";
+  private static final String APP_METRICS_TTL_OPTION_SHORT = "ma";
+  private static final String SUB_APP_METRICS_TTL_OPTION_SHORT = "msa";
+  private static final String APP_TABLE_NAME_SHORT = "a";
+  private static final String SUB_APP_TABLE_NAME_SHORT = "sa";
+  private static final String APP_TO_FLOW_TABLE_NAME_SHORT = "a2f";
+  private static final String ENTITY_METRICS_TTL_OPTION_SHORT = "me";
+  private static final String ENTITY_TABLE_NAME_SHORT = "e";
+  private static final String HELP_SHORT = "h";
+  private static final String CREATE_TABLES_SHORT = "c";
+
+  private TimelineSchemaCreator() {
+  }
+
+  /**
+   * Entry point. Parses the command line, applies table name and metrics
+   * TTL overrides to the hbase configuration and creates all tables.
+   *
+   * @param args command line arguments, including -Dxyz style arguments.
+   * @throws Exception if the configuration cannot be loaded or the
+   *     arguments cannot be processed.
+   */
+  public static void main(String[] args) throws Exception {
+
+    LOG.info("Starting the schema creation");
+    Configuration hbaseConf =
+        HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(
+            new YarnConfiguration());
+    // Grab input args and allow for -Dxyz style arguments
+    String[] otherArgs = new GenericOptionsParser(hbaseConf, args)
+        .getRemainingArgs();
+
+    // Grab the arguments we're looking for.
+    CommandLine commandLine = parseArgs(otherArgs);
+
+    // -help option has the highest precedence; usage is also printed when
+    // -create is not specified
+    if (commandLine.hasOption(HELP_SHORT)
+        || !commandLine.hasOption(CREATE_TABLES_SHORT)) {
+      printUsage();
+      return;
+    }
+
+    // entity table name and metrics TTL
+    overrideTableName(commandLine, ENTITY_TABLE_NAME_SHORT, hbaseConf,
+        EntityTableRW.TABLE_NAME_CONF_NAME);
+    String entityTableMetricsTTL = commandLine.getOptionValue(
+        ENTITY_METRICS_TTL_OPTION_SHORT);
+    if (StringUtils.isNotBlank(entityTableMetricsTTL)) {
+      int entityMetricsTTL = Integer.parseInt(entityTableMetricsTTL);
+      new EntityTableRW().setMetricsTTL(entityMetricsTTL, hbaseConf);
+    }
+
+    // app-to-flow table name
+    overrideTableName(commandLine, APP_TO_FLOW_TABLE_NAME_SHORT, hbaseConf,
+        AppToFlowTableRW.TABLE_NAME_CONF_NAME);
+
+    // application table name and metrics TTL
+    overrideTableName(commandLine, APP_TABLE_NAME_SHORT, hbaseConf,
+        ApplicationTableRW.TABLE_NAME_CONF_NAME);
+    String applicationTableMetricsTTL = commandLine.getOptionValue(
+        APP_METRICS_TTL_OPTION_SHORT);
+    if (StringUtils.isNotBlank(applicationTableMetricsTTL)) {
+      int appMetricsTTL = Integer.parseInt(applicationTableMetricsTTL);
+      new ApplicationTableRW().setMetricsTTL(appMetricsTTL, hbaseConf);
+    }
+
+    // sub-application table name and metrics TTL
+    overrideTableName(commandLine, SUB_APP_TABLE_NAME_SHORT, hbaseConf,
+        SubApplicationTableRW.TABLE_NAME_CONF_NAME);
+    String subApplicationTableMetricsTTL = commandLine
+        .getOptionValue(SUB_APP_METRICS_TTL_OPTION_SHORT);
+    if (StringUtils.isNotBlank(subApplicationTableMetricsTTL)) {
+      int subAppMetricsTTL = Integer.parseInt(subApplicationTableMetricsTTL);
+      new SubApplicationTableRW().setMetricsTTL(subAppMetricsTTL, hbaseConf);
+    }
+
+    // create all table schemas in hbase
+    final boolean skipExisting = commandLine.hasOption(
+        SKIP_EXISTING_TABLE_OPTION_SHORT);
+    createAllSchemas(hbaseConf, skipExisting);
+  }
+
+  /**
+   * Copies the value of a table name option into the configuration when the
+   * option was given a non-blank value.
+   *
+   * @param commandLine parsed command line.
+   * @param option short name of the table name option.
+   * @param hbaseConf configuration to update.
+   * @param confKey configuration key that holds the table name.
+   */
+  private static void overrideTableName(CommandLine commandLine,
+      String option, Configuration hbaseConf, String confKey) {
+    String tableName = commandLine.getOptionValue(option);
+    if (StringUtils.isNotBlank(tableName)) {
+      hbaseConf.set(confKey, tableName);
+    }
+  }
+
+  /**
+   * Parse command-line arguments. On a parse failure the error is logged,
+   * help is printed and the JVM exits with status -1.
+   *
+   * @param args
+   *          command line arguments passed to program.
+   * @return parsed command line.
+   * @throws ParseException
+   */
+  private static CommandLine parseArgs(String[] args) throws ParseException {
+    Options options = new Options();
+
+    // Input
+    addOption(options, HELP_SHORT, "help", false, "print help information");
+    addOption(options, CREATE_TABLES_SHORT, "create", false,
+        "a mandatory option to create hbase tables");
+    addOption(options, ENTITY_TABLE_NAME_SHORT, "entityTableName", true,
+        "entity table name");
+    addOption(options, ENTITY_METRICS_TTL_OPTION_SHORT, "entityMetricsTTL",
+        true, "TTL for metrics column family");
+    addOption(options, APP_TO_FLOW_TABLE_NAME_SHORT, "appToflowTableName",
+        true, "app to flow table name");
+    addOption(options, APP_TABLE_NAME_SHORT, "applicationTableName", true,
+        "application table name");
+    addOption(options, APP_METRICS_TTL_OPTION_SHORT, "applicationMetricsTTL",
+        true, "TTL for metrics column family");
+    addOption(options, SUB_APP_TABLE_NAME_SHORT, "subApplicationTableName",
+        true, "subApplication table name");
+    addOption(options, SUB_APP_METRICS_TTL_OPTION_SHORT,
+        "subApplicationMetricsTTL", true, "TTL for metrics column family");
+
+    // Options without an argument
+    addOption(options, SKIP_EXISTING_TABLE_OPTION_SHORT, "skipExistingTable",
+        false, "skip existing Hbase tables and continue to create new tables");
+
+    CommandLineParser parser = new PosixParser();
+    CommandLine commandLine = null;
+    try {
+      commandLine = parser.parse(options, args);
+    } catch (ParseException e) {
+      LOG.error("ERROR: " + e.getMessage() + "\n");
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp(NAME + " ", options, true);
+      System.exit(-1);
+    }
+
+    return commandLine;
+  }
+
+  /**
+   * Registers one optional command line option. Options that take an
+   * argument use their long name as the argument name; options without an
+   * argument need no argument name.
+   *
+   * @param options option set to add to.
+   * @param shortName short option name.
+   * @param longName long option name.
+   * @param hasArg whether the option takes an argument.
+   * @param description help text of the option.
+   */
+  private static void addOption(Options options, String shortName,
+      String longName, boolean hasArg, String description) {
+    Option o = new Option(shortName, longName, hasArg, description);
+    if (hasArg) {
+      o.setArgName(longName);
+    }
+    o.setRequired(false);
+    options.addOption(o);
+  }
+
+  /**
+   * Prints the command usage to standard output.
+   */
+  private static void printUsage() {
+    StringBuilder usage = new StringBuilder("Command Usage: \n");
+    usage.append("TimelineSchemaCreator [-help] Display help info" +
+        " for all commands. Or\n");
+    usage.append("TimelineSchemaCreator -create [OPTIONAL_OPTIONS]" +
+        " Create hbase tables.\n\n");
+    usage.append("The Optional options for creating tables include: \n");
+    usage.append("[-entityTableName <Entity Table Name>] " +
+        "The name of the Entity table\n");
+    usage.append("[-entityMetricsTTL <Entity Table Metrics TTL>]" +
+        " TTL for metrics in the Entity table\n");
+    usage.append("[-appToflowTableName <AppToflow Table Name>]" +
+        " The name of the AppToFlow table\n");
+    usage.append("[-applicationTableName <Application Table Name>]" +
+        " The name of the Application table\n");
+    usage.append("[-applicationMetricsTTL <Application Table Metrics TTL>]" +
+        " TTL for metrics in the Application table\n");
+    usage.append("[-subApplicationTableName <SubApplication Table Name>]" +
+        " The name of the SubApplication table\n");
+    usage.append("[-subApplicationMetricsTTL " +
+        " <SubApplication Table Metrics TTL>]" +
+        " TTL for metrics in the SubApplication table\n");
+    usage.append("[-skipExistingTable] Whether to skip existing" +
+        " hbase tables\n");
+    System.out.println(usage.toString());
+  }
+
+  /**
+   * Create all table schemas and log success or exception if failed. The
+   * JVM exits with status -1 when table creation fails.
+   * @param hbaseConf the hbase configuration to create tables with
+   * @param skipExisting whether to skip existing hbase tables
+   */
+  private static void createAllSchemas(Configuration hbaseConf,
+      boolean skipExisting) {
+    List<Exception> exceptions = new ArrayList<>();
+    try {
+      if (skipExisting) {
+        LOG.info("Will skip existing tables and continue on htable creation "
+            + "exceptions!");
+      }
+      createAllTables(hbaseConf, skipExisting);
+      LOG.info("Successfully created HBase schema. ");
+    } catch (IOException e) {
+      LOG.error("Error in creating hbase tables: ", e);
+      exceptions.add(e);
+    }
+
+    if (exceptions.isEmpty()) {
+      LOG.info("Schema creation finished successfully");
+      return;
+    }
+    LOG.warn("Schema creation finished with the following exceptions");
+    for (Exception e : exceptions) {
+      LOG.warn(e.getMessage());
+    }
+    System.exit(-1);
+  }
+
+  /**
+   * Creates all timeline service tables using a fresh connection. Both the
+   * connection and the admin obtained from it are closed before returning,
+   * whether or not table creation succeeded.
+   *
+   * @param hbaseConf the hbase configuration to create tables with.
+   * @param skipExisting when true, a failure to create one table is logged
+   *     and the remaining tables are still attempted; when false the first
+   *     failure is rethrown.
+   * @throws IOException if the connection or admin cannot be obtained, or a
+   *     table cannot be created and {@code skipExisting} is false.
+   */
+  @VisibleForTesting
+  public static void createAllTables(Configuration hbaseConf,
+      boolean skipExisting) throws IOException {
+
+    // resources are closed in reverse order: admin first, then connection
+    try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
+        Admin admin = conn.getAdmin()) {
+      if (admin == null) {
+        throw new IOException("Cannot create table since admin is null");
+      }
+      try {
+        new EntityTableRW().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        skipOrRethrow(e, skipExisting);
+      }
+      try {
+        new AppToFlowTableRW().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        skipOrRethrow(e, skipExisting);
+      }
+      try {
+        new ApplicationTableRW().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        skipOrRethrow(e, skipExisting);
+      }
+      try {
+        new FlowRunTableRW().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        skipOrRethrow(e, skipExisting);
+      }
+      try {
+        new FlowActivityTableRW().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        skipOrRethrow(e, skipExisting);
+      }
+      try {
+        new SubApplicationTableRW().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        skipOrRethrow(e, skipExisting);
+      }
+    }
+  }
+
+  /**
+   * Handles a table creation failure: logs and continues when existing
+   * tables are to be skipped, rethrows otherwise.
+   *
+   * @param e failure raised while creating a table.
+   * @param skipExisting whether failures should be skipped.
+   * @throws IOException {@code e} itself when {@code skipExisting} is false.
+   */
+  private static void skipOrRethrow(IOException e, boolean skipExisting)
+      throws IOException {
+    if (!skipExisting) {
+      throw e;
+    }
+    LOG.warn("Skip and continue on: " + e.getMessage());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTableRW.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTableRW.java
new file mode 100644
index 0000000..808994e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTableRW.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create, read and write to the Application Table.
+ */
+public class ApplicationTableRW extends BaseTableRW<ApplicationTable> {
+  /** application prefix. */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "application";
+
+  /** config param name that specifies the application table name. */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /**
+   * config param name that specifies the TTL for metrics column family in
+   * application table.
+   */
+  private static final String METRICS_TTL_CONF_NAME = PREFIX
+      + ".table.metrics.ttl";
+
+  /**
+   * config param name that specifies max-versions for metrics column family
+   * in application table.
+   */
+  private static final String METRICS_MAX_VERSIONS =
+      PREFIX + ".table.metrics.max-versions";
+
+  /** default value for application table name. */
+  private static final String DEFAULT_TABLE_NAME =
+      "timelineservice.application";
+
+  /** default TTL is 30 days for metrics timeseries. */
+  private static final int DEFAULT_METRICS_TTL = 2592000;
+
+  /** default max number of versions. */
+  private static final int DEFAULT_METRICS_MAX_VERSIONS = 10000;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ApplicationTableRW.class);
+
+  public ApplicationTableRW() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /**
+   * Creates the application table with its info, configs and metrics column
+   * families, pre-split on the username splits.
+   *
+   * @param admin hbase admin used to check for and create the table.
+   * @param hbaseConf configuration supplying the table name, the metrics
+   *     TTL and the metrics max-versions.
+   * @throws IOException if the table already exists or cannot be created.
+   * @see org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW#createTable(org.apache.hadoop.hbase.client.Admin, org.apache.hadoop.conf.Configuration)
+   */
+  @Override
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor applicationTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor infoCF =
+        new HColumnDescriptor(ApplicationColumnFamily.INFO.getBytes());
+    infoCF.setBloomFilterType(BloomType.ROWCOL);
+    applicationTableDescp.addFamily(infoCF);
+
+    HColumnDescriptor configCF =
+        new HColumnDescriptor(ApplicationColumnFamily.CONFIGS.getBytes());
+    configCF.setBloomFilterType(BloomType.ROWCOL);
+    configCF.setBlockCacheEnabled(true);
+    applicationTableDescp.addFamily(configCF);
+
+    // Configure the metrics family completely before adding it, like the
+    // other families, so the result does not depend on the table
+    // descriptor keeping a live reference to the column descriptor.
+    HColumnDescriptor metricsCF =
+        new HColumnDescriptor(ApplicationColumnFamily.METRICS.getBytes());
+    metricsCF.setBlockCacheEnabled(true);
+    // always keep 1 version (the latest)
+    metricsCF.setMinVersions(1);
+    metricsCF.setMaxVersions(
+        hbaseConf.getInt(METRICS_MAX_VERSIONS, DEFAULT_METRICS_MAX_VERSIONS));
+    metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME,
+        DEFAULT_METRICS_TTL));
+    applicationTableDescp.addFamily(metricsCF);
+
+    applicationTableDescp.setRegionSplitPolicyClassName(
+        "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
+    applicationTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
+        TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+    admin.createTable(applicationTableDescp,
+        TimelineHBaseSchemaConstants.getUsernameSplits());
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+
+  /**
+   * @param metricsTTL time to live parameter for the metrics in this table.
+   * @param hbaseConf configuration in which to set the metrics TTL config
+   *          variable.
+   */
+  public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) {
+    hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java
new file mode 100644
index 0000000..03f508f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage.application
+ * contains classes related to the implementation of the application table.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTableRW.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTableRW.java
new file mode 100644
index 0000000..6460203
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTableRW.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Create, read and write to the AppToFlow Table.
+ */
+public class AppToFlowTableRW extends BaseTableRW<AppToFlowTable> {
+ /** app_flow prefix. */
+ private static final String PREFIX =
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX + "app-flow";
+
+ /** config param name that specifies the app_flow table name. */
+ public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+ /** default value for app_flow table name. */
+ private static final String DEFAULT_TABLE_NAME = "timelineservice.app_flow";
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AppToFlowTableRW.class);
+
+ public AppToFlowTableRW() {
+ super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTableRW#
+ * createTable(org.apache.hadoop.hbase.client.Admin,
+ * org.apache.hadoop.conf.Configuration)
+ */
+ public void createTable(Admin admin, Configuration hbaseConf)
+ throws IOException {
+
+ TableName table = getTableName(hbaseConf);
+ if (admin.tableExists(table)) {
+ // do not disable / delete existing table
+ // similar to the approach taken by map-reduce jobs when
+ // output directory exists
+ throw new IOException("Table " + table.getNameAsString()
+ + " already exists.");
+ }
+
+ HTableDescriptor appToFlowTableDescp = new HTableDescriptor(table);
+ HColumnDescriptor mappCF =
+ new HColumnDescriptor(AppToFlowColumnFamily.MAPPING.getBytes());
+ mappCF.setBloomFilterType(BloomType.ROWCOL);
+ appToFlowTableDescp.addFamily(mappCF);
+
+ appToFlowTableDescp
+ .setRegionSplitPolicyClassName(
+ "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
+ appToFlowTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
+ TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+ admin.createTable(appToFlowTableDescp,
+ TimelineHBaseSchemaConstants.getUsernameSplits());
+ LOG.info("Status of table creation for " + table.getNameAsString() + "="
+ + admin.tableExists(table));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java
new file mode 100644
index 0000000..f01d982
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow
+ * contains classes related to the implementation of the app to flow table.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTableRW.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTableRW.java
new file mode 100644
index 0000000..12ebce4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTableRW.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Implements behavior common to tables used in the timeline service storage. It
+ * is thread-safe, and can be used by multiple threads concurrently.
+ *
+ * @param <T> reference to the table instance class itself for type safety.
+ */
+public abstract class BaseTableRW<T extends BaseTable<T>> {
+
+ /**
+ * Name of config variable that is used to point to this table.
+ */
+ private final String tableNameConfName;
+
+ /**
+ * Unless the configuration overrides, this will be the default name for the
+ * table when it is created.
+ */
+ private final String defaultTableName;
+
+ /**
+ * @param tableNameConfName name of config variable that is used to point to
+ * this table.
+ * @param defaultTableName Default table name if table from config is not
+ * found.
+ */
+ protected BaseTableRW(String tableNameConfName, String defaultTableName) {
+ this.tableNameConfName = tableNameConfName;
+ this.defaultTableName = defaultTableName;
+ }
+
+ /**
+ * Used to create a type-safe mutator for this table.
+ *
+ * @param hbaseConf used to read table name.
+ * @param conn used to create a table from.
+ * @return a type safe {@link BufferedMutator} for the entity table.
+ * @throws IOException if any exception occurs while creating mutator for the
+ * table.
+ */
+ public TypedBufferedMutator<T> getTableMutator(Configuration hbaseConf,
+ Connection conn) throws IOException {
+
+ TableName tableName = this.getTableName(hbaseConf);
+
+ // Plain buffered mutator
+ BufferedMutator bufferedMutator = conn.getBufferedMutator(tableName);
+
+ // Now make this thing type safe.
+ // This is how service initialization should hang on to this variable, with
+ // the proper type
+ TypedBufferedMutator<T> table =
+ new TypedBufferedMutator<T>(bufferedMutator);
+
+ return table;
+ }
+
+ /**
+ * @param hbaseConf used to read settings that override defaults
+ * @param conn used to create table from
+ * @param scan that specifies what you want to read from this table.
+ * @return scanner for the table.
+ * @throws IOException if any exception occurs while getting the scanner.
+ */
+ public ResultScanner getResultScanner(Configuration hbaseConf,
+ Connection conn, Scan scan) throws IOException {
+ Table table = conn.getTable(getTableName(hbaseConf));
+ return table.getScanner(scan);
+ }
+
+ /**
+ *
+ * @param hbaseConf used to read settings that override defaults
+ * @param conn used to create table from
+ * @param get that specifies what single row you want to get from this table
+ * @return result of get operation
+ * @throws IOException if any exception occurs while getting the result.
+ */
+ public Result getResult(Configuration hbaseConf, Connection conn, Get get)
+ throws IOException {
+ Table table = conn.getTable(getTableName(hbaseConf));
+ return table.get(get);
+ }
+
+ /**
+ * Get the table name for the input table.
+ *
+ * @param conf HBase configuration from which table name will be fetched.
+ * @param tableName name of the table to be fetched
+ * @return A {@link TableName} object.
+ */
+ public static TableName getTableName(Configuration conf, String tableName) {
+ String tableSchemaPrefix = conf.get(
+ YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX);
+ return TableName.valueOf(tableSchemaPrefix + tableName);
+ }
+
+ /**
+ * Get the table name for this table.
+ *
+ * @param conf HBase configuration from which table name will be fetched.
+ * @return A {@link TableName} object.
+ */
+ public TableName getTableName(Configuration conf) {
+ String tableName = conf.get(tableNameConfName, defaultTableName);
+ return getTableName(conf, tableName);
+ }
+
+ /**
+ * Get the table name based on the input config parameters.
+ *
+ * @param conf HBase configuration from which table name will be fetched.
+ * @param tableNameInConf the table name parameter in conf.
+ * @param defaultTableName the default table name.
+ * @return A {@link TableName} object.
+ */
+ public static TableName getTableName(Configuration conf,
+ String tableNameInConf, String defaultTableName) {
+ String tableName = conf.get(tableNameInConf, defaultTableName);
+ return getTableName(conf, tableName);
+ }
+
+ /**
+ * Used to create the table in HBase. Should be called only once (per HBase
+ * instance).
+ *
+ * @param admin Used for doing HBase table operations.
+ * @param hbaseConf Hbase configuration.
+ * @throws IOException if any exception occurs while creating the table.
+ */
+ public abstract void createTable(Admin admin, Configuration hbaseConf)
+ throws IOException;
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org