Posted to common-commits@hadoop.apache.org by ro...@apache.org on 2018/02/18 08:09:49 UTC

[17/18] hadoop git commit: YARN-7919. Refactor timelineservice-hbase module into submodules. Contributed by Haibo Chen.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
new file mode 100644
index 0000000..2b98eec
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Set of utility methods used by timeline filter classes.
+ */
+public final class TimelineFilterUtils {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TimelineFilterUtils.class);
+
+  private TimelineFilterUtils() {
+  }
+
+  /**
+   * Returns the equivalent HBase filter list's {@link Operator}.
+   *
+   * @param op timeline filter list operator.
+   * @return HBase filter list's Operator.
+   */
+  private static Operator getHBaseOperator(TimelineFilterList.Operator op) {
+    switch (op) {
+    case AND:
+      return Operator.MUST_PASS_ALL;
+    case OR:
+      return Operator.MUST_PASS_ONE;
+    default:
+      throw new IllegalArgumentException("Invalid operator");
+    }
+  }
+
+  /**
+   * Returns the equivalent HBase compare filter's {@link CompareOp}.
+   *
+   * @param op timeline compare op.
+   * @return HBase compare filter's CompareOp.
+   */
+  private static CompareOp getHBaseCompareOp(
+      TimelineCompareOp op) {
+    switch (op) {
+    case LESS_THAN:
+      return CompareOp.LESS;
+    case LESS_OR_EQUAL:
+      return CompareOp.LESS_OR_EQUAL;
+    case EQUAL:
+      return CompareOp.EQUAL;
+    case NOT_EQUAL:
+      return CompareOp.NOT_EQUAL;
+    case GREATER_OR_EQUAL:
+      return CompareOp.GREATER_OR_EQUAL;
+    case GREATER_THAN:
+      return CompareOp.GREATER;
+    default:
+      throw new IllegalArgumentException("Invalid compare operator");
+    }
+  }
+
+  /**
+   * Converts a {@link TimelinePrefixFilter} to an equivalent HBase
+   * {@link QualifierFilter}.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param colPrefix column prefix used to derive the qualifier bytes.
+   * @param filter timeline prefix filter to be converted.
+   * @return a {@link QualifierFilter} object.
+   */
+  private static <T extends BaseTable<T>> Filter createHBaseColQualPrefixFilter(
+      ColumnPrefix<T> colPrefix, TimelinePrefixFilter filter) {
+    return new QualifierFilter(getHBaseCompareOp(filter.getCompareOp()),
+        new BinaryPrefixComparator(
+            colPrefix.getColumnPrefixBytes(filter.getPrefix())));
+  }
+
+  /**
+   * Create an HBase {@link QualifierFilter} for the passed column prefix and
+   * compare op.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param compareOp compare op.
+   * @param columnPrefix column prefix.
+   * @return a column qualifier filter.
+   */
+  public static <T extends BaseTable<T>> Filter createHBaseQualifierFilter(
+      CompareOp compareOp, ColumnPrefix<T> columnPrefix) {
+    return new QualifierFilter(compareOp,
+        new BinaryPrefixComparator(
+            columnPrefix.getColumnPrefixBytes("")));
+  }
+
+  /**
+   * Creates filters for the configs or metrics to retrieve. The returned
+   * filter includes a configs/metrics family filter and, if specified,
+   * filters for the individual configs/metrics to retrieve.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param confsOrMetricToRetrieve configs/metrics to retrieve.
+   * @param columnFamily config or metric column family.
+   * @param columnPrefix config or metric column prefix.
+   * @return a filter list.
+   * @throws IOException if any problem occurs while creating the filters.
+   */
+  public static <T extends BaseTable<T>> Filter
+      createFilterForConfsOrMetricsToRetrieve(
+      TimelineFilterList confsOrMetricToRetrieve, ColumnFamily<T> columnFamily,
+      ColumnPrefix<T> columnPrefix) throws IOException {
+    Filter familyFilter = new FamilyFilter(CompareOp.EQUAL,
+        new BinaryComparator(columnFamily.getBytes()));
+    if (confsOrMetricToRetrieve != null &&
+        !confsOrMetricToRetrieve.getFilterList().isEmpty()) {
+      // If confsOrMetricToRetrieve is specified, create a filter list based
+      // on it and the family filter.
+      FilterList filter = new FilterList(familyFilter);
+      filter.addFilter(
+          createHBaseFilterList(columnPrefix, confsOrMetricToRetrieve));
+      return filter;
+    } else {
+      // Only the family filter needs to be added.
+      return familyFilter;
+    }
+  }
+
+  /**
+   * Creates two HBase {@link SingleColumnValueFilter} filters for the
+   * specified value range, represented by a start and an end value, and
+   * wraps them inside a filter list. Start and end values must not be null.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param column Column for which single column value filter is to be created.
+   * @param startValue Start value.
+   * @param endValue End value.
+   * @return 2 single column value filters wrapped in a filter list.
+   * @throws IOException if any problem is encountered while encoding value.
+   */
+  public static <T extends BaseTable<T>> FilterList
+      createSingleColValueFiltersByRange(Column<T> column,
+          Object startValue, Object endValue) throws IOException {
+    FilterList list = new FilterList();
+    Filter singleColValFilterStart = createHBaseSingleColValueFilter(
+        column.getColumnFamilyBytes(), column.getColumnQualifierBytes(),
+        column.getValueConverter().encodeValue(startValue),
+        CompareOp.GREATER_OR_EQUAL, true);
+    list.addFilter(singleColValFilterStart);
+
+    Filter singleColValFilterEnd = createHBaseSingleColValueFilter(
+        column.getColumnFamilyBytes(), column.getColumnQualifierBytes(),
+        column.getValueConverter().encodeValue(endValue),
+        CompareOp.LESS_OR_EQUAL, true);
+    list.addFilter(singleColValFilterEnd);
+    return list;
+  }
+
+  /**
+   * Creates an HBase {@link SingleColumnValueFilter} for the specified
+   * column.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param column Column whose value is to be filtered.
+   * @param value Value to be filtered.
+   * @param op Compare operator.
+   * @return a {@link SingleColumnValueFilter} object.
+   * @throws IOException if any problem occurs while encoding the value.
+   */
+  public static <T extends BaseTable<T>> Filter
+      createHBaseSingleColValueFilter(Column<T> column,
+          Object value, CompareOp op) throws IOException {
+    Filter singleColValFilter = createHBaseSingleColValueFilter(
+        column.getColumnFamilyBytes(), column.getColumnQualifierBytes(),
+        column.getValueConverter().encodeValue(value), op, true);
+    return singleColValFilter;
+  }
+
+  /**
+   * Creates an HBase {@link SingleColumnValueFilter}.
+   *
+   * @param columnFamily Column Family represented as bytes.
+   * @param columnQualifier Column Qualifier represented as bytes.
+   * @param value Value.
+   * @param compareOp Compare operator.
+   * @param filterIfMissing This flag decides if we should filter the row if the
+   *     specified column is missing. This is based on the filter's keyMustExist
+   *     field.
+   * @return a {@link SingleColumnValueFilter} object
+   * @throws IOException if any problem occurs while creating the filter.
+   */
+  private static SingleColumnValueFilter createHBaseSingleColValueFilter(
+      byte[] columnFamily, byte[] columnQualifier, byte[] value,
+      CompareOp compareOp, boolean filterIfMissing) throws IOException {
+    SingleColumnValueFilter singleColValFilter =
+        new SingleColumnValueFilter(columnFamily, columnQualifier, compareOp,
+        new BinaryComparator(value));
+    singleColValFilter.setLatestVersionOnly(true);
+    singleColValFilter.setFilterIfMissing(filterIfMissing);
+    return singleColValFilter;
+  }
+
+  /**
+   * Fetches columns from a filter list containing exists and multi-value
+   * equality filters. This is done so that only the required columns are
+   * fetched from the back end, with event filters or relationships then
+   * matched in the reader.
+   *
+   * @param filterList filter list.
+   * @return set of columns.
+   */
+  public static Set<String> fetchColumnsFromFilterList(
+      TimelineFilterList filterList) {
+    Set<String> strSet = new HashSet<String>();
+    for (TimelineFilter filter : filterList.getFilterList()) {
+      switch(filter.getFilterType()) {
+      case LIST:
+        strSet.addAll(fetchColumnsFromFilterList((TimelineFilterList)filter));
+        break;
+      case KEY_VALUES:
+        strSet.add(((TimelineKeyValuesFilter)filter).getKey());
+        break;
+      case EXISTS:
+        strSet.add(((TimelineExistsFilter)filter).getValue());
+        break;
+      default:
+        LOG.info("Unexpected filter type " + filter.getFilterType());
+        break;
+      }
+    }
+    return strSet;
+  }
+
+  /**
+   * Creates an equivalent HBase {@link FilterList} from a
+   * {@link TimelineFilterList}, converting the different timeline filters
+   * (of type {@link TimelineFilter}) into their equivalent HBase filters.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param colPrefix column prefix which will be used for conversion.
+   * @param filterList timeline filter list which has to be converted.
+   * @return A {@link FilterList} object.
+   * @throws IOException if any problem occurs while creating the filter list.
+   */
+  public static <T extends BaseTable<T>> FilterList createHBaseFilterList(
+      ColumnPrefix<T> colPrefix,
+      TimelineFilterList filterList) throws IOException {
+    FilterList list =
+        new FilterList(getHBaseOperator(filterList.getOperator()));
+    for (TimelineFilter filter : filterList.getFilterList()) {
+      switch(filter.getFilterType()) {
+      case LIST:
+        list.addFilter(createHBaseFilterList(colPrefix,
+            (TimelineFilterList)filter));
+        break;
+      case PREFIX:
+        list.addFilter(createHBaseColQualPrefixFilter(colPrefix,
+            (TimelinePrefixFilter)filter));
+        break;
+      case COMPARE:
+        TimelineCompareFilter compareFilter = (TimelineCompareFilter)filter;
+        list.addFilter(
+            createHBaseSingleColValueFilter(
+                colPrefix.getColumnFamilyBytes(),
+                colPrefix.getColumnPrefixBytes(compareFilter.getKey()),
+                colPrefix.getValueConverter().
+                    encodeValue(compareFilter.getValue()),
+                getHBaseCompareOp(compareFilter.getCompareOp()),
+                compareFilter.getKeyMustExist()));
+        break;
+      case KEY_VALUE:
+        TimelineKeyValueFilter kvFilter = (TimelineKeyValueFilter)filter;
+        list.addFilter(
+            createHBaseSingleColValueFilter(
+                colPrefix.getColumnFamilyBytes(),
+                colPrefix.getColumnPrefixBytes(kvFilter.getKey()),
+                colPrefix.getValueConverter().encodeValue(kvFilter.getValue()),
+                getHBaseCompareOp(kvFilter.getCompareOp()),
+                kvFilter.getKeyMustExist()));
+        break;
+      default:
+        LOG.info("Unexpected filter type " + filter.getFilterType());
+        break;
+      }
+    }
+    return list;
+  }
+}
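
For context, a minimal usage sketch (not part of this commit) of how a
reader-side caller might convert a timeline filter list into HBase filters
with this utility. The choice of EntityColumnPrefix.CONFIG as the column
prefix and the key/value used in the filter are illustrative assumptions.

  import org.apache.hadoop.hbase.filter.FilterList;
  import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
  import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
  import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
  import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
  import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;

  public class FilterConversionSketch {
    public static void main(String[] args) throws Exception {
      // Match entities whose config "mapreduce.map.memory.mb" equals "2048"
      // (illustrative key and value).
      TimelineFilterList tfl = new TimelineFilterList(
          new TimelineKeyValueFilter(TimelineCompareOp.EQUAL,
              "mapreduce.map.memory.mb", "2048"));
      // KEY_VALUE filters become SingleColumnValueFilters scoped to the
      // config column prefix; nested LIST filters are converted recursively.
      FilterList hbaseFilters = TimelineFilterUtils.createHBaseFilterList(
          EntityColumnPrefix.CONFIG, tfl);
      System.out.println(hbaseFilters);
    }
  }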

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
new file mode 100644
index 0000000..f7c0705
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.reader.filter stores
+ * timeline filter implementations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
new file mode 100644
index 0000000..1ebfab2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.EntityTypeReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HBase-based implementation of {@link TimelineReader}.
+ */
+public class HBaseTimelineReaderImpl
+    extends AbstractService implements TimelineReader {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(HBaseTimelineReaderImpl.class);
+
+  private Configuration hbaseConf = null;
+  private Connection conn;
+
+  public HBaseTimelineReaderImpl() {
+    super(HBaseTimelineReaderImpl.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
+    conn = ConnectionFactory.createConnection(hbaseConf);
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (conn != null) {
+      LOG.info("closing the hbase Connection");
+      conn.close();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public TimelineEntity getEntity(TimelineReaderContext context,
+      TimelineDataToRetrieve dataToRetrieve) throws IOException {
+    TimelineEntityReader reader =
+        TimelineEntityReaderFactory.createSingleEntityReader(context,
+            dataToRetrieve);
+    return reader.readEntity(hbaseConf, conn);
+  }
+
+  @Override
+  public Set<TimelineEntity> getEntities(TimelineReaderContext context,
+      TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
+      throws IOException {
+    TimelineEntityReader reader =
+        TimelineEntityReaderFactory.createMultipleEntitiesReader(context,
+            filters, dataToRetrieve);
+    return reader.readEntities(hbaseConf, conn);
+  }
+
+  @Override
+  public Set<String> getEntityTypes(TimelineReaderContext context)
+      throws IOException {
+    EntityTypeReader reader = new EntityTypeReader(context);
+    return reader.readEntityTypes(hbaseConf, conn);
+  }
+}
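
A minimal sketch (not part of this commit) of driving the reader through its
service lifecycle. The TimelineReaderContext constructor arguments shown are
illustrative assumptions; consult the reader API for the exact signature.

  import org.apache.hadoop.yarn.conf.YarnConfiguration;
  import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
  import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
  import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;

  public class ReaderSketch {
    public static void main(String[] args) throws Exception {
      HBaseTimelineReaderImpl reader = new HBaseTimelineReaderImpl();
      reader.init(new YarnConfiguration()); // serviceInit opens the connection
      reader.start();
      try {
        // Assumed context fields: cluster, user, flow name, flow run id,
        // app id, entity type, entity id prefix, entity id.
        TimelineReaderContext context = new TimelineReaderContext("cluster1",
            "user1", "flow1", 1L, "application_1518000000000_0001",
            "YARN_APPLICATION", 0L, "application_1518000000000_0001");
        System.out.println(
            reader.getEntity(context, new TimelineDataToRetrieve()));
      } finally {
        reader.stop(); // serviceStop closes the HBase connection
      }
    }
  }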

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
new file mode 100644
index 0000000..027505b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -0,0 +1,611 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTableRW;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This implements an HBase-based backend for storing timeline entity
+ * information.
+ * It writes to multiple tables at the back end.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HBaseTimelineWriterImpl extends AbstractService implements
+    TimelineWriter {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(HBaseTimelineWriterImpl.class);
+
+  private Connection conn;
+  private TypedBufferedMutator<EntityTable> entityTable;
+  private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
+  private TypedBufferedMutator<ApplicationTable> applicationTable;
+  private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
+  private TypedBufferedMutator<FlowRunTable> flowRunTable;
+  private TypedBufferedMutator<SubApplicationTable> subApplicationTable;
+
+  /**
+   * Used to convert strings key components to and from storage format.
+   */
+  private final KeyConverter<String> stringKeyConverter =
+      new StringKeyConverter();
+
+  /**
+   * Used to convert Long key components to and from storage format.
+   */
+  private final KeyConverter<Long> longKeyConverter = new LongKeyConverter();
+
+  private enum Tables {
+    APPLICATION_TABLE, ENTITY_TABLE, SUBAPPLICATION_TABLE
+  };
+
+  public HBaseTimelineWriterImpl() {
+    super(HBaseTimelineWriterImpl.class.getName());
+  }
+
+  /**
+   * Initializes the HBase connection and the buffered mutators used to
+   * write to the backing tables.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    Configuration hbaseConf =
+        HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
+    conn = ConnectionFactory.createConnection(hbaseConf);
+    entityTable = new EntityTableRW().getTableMutator(hbaseConf, conn);
+    appToFlowTable = new AppToFlowTableRW().getTableMutator(hbaseConf, conn);
+    applicationTable =
+        new ApplicationTableRW().getTableMutator(hbaseConf, conn);
+    flowRunTable = new FlowRunTableRW().getTableMutator(hbaseConf, conn);
+    flowActivityTable =
+        new FlowActivityTableRW().getTableMutator(hbaseConf, conn);
+    subApplicationTable =
+        new SubApplicationTableRW().getTableMutator(hbaseConf, conn);
+
+    UserGroupInformation ugi = UserGroupInformation.isSecurityEnabled() ?
+        UserGroupInformation.getLoginUser() :
+        UserGroupInformation.getCurrentUser();
+    LOG.info("Initialized HBaseTimelineWriterImpl UGI to " + ugi);
+  }
+
+  /**
+   * Stores all the information contained in {@link TimelineEntities} to
+   * the timeline store.
+   */
+  @Override
+  public TimelineWriteResponse write(TimelineCollectorContext context,
+      TimelineEntities data, UserGroupInformation callerUgi)
+      throws IOException {
+
+    TimelineWriteResponse putStatus = new TimelineWriteResponse();
+
+    String clusterId = context.getClusterId();
+    String userId = context.getUserId();
+    String flowName = context.getFlowName();
+    String flowVersion = context.getFlowVersion();
+    long flowRunId = context.getFlowRunId();
+    String appId = context.getAppId();
+    String subApplicationUser = callerUgi.getShortUserName();
+
+    // defensive coding to avoid NPE during row key construction
+    if ((flowName == null) || (appId == null) || (clusterId == null)
+        || (userId == null)) {
+      LOG.warn("Found null for one of: flowName=" + flowName + " appId=" + appId
+          + " userId=" + userId + " clusterId=" + clusterId
+          + " . Not proceeding with writing to hbase");
+      return putStatus;
+    }
+
+    for (TimelineEntity te : data.getEntities()) {
+
+      // a set can have at most 1 null
+      if (te == null) {
+        continue;
+      }
+
+      // if the entity is the application, the destination is the application
+      // table
+      boolean isApplication = ApplicationEntity.isApplicationEntity(te);
+      byte[] rowKey;
+      if (isApplication) {
+        ApplicationRowKey applicationRowKey =
+            new ApplicationRowKey(clusterId, userId, flowName, flowRunId,
+                appId);
+        rowKey = applicationRowKey.getRowKey();
+        store(rowKey, te, flowVersion, Tables.APPLICATION_TABLE);
+      } else {
+        EntityRowKey entityRowKey =
+            new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
+                te.getType(), te.getIdPrefix(), te.getId());
+        rowKey = entityRowKey.getRowKey();
+        store(rowKey, te, flowVersion, Tables.ENTITY_TABLE);
+      }
+
+      if (!isApplication && !userId.equals(subApplicationUser)) {
+        SubApplicationRowKey subApplicationRowKey =
+            new SubApplicationRowKey(subApplicationUser, clusterId,
+                te.getType(), te.getIdPrefix(), te.getId(), userId);
+        rowKey = subApplicationRowKey.getRowKey();
+        store(rowKey, te, flowVersion, Tables.SUBAPPLICATION_TABLE);
+      }
+
+      if (isApplication) {
+        TimelineEvent event =
+            ApplicationEntity.getApplicationEvent(te,
+                ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+        FlowRunRowKey flowRunRowKey =
+            new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
+        if (event != null) {
+          onApplicationCreated(flowRunRowKey, clusterId, appId, userId,
+              flowVersion, te, event.getTimestamp());
+        }
+        // if it's an application entity, store metrics
+        storeFlowMetricsAppRunning(flowRunRowKey, appId, te);
+        // if the application has finished, store its finish time and write
+        // final
+        // values of all metrics
+        event = ApplicationEntity.getApplicationEvent(te,
+            ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+        if (event != null) {
+          onApplicationFinished(flowRunRowKey, flowVersion, appId, te,
+              event.getTimestamp());
+        }
+      }
+    }
+    return putStatus;
+  }
+
+  private void onApplicationCreated(FlowRunRowKey flowRunRowKey,
+      String clusterId, String appId, String userId, String flowVersion,
+      TimelineEntity te, long appCreatedTimeStamp)
+      throws IOException {
+
+    String flowName = flowRunRowKey.getFlowName();
+    Long flowRunId = flowRunRowKey.getFlowRunId();
+
+    // store in App to flow table
+    AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(appId);
+    byte[] rowKey = appToFlowRowKey.getRowKey();
+    ColumnRWHelper.store(rowKey, appToFlowTable,
+        AppToFlowColumnPrefix.FLOW_NAME, clusterId, null, flowName);
+    ColumnRWHelper.store(rowKey, appToFlowTable,
+        AppToFlowColumnPrefix.FLOW_RUN_ID, clusterId, null, flowRunId);
+    ColumnRWHelper.store(rowKey, appToFlowTable, AppToFlowColumnPrefix.USER_ID,
+        clusterId, null, userId);
+
+    // store in flow run table
+    storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te);
+
+    // store in flow activity table
+    byte[] flowActivityRowKeyBytes =
+        new FlowActivityRowKey(flowRunRowKey.getClusterId(),
+            appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName)
+            .getRowKey();
+    byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
+    ColumnRWHelper.store(flowActivityRowKeyBytes, flowActivityTable,
+        FlowActivityColumnPrefix.RUN_ID, qualifier, null, flowVersion,
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+  }
+
+  /*
+   * updates the {@link FlowRunTable} with Application Created information
+   */
+  private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey,
+      String appId, TimelineEntity te) throws IOException {
+    byte[] rowKey = flowRunRowKey.getRowKey();
+    ColumnRWHelper.store(rowKey, flowRunTable, FlowRunColumn.MIN_START_TIME,
+        null, te.getCreatedTime(),
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+  }
+
+
+  /*
+   * updates the {@link FlowRunTable} and {@link FlowActivityTable} when an
+   * application has finished
+   */
+  private void onApplicationFinished(FlowRunRowKey flowRunRowKey,
+      String flowVersion, String appId, TimelineEntity te,
+      long appFinishedTimeStamp) throws IOException {
+    // store in flow run table
+    storeAppFinishedInFlowRunTable(flowRunRowKey, appId, te,
+        appFinishedTimeStamp);
+
+    // indicate in the flow activity table that the app has finished
+    byte[] rowKey =
+        new FlowActivityRowKey(flowRunRowKey.getClusterId(),
+            appFinishedTimeStamp, flowRunRowKey.getUserId(),
+            flowRunRowKey.getFlowName()).getRowKey();
+    byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
+    ColumnRWHelper.store(rowKey, flowActivityTable,
+        FlowActivityColumnPrefix.RUN_ID, qualifier, null, flowVersion,
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+  }
+
+  /*
+   * Update the {@link FlowRunTable} with Application Finished information
+   */
+  private void storeAppFinishedInFlowRunTable(FlowRunRowKey flowRunRowKey,
+      String appId, TimelineEntity te, long appFinishedTimeStamp)
+      throws IOException {
+    byte[] rowKey = flowRunRowKey.getRowKey();
+    Attribute attributeAppId =
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId);
+    ColumnRWHelper.store(rowKey, flowRunTable, FlowRunColumn.MAX_END_TIME,
+        null, appFinishedTimeStamp, attributeAppId);
+
+    // store the final value of metrics since application has finished
+    Set<TimelineMetric> metrics = te.getMetrics();
+    if (metrics != null) {
+      storeFlowMetrics(rowKey, metrics, attributeAppId,
+          AggregationOperation.SUM_FINAL.getAttribute());
+    }
+  }
+
+  /*
+   * Updates the {@link FlowRunTable} with Application Metrics
+   */
+  private void storeFlowMetricsAppRunning(FlowRunRowKey flowRunRowKey,
+      String appId, TimelineEntity te) throws IOException {
+    Set<TimelineMetric> metrics = te.getMetrics();
+    if (metrics != null) {
+      byte[] rowKey = flowRunRowKey.getRowKey();
+      storeFlowMetrics(rowKey, metrics,
+          AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId),
+          AggregationOperation.SUM.getAttribute());
+    }
+  }
+
+  private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
+      Attribute... attributes) throws IOException {
+    for (TimelineMetric metric : metrics) {
+      byte[] metricColumnQualifier = stringKeyConverter.encode(metric.getId());
+      Map<Long, Number> timeseries = metric.getValues();
+      for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
+        Long timestamp = timeseriesEntry.getKey();
+        ColumnRWHelper.store(rowKey, flowRunTable, FlowRunColumnPrefix.METRIC,
+            metricColumnQualifier, timestamp, timeseriesEntry.getValue(),
+            attributes);
+      }
+    }
+  }
+
+  /**
+   * Stores the relations from the {@linkplain TimelineEntity} object.
+   */
+  private <T extends BaseTable<T>> void storeRelations(byte[] rowKey,
+      Map<String, Set<String>> connectedEntities, ColumnPrefix<T> columnPrefix,
+      TypedBufferedMutator<T> table) throws IOException {
+    if (connectedEntities != null) {
+      for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
+          .entrySet()) {
+        // id3?id4?id5
+        String compoundValue =
+            Separator.VALUES.joinEncoded(connectedEntity.getValue());
+        ColumnRWHelper.store(rowKey, table, columnPrefix,
+            stringKeyConverter.encode(connectedEntity.getKey()),
+            null, compoundValue);
+      }
+    }
+  }
+
+  /**
+   * Stores information from the {@linkplain TimelineEntity} object.
+   */
+  private void store(byte[] rowKey, TimelineEntity te,
+      String flowVersion,
+      Tables table) throws IOException {
+    switch (table) {
+    case APPLICATION_TABLE:
+      ColumnRWHelper.store(rowKey, applicationTable,
+          ApplicationColumn.ID, null, te.getId());
+      ColumnRWHelper.store(rowKey, applicationTable,
+          ApplicationColumn.CREATED_TIME, null, te.getCreatedTime());
+      ColumnRWHelper.store(rowKey, applicationTable,
+          ApplicationColumn.FLOW_VERSION, null, flowVersion);
+      storeInfo(rowKey, te.getInfo(), flowVersion, ApplicationColumnPrefix.INFO,
+          applicationTable);
+      storeMetrics(rowKey, te.getMetrics(), ApplicationColumnPrefix.METRIC,
+          applicationTable);
+      storeEvents(rowKey, te.getEvents(), ApplicationColumnPrefix.EVENT,
+          applicationTable);
+      storeConfig(rowKey, te.getConfigs(), ApplicationColumnPrefix.CONFIG,
+          applicationTable);
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          ApplicationColumnPrefix.IS_RELATED_TO, applicationTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          ApplicationColumnPrefix.RELATES_TO, applicationTable);
+      break;
+    case ENTITY_TABLE:
+      ColumnRWHelper.store(rowKey, entityTable,
+          EntityColumn.ID, null, te.getId());
+      ColumnRWHelper.store(rowKey, entityTable,
+          EntityColumn.TYPE, null, te.getType());
+      ColumnRWHelper.store(rowKey, entityTable,
+          EntityColumn.CREATED_TIME, null, te.getCreatedTime());
+      ColumnRWHelper.store(rowKey, entityTable,
+          EntityColumn.FLOW_VERSION, null, flowVersion);
+      storeInfo(rowKey, te.getInfo(), flowVersion, EntityColumnPrefix.INFO,
+          entityTable);
+      storeMetrics(rowKey, te.getMetrics(), EntityColumnPrefix.METRIC,
+          entityTable);
+      storeEvents(rowKey, te.getEvents(), EntityColumnPrefix.EVENT,
+          entityTable);
+      storeConfig(rowKey, te.getConfigs(), EntityColumnPrefix.CONFIG,
+          entityTable);
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          EntityColumnPrefix.IS_RELATED_TO, entityTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          EntityColumnPrefix.RELATES_TO, entityTable);
+      break;
+    case SUBAPPLICATION_TABLE:
+      ColumnRWHelper.store(rowKey, subApplicationTable, SubApplicationColumn.ID,
+          null, te.getId());
+      ColumnRWHelper.store(rowKey, subApplicationTable,
+          SubApplicationColumn.TYPE, null, te.getType());
+      ColumnRWHelper.store(rowKey, subApplicationTable,
+          SubApplicationColumn.CREATED_TIME, null, te.getCreatedTime());
+      ColumnRWHelper.store(rowKey, subApplicationTable,
+          SubApplicationColumn.FLOW_VERSION, null, flowVersion);
+      storeInfo(rowKey, te.getInfo(), flowVersion,
+          SubApplicationColumnPrefix.INFO, subApplicationTable);
+      storeMetrics(rowKey, te.getMetrics(), SubApplicationColumnPrefix.METRIC,
+          subApplicationTable);
+      storeEvents(rowKey, te.getEvents(), SubApplicationColumnPrefix.EVENT,
+          subApplicationTable);
+      storeConfig(rowKey, te.getConfigs(), SubApplicationColumnPrefix.CONFIG,
+          subApplicationTable);
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          SubApplicationColumnPrefix.IS_RELATED_TO, subApplicationTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          SubApplicationColumnPrefix.RELATES_TO, subApplicationTable);
+      break;
+    default:
+      LOG.info("Invalid table name provided.");
+      break;
+    }
+  }
+
+  /**
+   * Stores the info entries from the {@linkplain TimelineEntity} object.
+   */
+  private <T extends BaseTable<T>> void storeInfo(byte[] rowKey,
+      Map<String, Object> info, String flowVersion,
+      ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
+      throws IOException {
+    if (info != null) {
+      for (Map.Entry<String, Object> entry : info.entrySet()) {
+        ColumnRWHelper.store(rowKey, table, columnPrefix,
+            stringKeyConverter.encode(entry.getKey()), null, entry.getValue());
+      }
+    }
+  }
+
+  /**
+   * Stores the config information from {@linkplain TimelineEntity}.
+   */
+  private <T extends BaseTable<T>> void storeConfig(
+      byte[] rowKey, Map<String, String> config,
+      ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
+      throws IOException {
+    if (config != null) {
+      for (Map.Entry<String, String> entry : config.entrySet()) {
+        byte[] configKey = stringKeyConverter.encode(entry.getKey());
+        ColumnRWHelper.store(rowKey, table, columnPrefix, configKey,
+            null, entry.getValue());
+      }
+    }
+  }
+
+  /**
+   * Stores the {@linkplain TimelineMetric} information from the
+   * {@linkplain TimelineEntity} object.
+   */
+  private <T extends BaseTable<T>> void storeMetrics(
+      byte[] rowKey, Set<TimelineMetric> metrics,
+      ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
+      throws IOException {
+    if (metrics != null) {
+      for (TimelineMetric metric : metrics) {
+        byte[] metricColumnQualifier =
+            stringKeyConverter.encode(metric.getId());
+        Map<Long, Number> timeseries = metric.getValues();
+        for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
+          Long timestamp = timeseriesEntry.getKey();
+          ColumnRWHelper.store(rowKey, table, columnPrefix,
+              metricColumnQualifier, timestamp, timeseriesEntry.getValue());
+        }
+      }
+    }
+  }
+
+  /**
+   * Stores the {@linkplain TimelineEvent} information from the
+   * {@linkplain TimelineEntity} object.
+   */
+  private <T extends BaseTable<T>> void storeEvents(
+      byte[] rowKey, Set<TimelineEvent> events,
+      ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
+      throws IOException {
+    if (events != null) {
+      for (TimelineEvent event : events) {
+        if (event != null) {
+          String eventId = event.getId();
+          if (eventId != null) {
+            long eventTimestamp = event.getTimestamp();
+            // if the timestamp is not set, use the current timestamp
+            if (eventTimestamp == TimelineEvent.INVALID_TIMESTAMP) {
+              LOG.warn("timestamp is not set for event " + eventId +
+                  "! Using the current timestamp");
+              eventTimestamp = System.currentTimeMillis();
+            }
+            Map<String, Object> eventInfo = event.getInfo();
+            if ((eventInfo == null) || (eventInfo.size() == 0)) {
+              byte[] columnQualifierBytes =
+                  new EventColumnName(eventId, eventTimestamp, null)
+                      .getColumnQualifier();
+              ColumnRWHelper.store(rowKey, table, columnPrefix,
+                  columnQualifierBytes, null, Separator.EMPTY_BYTES);
+            } else {
+              for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
+                // eventId=infoKey
+                byte[] columnQualifierBytes =
+                    new EventColumnName(eventId, eventTimestamp, info.getKey())
+                        .getColumnQualifier();
+                ColumnRWHelper.store(rowKey, table, columnPrefix,
+                    columnQualifierBytes, null, info.getValue());
+              } // for info: eventInfo
+            }
+          }
+        }
+      } // event : events
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage
+   * .TimelineWriter#aggregate
+   * (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity,
+   * org.apache
+   * .hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack)
+   */
+  @Override
+  public TimelineWriteResponse aggregate(TimelineEntity data,
+      TimelineAggregationTrack track) throws IOException {
+    return null;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter#flush
+   * ()
+   */
+  @Override
+  public void flush() throws IOException {
+    // flush all buffered mutators
+    entityTable.flush();
+    appToFlowTable.flush();
+    applicationTable.flush();
+    flowRunTable.flush();
+    flowActivityTable.flush();
+    subApplicationTable.flush();
+  }
+
+  /**
+   * Closes the HBase connections. The close APIs perform flushing and
+   * release any resources held.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    if (entityTable != null) {
+      LOG.info("closing the entity table");
+      // The close API performs flushing and releases any resources held
+      entityTable.close();
+    }
+    if (appToFlowTable != null) {
+      LOG.info("closing the app_flow table");
+      // The close API performs flushing and releases any resources held
+      appToFlowTable.close();
+    }
+    if (applicationTable != null) {
+      LOG.info("closing the application table");
+      applicationTable.close();
+    }
+    if (flowRunTable != null) {
+      LOG.info("closing the flow run table");
+      // The close API performs flushing and releases any resources held
+      flowRunTable.close();
+    }
+    if (flowActivityTable != null) {
+      LOG.info("closing the flowActivityTable table");
+      // The close API performs flushing and releases any resources held
+      flowActivityTable.close();
+    }
+    if (subApplicationTable != null) {
+      subApplicationTable.close();
+    }
+    if (conn != null) {
+      LOG.info("closing the hbase Connection");
+      conn.close();
+    }
+    super.serviceStop();
+  }
+}
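
A minimal sketch (not part of this commit) of writing one entity through the
writer's lifecycle. The TimelineCollectorContext argument values are
illustrative assumptions; the write()/flush() calls mirror the API above.

  import org.apache.hadoop.security.UserGroupInformation;
  import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
  import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
  import org.apache.hadoop.yarn.conf.YarnConfiguration;
  import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
  import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;

  public class WriterSketch {
    public static void main(String[] args) throws Exception {
      HBaseTimelineWriterImpl writer = new HBaseTimelineWriterImpl();
      writer.init(new YarnConfiguration()); // sets up buffered mutators
      writer.start();
      try {
        TimelineEntity entity = new TimelineEntity();
        entity.setId("entity_1");
        entity.setType("MY_ENTITY_TYPE");
        entity.setCreatedTime(System.currentTimeMillis());
        TimelineEntities entities = new TimelineEntities();
        entities.addEntity(entity);
        // Non-null cluster/user/flow/app fields are required; see the
        // defensive null check at the top of write().
        TimelineCollectorContext context = new TimelineCollectorContext(
            "cluster1", "user1", "flow1", "v1", 1L,
            "application_1518000000000_0001");
        writer.write(context, entities,
            UserGroupInformation.getCurrentUser());
        writer.flush(); // flush all buffered mutators
      } finally {
        writer.stop(); // closes tables and the HBase connection
      }
    }
  }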

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
new file mode 100644
index 0000000..e9e4770
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTableRW;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This creates the schema for an HBase-based backend for storing application
+ * timeline information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class TimelineSchemaCreator {
+  private TimelineSchemaCreator() {
+  }
+
+  final static String NAME = TimelineSchemaCreator.class.getSimpleName();
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TimelineSchemaCreator.class);
+  private static final String SKIP_EXISTING_TABLE_OPTION_SHORT = "s";
+  private static final String APP_METRICS_TTL_OPTION_SHORT = "ma";
+  private static final String SUB_APP_METRICS_TTL_OPTION_SHORT = "msa";
+  private static final String APP_TABLE_NAME_SHORT = "a";
+  private static final String SUB_APP_TABLE_NAME_SHORT = "sa";
+  private static final String APP_TO_FLOW_TABLE_NAME_SHORT = "a2f";
+  private static final String ENTITY_METRICS_TTL_OPTION_SHORT = "me";
+  private static final String ENTITY_TABLE_NAME_SHORT = "e";
+  private static final String HELP_SHORT = "h";
+  private static final String CREATE_TABLES_SHORT = "c";
+
+  public static void main(String[] args) throws Exception {
+
+    LOG.info("Starting the schema creation");
+    Configuration hbaseConf =
+        HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(
+            new YarnConfiguration());
+    // Grab input args and allow for -Dxyz style arguments
+    String[] otherArgs = new GenericOptionsParser(hbaseConf, args)
+        .getRemainingArgs();
+
+    // Grab the arguments we're looking for.
+    CommandLine commandLine = parseArgs(otherArgs);
+
+    if (commandLine.hasOption(HELP_SHORT)) {
+      // -help option has the highest precedence
+      printUsage();
+    } else if (commandLine.hasOption(CREATE_TABLES_SHORT)) {
+      // Grab the entityTableName argument
+      String entityTableName = commandLine.getOptionValue(
+          ENTITY_TABLE_NAME_SHORT);
+      if (StringUtils.isNotBlank(entityTableName)) {
+        hbaseConf.set(EntityTableRW.TABLE_NAME_CONF_NAME, entityTableName);
+      }
+      // Grab the entity metrics TTL
+      String entityTableMetricsTTL = commandLine.getOptionValue(
+          ENTITY_METRICS_TTL_OPTION_SHORT);
+      if (StringUtils.isNotBlank(entityTableMetricsTTL)) {
+        int entityMetricsTTL = Integer.parseInt(entityTableMetricsTTL);
+        new EntityTableRW().setMetricsTTL(entityMetricsTTL, hbaseConf);
+      }
+      // Grab the appToflowTableName argument
+      String appToflowTableName = commandLine.getOptionValue(
+          APP_TO_FLOW_TABLE_NAME_SHORT);
+      if (StringUtils.isNotBlank(appToflowTableName)) {
+        hbaseConf.set(
+            AppToFlowTableRW.TABLE_NAME_CONF_NAME, appToflowTableName);
+      }
+      // Grab the applicationTableName argument
+      String applicationTableName = commandLine.getOptionValue(
+          APP_TABLE_NAME_SHORT);
+      if (StringUtils.isNotBlank(applicationTableName)) {
+        hbaseConf.set(ApplicationTableRW.TABLE_NAME_CONF_NAME,
+            applicationTableName);
+      }
+      // Grab the application metrics TTL
+      String applicationTableMetricsTTL = commandLine.getOptionValue(
+          APP_METRICS_TTL_OPTION_SHORT);
+      if (StringUtils.isNotBlank(applicationTableMetricsTTL)) {
+        int appMetricsTTL = Integer.parseInt(applicationTableMetricsTTL);
+        new ApplicationTableRW().setMetricsTTL(appMetricsTTL, hbaseConf);
+      }
+
+      // Grab the subApplicationTableName argument
+      String subApplicationTableName = commandLine.getOptionValue(
+          SUB_APP_TABLE_NAME_SHORT);
+      if (StringUtils.isNotBlank(subApplicationTableName)) {
+        hbaseConf.set(SubApplicationTableRW.TABLE_NAME_CONF_NAME,
+            subApplicationTableName);
+      }
+      // Grab the subApplication metrics TTL
+      String subApplicationTableMetricsTTL = commandLine
+          .getOptionValue(SUB_APP_METRICS_TTL_OPTION_SHORT);
+      if (StringUtils.isNotBlank(subApplicationTableMetricsTTL)) {
+        int subAppMetricsTTL = Integer.parseInt(subApplicationTableMetricsTTL);
+        new SubApplicationTableRW().setMetricsTTL(subAppMetricsTTL, hbaseConf);
+      }
+
+      // create all table schemas in hbase
+      final boolean skipExisting = commandLine.hasOption(
+          SKIP_EXISTING_TABLE_OPTION_SHORT);
+      createAllSchemas(hbaseConf, skipExisting);
+    } else {
+      // print usage information if -create is not specified
+      printUsage();
+    }
+  }
+
+  /**
+   * Parse command-line arguments.
+   *
+   * @param args command line arguments passed to the program.
+   * @return the parsed command line.
+   * @throws ParseException if the command line cannot be parsed.
+   */
+  private static CommandLine parseArgs(String[] args) throws ParseException {
+    Options options = new Options();
+
+    // Input
+    Option o = new Option(HELP_SHORT, "help", false, "print help information");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option(CREATE_TABLES_SHORT, "create", false,
+        "a mandatory option to create HBase tables");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option(ENTITY_TABLE_NAME_SHORT, "entityTableName", true,
+        "entity table name");
+    o.setArgName("entityTableName");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option(ENTITY_METRICS_TTL_OPTION_SHORT, "entityMetricsTTL", true,
+        "TTL for metrics column family");
+    o.setArgName("entityMetricsTTL");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option(APP_TO_FLOW_TABLE_NAME_SHORT, "appToflowTableName", true,
+        "app to flow table name");
+    o.setArgName("appToflowTableName");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option(APP_TABLE_NAME_SHORT, "applicationTableName", true,
+        "application table name");
+    o.setArgName("applicationTableName");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option(APP_METRICS_TTL_OPTION_SHORT, "applicationMetricsTTL", true,
+        "TTL for metrics column family");
+    o.setArgName("applicationMetricsTTL");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option(SUB_APP_TABLE_NAME_SHORT, "subApplicationTableName", true,
+        "subApplication table name");
+    o.setArgName("subApplicationTableName");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option(SUB_APP_METRICS_TTL_OPTION_SHORT, "subApplicationMetricsTTL",
+        true, "TTL for metrics column family");
+    o.setArgName("subApplicationMetricsTTL");
+    o.setRequired(false);
+    options.addOption(o);
+
+    // Options without an argument; no arg name needs to be set.
+    o = new Option(SKIP_EXISTING_TABLE_OPTION_SHORT, "skipExistingTable",
+        false, "skip existing HBase tables and continue to create new tables");
+    o.setRequired(false);
+    options.addOption(o);
+
+    CommandLineParser parser = new PosixParser();
+    CommandLine commandLine = null;
+    try {
+      commandLine = parser.parse(options, args);
+    } catch (Exception e) {
+      LOG.error("ERROR: " + e.getMessage() + "\n");
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp(NAME + " ", options, true);
+      System.exit(-1);
+    }
+
+    return commandLine;
+  }
+
+  private static void printUsage() {
+    StringBuilder usage = new StringBuilder("Command Usage:\n");
+    usage.append("TimelineSchemaCreator [-help] Display help info" +
+        " for all commands. Or\n");
+    usage.append("TimelineSchemaCreator -create [OPTIONAL_OPTIONS]" +
+        " Create hbase tables.\n\n");
+    usage.append("The Optional options for creating tables include: \n");
+    usage.append("[-entityTableName <Entity Table Name>] " +
+        "The name of the Entity table\n");
+    usage.append("[-entityMetricsTTL <Entity Table Metrics TTL>]" +
+        " TTL for metrics in the Entity table\n");
+    usage.append("[-appToflowTableName <AppToflow Table Name>]" +
+        " The name of the AppToFlow table\n");
+    usage.append("[-applicationTableName <Application Table Name>]" +
+        " The name of the Application table\n");
+    usage.append("[-applicationMetricsTTL <Application Table Metrics TTL>]" +
+        " TTL for metrics in the Application table\n");
+    usage.append("[-subApplicationTableName <SubApplication Table Name>]" +
+        " The name of the SubApplication table\n");
+    usage.append("[-subApplicationMetricsTTL " +
+        " <SubApplication Table Metrics TTL>]" +
+        " TTL for metrics in the SubApplication table\n");
+    usage.append("[-skipExistingTable] Whether to skip existing" +
+        " hbase tables\n");
+    System.out.println(usage.toString());
+  }
+
+  /**
+   * Create all table schemas, logging success or any exceptions encountered
+   * on failure.
+   * @param hbaseConf the HBase configuration to create tables with
+   * @param skipExisting whether to skip existing HBase tables
+   */
+  private static void createAllSchemas(Configuration hbaseConf,
+      boolean skipExisting) {
+    List<Exception> exceptions = new ArrayList<>();
+    try {
+      if (skipExisting) {
+        LOG.info("Will skip existing tables and continue on htable creation "
+            + "exceptions!");
+      }
+      createAllTables(hbaseConf, skipExisting);
+      LOG.info("Successfully created HBase schema. ");
+    } catch (IOException e) {
+      LOG.error("Error in creating hbase tables: ", e);
+      exceptions.add(e);
+    }
+
+    if (exceptions.size() > 0) {
+      LOG.warn("Schema creation finished with the following exceptions");
+      for (Exception e : exceptions) {
+        LOG.warn(e.getMessage());
+      }
+      System.exit(-1);
+    } else {
+      LOG.info("Schema creation finished successfully");
+    }
+  }
+
+  @VisibleForTesting
+  public static void createAllTables(Configuration hbaseConf,
+      boolean skipExisting) throws IOException {
+
+    try (Connection conn = ConnectionFactory.createConnection(hbaseConf)) {
+      Admin admin = conn.getAdmin();
+      if (admin == null) {
+        throw new IOException("Cannot create table since admin is null");
+      }
+      try {
+        new EntityTableRW().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new AppToFlowTableRW().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new ApplicationTableRW().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new FlowRunTableRW().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new FlowActivityTableRW().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new SubApplicationTableRW().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+    }
+  }
+}
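
For anyone trying this out: a minimal sketch of driving the creator from a test, assuming HBaseTestingUtility is available on the test classpath and that the TimelineSchemaCreator and EntityTableRW package names resolve as in the rest of this module.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseTestingUtility;
    import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
    import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTableRW;

    public class SchemaCreationSketch {
      public static void main(String[] args) throws Exception {
        // Spin up a local mini-cluster purely for illustration; a real
        // deployment points hbase-site.xml at the production quorum.
        HBaseTestingUtility util = new HBaseTestingUtility();
        util.startMiniCluster();
        Configuration hbaseConf = util.getConfiguration();
        // Override a table name before creation, mirroring the -e option.
        hbaseConf.set(EntityTableRW.TABLE_NAME_CONF_NAME, "test.entity");
        // The boolean mirrors -skipExistingTable.
        TimelineSchemaCreator.createAllTables(hbaseConf, true);
        util.shutdownMiniCluster();
      }
    }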

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTableRW.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTableRW.java
new file mode 100644
index 0000000..808994e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTableRW.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create, read and write to the Application Table.
+ */
+public class ApplicationTableRW extends BaseTableRW<ApplicationTable> {
+  /** application prefix. */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "application";
+
+  /** config param name that specifies the application table name. */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /**
+   * config param name that specifies the TTL for metrics column family in
+   * application table.
+   */
+  private static final String METRICS_TTL_CONF_NAME = PREFIX
+      + ".table.metrics.ttl";
+
+  /**
+   * config param name that specifies max-versions for metrics column family in
+   * application table.
+   */
+  private static final String METRICS_MAX_VERSIONS =
+      PREFIX + ".table.metrics.max-versions";
+
+  /** default value for application table name. */
+  private static final String DEFAULT_TABLE_NAME =
+      "timelineservice.application";
+
+  /** default TTL is 30 days for metrics timeseries. */
+  private static final int DEFAULT_METRICS_TTL = 2592000;
+
+  /** default max number of versions. */
+  private static final int DEFAULT_METRICS_MAX_VERSIONS = 10000;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ApplicationTableRW.class);
+
+  public ApplicationTableRW() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTableRW#
+   * createTable(org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor applicationTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor infoCF =
+        new HColumnDescriptor(ApplicationColumnFamily.INFO.getBytes());
+    infoCF.setBloomFilterType(BloomType.ROWCOL);
+    applicationTableDescp.addFamily(infoCF);
+
+    HColumnDescriptor configCF =
+        new HColumnDescriptor(ApplicationColumnFamily.CONFIGS.getBytes());
+    configCF.setBloomFilterType(BloomType.ROWCOL);
+    configCF.setBlockCacheEnabled(true);
+    applicationTableDescp.addFamily(configCF);
+
+    HColumnDescriptor metricsCF =
+        new HColumnDescriptor(ApplicationColumnFamily.METRICS.getBytes());
+    metricsCF.setBlockCacheEnabled(true);
+    // always keep 1 version (the latest)
+    metricsCF.setMinVersions(1);
+    metricsCF.setMaxVersions(
+        hbaseConf.getInt(METRICS_MAX_VERSIONS, DEFAULT_METRICS_MAX_VERSIONS));
+    metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME,
+        DEFAULT_METRICS_TTL));
+    applicationTableDescp.addFamily(metricsCF);
+    applicationTableDescp.setRegionSplitPolicyClassName(
+        "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
+    applicationTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
+        TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+    admin.createTable(applicationTableDescp,
+        TimelineHBaseSchemaConstants.getUsernameSplits());
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+
+  /**
+   * Set the TTL for the metrics column family in the given configuration.
+   *
+   * @param metricsTTL time to live parameter for the metrics in this table.
+   * @param hbaseConf configuration in which to set the metrics TTL config
+   *          variable.
+   */
+  public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) {
+    hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL);
+  }
+
+}
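
To make the TTL and max-versions knobs concrete, a small sketch of tuning the metrics family before running the schema creator. The literal key strings are assumptions derived from PREFIX above, taking YarnConfiguration.TIMELINE_SERVICE_PREFIX to resolve to "yarn.timeline-service.".

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class ApplicationTableTuningSketch {
      public static void main(String[] args) {
        Configuration conf = new YarnConfiguration();
        // Same effect as setMetricsTTL(604800, conf): a 7-day TTL instead
        // of the 30-day (2592000s) default.
        conf.setInt("yarn.timeline-service.application.table.metrics.ttl",
            7 * 24 * 3600);
        // METRICS_MAX_VERSIONS is private, so the raw key is spelled out.
        conf.setInt(
            "yarn.timeline-service.application.table.metrics.max-versions",
            1000);
        // createTable(...) reads both keys when building the descriptor.
      }
    }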

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java
new file mode 100644
index 0000000..03f508f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage.application
+ * contains classes related to implementation for application table.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTableRW.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTableRW.java
new file mode 100644
index 0000000..6460203
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTableRW.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create, read and write to the AppToFlow Table.
+ */
+public class AppToFlowTableRW extends BaseTableRW<AppToFlowTable> {
+  /** app_flow prefix. */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "app-flow";
+
+  /** config param name that specifies the app_flow table name. */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /** default value for app_flow table name. */
+  private static final String DEFAULT_TABLE_NAME = "timelineservice.app_flow";
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AppToFlowTableRW.class);
+
+  public AppToFlowTableRW() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTableRW#
+   * createTable(org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor appToFlowTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor mappCF =
+        new HColumnDescriptor(AppToFlowColumnFamily.MAPPING.getBytes());
+    mappCF.setBloomFilterType(BloomType.ROWCOL);
+    appToFlowTableDescp.addFamily(mappCF);
+
+    appToFlowTableDescp
+        .setRegionSplitPolicyClassName(
+            "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
+    appToFlowTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
+        TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+    admin.createTable(appToFlowTableDescp,
+        TimelineHBaseSchemaConstants.getUsernameSplits());
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+}
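
Since this table relies on the same key-prefix split policy as the application table, here is a short sketch of sanity-checking the created descriptor, assuming a reachable cluster and default table naming.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTableRW;

    public class AppToFlowInspectionSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new YarnConfiguration();
        try (Connection conn = ConnectionFactory.createConnection(conf)) {
          Admin admin = conn.getAdmin();
          // getTableName(conf) applies the configured schema prefix.
          HTableDescriptor desc = admin.getTableDescriptor(
              new AppToFlowTableRW().getTableName(conf));
          // Expect the KeyPrefixRegionSplitPolicy set in createTable().
          System.out.println(desc.getRegionSplitPolicyClassName());
        }
      }
    }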

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java
new file mode 100644
index 0000000..f01d982
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow
+ * contains classes related to implementation for app to flow table.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTableRW.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTableRW.java
new file mode 100644
index 0000000..12ebce4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTableRW.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Implements behavior common to tables used in the timeline service storage. It
+ * is thread-safe, and can be used by multiple threads concurrently.
+ *
+ * @param <T> reference to the table instance class itself for type safety.
+ */
+public abstract class BaseTableRW<T extends BaseTable<T>> {
+
+  /**
+   * Name of config variable that is used to point to this table.
+   */
+  private final String tableNameConfName;
+
+  /**
+   * Unless the configuration overrides, this will be the default name for the
+   * table when it is created.
+   */
+  private final String defaultTableName;
+
+  /**
+   * @param tableNameConfName name of config variable that is used to point to
+   *          this table.
+   * @param defaultTableName default table name, used if no name is set in
+   *          the configuration.
+   */
+  protected BaseTableRW(String tableNameConfName, String defaultTableName) {
+    this.tableNameConfName = tableNameConfName;
+    this.defaultTableName = defaultTableName;
+  }
+
+  /**
+   * Used to create a type-safe mutator for this table.
+   *
+   * @param hbaseConf used to read the table name.
+   * @param conn the HBase connection from which the mutator is created.
+   * @return a type-safe {@link BufferedMutator} for this table.
+   * @throws IOException if any exception occurs while creating mutator for the
+   *     table.
+   */
+  public TypedBufferedMutator<T> getTableMutator(Configuration hbaseConf,
+      Connection conn) throws IOException {
+
+    TableName tableName = this.getTableName(hbaseConf);
+
+    // Plain buffered mutator
+    BufferedMutator bufferedMutator = conn.getBufferedMutator(tableName);
+
+    // Wrap the mutator so that callers hold on to it with the proper
+    // table type.
+    TypedBufferedMutator<T> table =
+        new TypedBufferedMutator<T>(bufferedMutator);
+
+    return table;
+  }
+
+  /**
+   * Used to retrieve a scanner over this table.
+   *
+   * @param hbaseConf used to read settings that override defaults.
+   * @param conn used to create a table from.
+   * @param scan that specifies what you want to read from this table.
+   * @return scanner for the table.
+   * @throws IOException if any exception occurs while getting the scanner.
+   */
+  public ResultScanner getResultScanner(Configuration hbaseConf,
+      Connection conn, Scan scan) throws IOException {
+    Table table = conn.getTable(getTableName(hbaseConf));
+    return table.getScanner(scan);
+  }
+
+  /**
+   * Used to retrieve a single row from this table.
+   *
+   * @param hbaseConf used to read settings that override defaults.
+   * @param conn used to create a table from.
+   * @param get that specifies what single row you want to get from this table.
+   * @return result of the get operation.
+   * @throws IOException if any exception occurs while getting the result.
+   */
+  public Result getResult(Configuration hbaseConf, Connection conn, Get get)
+      throws IOException {
+    Table table = conn.getTable(getTableName(hbaseConf));
+    return table.get(get);
+  }
+
+  /**
+   * Get the {@link TableName} for the input table, prepending the configured
+   * schema prefix.
+   *
+   * @param conf HBase configuration from which the schema prefix is fetched.
+   * @param tableName name of the table to be fetched.
+   * @return A {@link TableName} object.
+   */
+  public static TableName getTableName(Configuration conf, String tableName) {
+    String tableSchemaPrefix = conf.get(
+        YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX);
+    return TableName.valueOf(tableSchemaPrefix + tableName);
+  }
+
+  /**
+   * Get the table name for this table.
+   *
+   * @param conf HBase configuration from which table name will be fetched.
+   * @return A {@link TableName} object.
+   */
+  public TableName getTableName(Configuration conf) {
+    String tableName = conf.get(tableNameConfName, defaultTableName);
+    return getTableName(conf, tableName);
+  }
+
+  /**
+   * Get the table name based on the input config parameters.
+   *
+   * @param conf HBase configuration from which table name will be fetched.
+   * @param tableNameInConf the table name parameter in conf.
+   * @param defaultTableName the default table name.
+   * @return A {@link TableName} object.
+   */
+  public static TableName getTableName(Configuration conf,
+      String tableNameInConf, String defaultTableName) {
+    String tableName = conf.get(tableNameInConf, defaultTableName);
+    return getTableName(conf, tableName);
+  }
+
+  /**
+   * Used to create the table in HBase. Should be called only once (per HBase
+   * instance).
+   *
+   * @param admin Used for doing HBase table operations.
+   * @param hbaseConf HBase configuration.
+   * @throws IOException if any exception occurs while creating the table.
+   */
+  public abstract void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException;
+
+}
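
To show how the pieces above fit together, a minimal read-path sketch against the application table, assuming the schema already exists and an hbase-site.xml on the classpath points at the cluster.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTableRW;

    public class BaseTableReadSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new YarnConfiguration();
        try (Connection conn = ConnectionFactory.createConnection(conf)) {
          ApplicationTableRW table = new ApplicationTableRW();
          // The table name resolves through getTableName(conf): schema
          // prefix plus the configured or default name.
          try (ResultScanner scanner =
              table.getResultScanner(conf, conn, new Scan())) {
            for (Result result : scanner) {
              System.out.println(result);
            }
          }
        }
      }
    }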


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org