You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ro...@apache.org on 2018/02/18 08:09:48 UTC
[16/18] hadoop git commit: YARN-7919. Refactor timelineservice-hbase
module into submodules. Contributed by Haibo Chen.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnRWHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnRWHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnRWHelper.java
new file mode 100644
index 0000000..a8e5149
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnRWHelper.java
@@ -0,0 +1,487 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * A set of utility functions that read or read to a column.
+ * This class is meant to be used only by explicit Columns,
+ * and not directly to write by clients.
+ */
+public final class ColumnRWHelper {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ColumnHelper.class);
+
+ private ColumnRWHelper() {
+ }
+
+ /**
+ * Figures out the cell timestamp used in the Put For storing.
+ * Will supplement the timestamp if required. Typically done for flow run
+ * table.If we supplement the timestamp, we left shift the timestamp and
+ * supplement it with the AppId id so that there are no collisions in the flow
+ * run table's cells.
+ */
+ private static long getPutTimestamp(
+ Long timestamp, boolean supplementTs, Attribute[] attributes) {
+ if (timestamp == null) {
+ timestamp = System.currentTimeMillis();
+ }
+ if (!supplementTs) {
+ return timestamp;
+ } else {
+ String appId = getAppIdFromAttributes(attributes);
+ long supplementedTS = TimestampGenerator.getSupplementedTimestamp(
+ timestamp, appId);
+ return supplementedTS;
+ }
+ }
+
+ private static String getAppIdFromAttributes(Attribute[] attributes) {
+ if (attributes == null) {
+ return null;
+ }
+ String appId = null;
+ for (Attribute attribute : attributes) {
+ if (AggregationCompactionDimension.APPLICATION_ID.toString().equals(
+ attribute.getName())) {
+ appId = Bytes.toString(attribute.getValue());
+ }
+ }
+ return appId;
+ }
+
+ /**
+ * Sends a Mutation to the table. The mutations will be buffered and sent over
+ * the wire as part of a batch.
+ *
+ * @param rowKey
+ * identifying the row to write. Nothing gets written when null.
+ * @param tableMutator
+ * used to modify the underlying HBase table
+ * @param column the column that is to be modified
+ * @param timestamp
+ * version timestamp. When null the current timestamp multiplied with
+ * TimestampGenerator.TS_MULTIPLIER and added with last 3 digits of
+ * app id will be used
+ * @param inputValue
+ * the value to write to the rowKey and column qualifier. Nothing
+ * gets written when null.
+ * @param attributes Attributes to be set for HBase Put.
+ * @throws IOException if any problem occurs during store operation(sending
+ * mutation to table).
+ */
+ public static void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
+ Column<?> column, Long timestamp,
+ Object inputValue, Attribute... attributes)
+ throws IOException {
+ store(rowKey, tableMutator, column.getColumnFamilyBytes(),
+ column.getColumnQualifierBytes(), timestamp,
+ column.supplementCellTimestamp(), inputValue,
+ column.getValueConverter(),
+ column.getCombinedAttrsWithAggr(attributes));
+ }
+
+ /**
+ * Sends a Mutation to the table. The mutations will be buffered and sent over
+ * the wire as part of a batch.
+ *
+ * @param rowKey
+ * identifying the row to write. Nothing gets written when null.
+ * @param tableMutator
+ * used to modify the underlying HBase table
+ * @param columnFamilyBytes
+ * @param columnQualifier
+ * column qualifier. Nothing gets written when null.
+ * @param timestamp
+ * version timestamp. When null the current timestamp multiplied with
+ * TimestampGenerator.TS_MULTIPLIER and added with last 3 digits of
+ * app id will be used
+ * @param inputValue
+ * the value to write to the rowKey and column qualifier. Nothing
+ * gets written when null.
+ * @param converter
+ * @param attributes Attributes to be set for HBase Put.
+ * @throws IOException if any problem occurs during store operation(sending
+ * mutation to table).
+ */
+ public static void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
+ byte[] columnFamilyBytes, byte[] columnQualifier, Long timestamp,
+ boolean supplementTs, Object inputValue, ValueConverter converter,
+ Attribute... attributes) throws IOException {
+ if ((rowKey == null) || (columnQualifier == null) || (inputValue == null)) {
+ return;
+ }
+ Put p = new Put(rowKey);
+ timestamp = getPutTimestamp(timestamp, supplementTs, attributes);
+ p.addColumn(columnFamilyBytes, columnQualifier, timestamp,
+ converter.encodeValue(inputValue));
+ if ((attributes != null) && (attributes.length > 0)) {
+ for (Attribute attribute : attributes) {
+ p.setAttribute(attribute.getName(), attribute.getValue());
+ }
+ }
+ tableMutator.mutate(p);
+ }
+
+ /**
+ * Get the latest version of this specified column. Note: this call clones the
+ * value content of the hosting {@link org.apache.hadoop.hbase.Cell Cell}.
+ *
+ * @param result from which to read the value. Cannot be null
+ * @param columnFamilyBytes
+ * @param columnQualifierBytes referring to the column to be read.
+ * @param converter
+ * @return latest version of the specified column of whichever object was
+ * written.
+ * @throws IOException if any problem occurs while reading result.
+ */
+ public static Object readResult(Result result, byte[] columnFamilyBytes,
+ byte[] columnQualifierBytes, ValueConverter converter)
+ throws IOException {
+ if (result == null || columnQualifierBytes == null) {
+ return null;
+ }
+
+ // Would have preferred to be able to use getValueAsByteBuffer and get a
+ // ByteBuffer to avoid copy, but GenericObjectMapper doesn't seem to like
+ // that.
+ byte[] value = result.getValue(columnFamilyBytes, columnQualifierBytes);
+ return converter.decodeValue(value);
+ }
+
+ /**
+ * Get the latest version of this specified column. Note: this call clones the
+ * value content of the hosting {@link org.apache.hadoop.hbase.Cell Cell}.
+ *
+ * @param result from which to read the value. Cannot be null
+ * @param column the column that the result can be parsed to
+ * @return latest version of the specified column of whichever object was
+ * written.
+ * @throws IOException if any problem occurs while reading result.
+ */
+ public static Object readResult(Result result, Column<?> column)
+ throws IOException {
+ return readResult(result, column.getColumnFamilyBytes(),
+ column.getColumnQualifierBytes(), column.getValueConverter());
+ }
+
+ /**
+ * Get the latest version of this specified column. Note: this call clones the
+ * value content of the hosting {@link org.apache.hadoop.hbase.Cell Cell}.
+ *
+ * @param result Cannot be null
+ * @param columnPrefix column prefix to read from
+ * @param qualifier column qualifier. Nothing gets read when null.
+ * @return result object (can be cast to whatever object was written to) or
+ * null when specified column qualifier for this prefix doesn't exist
+ * in the result.
+ * @throws IOException if there is any exception encountered while reading
+ * result.
+ */
+ public static Object readResult(Result result, ColumnPrefix<?> columnPrefix,
+ String qualifier) throws IOException {
+ byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+ columnPrefix.getColumnPrefixInBytes(), qualifier);
+
+ return readResult(
+ result, columnPrefix.getColumnFamilyBytes(),
+ columnQualifier, columnPrefix.getValueConverter());
+ }
+
+ /**
+ *
+ * @param <K> identifies the type of key converter.
+ * @param result from which to read columns.
+ * @param keyConverter used to convert column bytes to the appropriate key
+ * type
+ * @return the latest values of columns in the column family with this prefix
+ * (or all of them if the prefix value is null).
+ * @throws IOException if there is any exception encountered while reading
+ * results.
+ */
+ public static <K> Map<K, Object> readResults(Result result,
+ ColumnPrefix<?> columnPrefix, KeyConverter<K> keyConverter)
+ throws IOException {
+ return readResults(result,
+ columnPrefix.getColumnFamilyBytes(),
+ columnPrefix.getColumnPrefixInBytes(),
+ keyConverter, columnPrefix.getValueConverter());
+ }
+
+ /**
+ * @param result from which to reads data with timestamps.
+ * @param <K> identifies the type of key converter.
+ * @param <V> the type of the values. The values will be cast into that type.
+ * @param keyConverter used to convert column bytes to the appropriate key
+ * type.
+ * @return the cell values at each respective time in for form
+ * {@literal {idA={timestamp1->value1}, idA={timestamp2->value2},
+ * idB={timestamp3->value3}, idC={timestamp1->value4}}}
+ * @throws IOException if there is any exception encountered while reading
+ * result.
+ */
+ public static <K, V> NavigableMap<K, NavigableMap<Long, V>>
+ readResultsWithTimestamps(Result result, ColumnPrefix<?> columnPrefix,
+ KeyConverter<K> keyConverter) throws IOException {
+ return readResultsWithTimestamps(result,
+ columnPrefix.getColumnFamilyBytes(),
+ columnPrefix.getColumnPrefixInBytes(),
+ keyConverter, columnPrefix.getValueConverter(),
+ columnPrefix.supplementCellTimeStamp());
+ }
+
+ /**
+ * @param result from which to reads data with timestamps
+ * @param columnPrefixBytes optional prefix to limit columns. If null all
+ * columns are returned.
+ * @param <K> identifies the type of column name(indicated by type of key
+ * converter).
+ * @param <V> the type of the values. The values will be cast into that type.
+ * @param keyConverter used to convert column bytes to the appropriate key
+ * type.
+ * @return the cell values at each respective time in for form
+ * {@literal {idA={timestamp1->value1}, idA={timestamp2->value2},
+ * idB={timestamp3->value3}, idC={timestamp1->value4}}}
+ * @throws IOException if any problem occurs while reading results.
+ */
+ @SuppressWarnings("unchecked")
+ public static <K, V> NavigableMap<K, NavigableMap<Long, V>>
+ readResultsWithTimestamps(Result result, byte[] columnFamilyBytes,
+ byte[] columnPrefixBytes, KeyConverter<K> keyConverter,
+ ValueConverter valueConverter, boolean supplementTs)
+ throws IOException {
+
+ NavigableMap<K, NavigableMap<Long, V>> results = new TreeMap<>();
+
+ if (result != null) {
+ NavigableMap<
+ byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> resultMap =
+ result.getMap();
+
+ NavigableMap<byte[], NavigableMap<Long, byte[]>> columnCellMap =
+ resultMap.get(columnFamilyBytes);
+ // could be that there is no such column family.
+ if (columnCellMap != null) {
+ for (Map.Entry<byte[], NavigableMap<Long, byte[]>> entry : columnCellMap
+ .entrySet()) {
+ K converterColumnKey = null;
+ if (columnPrefixBytes == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("null prefix was specified; returning all columns");
+ }
+ try {
+ converterColumnKey = keyConverter.decode(entry.getKey());
+ } catch (IllegalArgumentException iae) {
+ LOG.error("Illegal column found, skipping this column.", iae);
+ continue;
+ }
+ } else {
+ // A non-null prefix means columns are actually of the form
+ // prefix!columnNameRemainder
+ byte[][] columnNameParts =
+ Separator.QUALIFIERS.split(entry.getKey(), 2);
+ byte[] actualColumnPrefixBytes = columnNameParts[0];
+ if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
+ && columnNameParts.length == 2) {
+ try {
+ // This is the prefix that we want
+ converterColumnKey = keyConverter.decode(columnNameParts[1]);
+ } catch (IllegalArgumentException iae) {
+ LOG.error("Illegal column found, skipping this column.", iae);
+ continue;
+ }
+ }
+ }
+
+ // If this column has the prefix we want
+ if (converterColumnKey != null) {
+ NavigableMap<Long, V> cellResults =
+ new TreeMap<Long, V>();
+ NavigableMap<Long, byte[]> cells = entry.getValue();
+ if (cells != null) {
+ for (Map.Entry<Long, byte[]> cell : cells.entrySet()) {
+ V value =
+ (V) valueConverter.decodeValue(cell.getValue());
+ Long ts = supplementTs ? TimestampGenerator.
+ getTruncatedTimestamp(cell.getKey()) : cell.getKey();
+ cellResults.put(ts, value);
+ }
+ }
+ results.put(converterColumnKey, cellResults);
+ }
+ } // for entry : columnCellMap
+ } // if columnCellMap != null
+ } // if result != null
+ return results;
+ }
+
+ /**
+ * @param <K> identifies the type of column name(indicated by type of key
+ * converter).
+ * @param result from which to read columns
+ * @param columnPrefixBytes optional prefix to limit columns. If null all
+ * columns are returned.
+ * @param keyConverter used to convert column bytes to the appropriate key
+ * type.
+ * @return the latest values of columns in the column family. If the column
+ * prefix is null, the column qualifier is returned as Strings. For a
+ * non-null column prefix bytes, the column qualifier is returned as
+ * a list of parts, each part a byte[]. This is to facilitate
+ * returning byte arrays of values that were not Strings.
+ * @throws IOException if any problem occurs while reading results.
+ */
+ public static <K> Map<K, Object> readResults(Result result,
+ byte[] columnFamilyBytes, byte[] columnPrefixBytes,
+ KeyConverter<K> keyConverter, ValueConverter valueConverter)
+ throws IOException {
+ Map<K, Object> results = new HashMap<K, Object>();
+
+ if (result != null) {
+ Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
+ for (Map.Entry<byte[], byte[]> entry : columns.entrySet()) {
+ byte[] columnKey = entry.getKey();
+ if (columnKey != null && columnKey.length > 0) {
+
+ K converterColumnKey = null;
+ if (columnPrefixBytes == null) {
+ try {
+ converterColumnKey = keyConverter.decode(columnKey);
+ } catch (IllegalArgumentException iae) {
+ LOG.error("Illegal column found, skipping this column.", iae);
+ continue;
+ }
+ } else {
+ // A non-null prefix means columns are actually of the form
+ // prefix!columnNameRemainder
+ byte[][] columnNameParts = Separator.QUALIFIERS.split(columnKey, 2);
+ if (columnNameParts.length > 0) {
+ byte[] actualColumnPrefixBytes = columnNameParts[0];
+ // If this is the prefix that we want
+ if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
+ && columnNameParts.length == 2) {
+ try {
+ converterColumnKey = keyConverter.decode(columnNameParts[1]);
+ } catch (IllegalArgumentException iae) {
+ LOG.error("Illegal column found, skipping this column.", iae);
+ continue;
+ }
+ }
+ }
+ } // if-else
+
+ // If the columnPrefix is null (we want all columns), or the actual
+ // prefix matches the given prefix we want this column
+ if (converterColumnKey != null) {
+ Object value = valueConverter.decodeValue(entry.getValue());
+ // we return the columnQualifier in parts since we don't know
+ // which part is of which data type.
+ results.put(converterColumnKey, value);
+ }
+ }
+ } // for entry
+ }
+ return results;
+ }
+
+ /**
+ * Sends a Mutation to the table. The mutations will be buffered and sent over
+ * the wire as part of a batch.
+ *
+ * @param rowKey identifying the row to write. Nothing gets written when null.
+ * @param tableMutator used to modify the underlying HBase table. Caller is
+ * responsible to pass a mutator for the table that actually has this
+ * column.
+ * @param qualifier column qualifier. Nothing gets written when null.
+ * @param timestamp version timestamp. When null the server timestamp will be
+ * used.
+ * @param attributes attributes for the mutation that are used by the
+ * coprocessor to set/read the cell tags.
+ * @param inputValue the value to write to the rowKey and column qualifier.
+ * Nothing gets written when null.
+ * @throws IOException if there is any exception encountered while doing
+ * store operation(sending mutation to the table).
+ */
+ public static void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
+ ColumnPrefix<?> columnPrefix, byte[] qualifier, Long timestamp,
+ Object inputValue, Attribute... attributes) throws IOException {
+ // Null check
+ if (qualifier == null) {
+ throw new IOException("Cannot store column with null qualifier in "
+ +tableMutator.getName().getNameAsString());
+ }
+
+ byte[] columnQualifier = columnPrefix.getColumnPrefixBytes(qualifier);
+ Attribute[] combinedAttributes =
+ columnPrefix.getCombinedAttrsWithAggr(attributes);
+
+ store(rowKey, tableMutator, columnPrefix.getColumnFamilyBytes(),
+ columnQualifier, timestamp, columnPrefix.supplementCellTimeStamp(),
+ inputValue, columnPrefix.getValueConverter(), combinedAttributes);
+ }
+
+ /**
+ * Sends a Mutation to the table. The mutations will be buffered and sent over
+ * the wire as part of a batch.
+ *
+ * @param rowKey identifying the row to write. Nothing gets written when null.
+ * @param tableMutator used to modify the underlying HBase table. Caller is
+ * responsible to pass a mutator for the table that actually has this
+ * column.
+ * @param qualifier column qualifier. Nothing gets written when null.
+ * @param timestamp version timestamp. When null the server timestamp will be
+ * used.
+ * @param attributes attributes for the mutation that are used by the
+ * coprocessor to set/read the cell tags.
+ * @param inputValue the value to write to the rowKey and column qualifier.
+ * Nothing gets written when null.
+ * @throws IOException if there is any exception encountered while doing
+ * store operation(sending mutation to the table).
+ */
+ public static void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
+ ColumnPrefix<?> columnPrefix, String qualifier, Long timestamp,
+ Object inputValue, Attribute... attributes) throws IOException {
+ // Null check
+ if (qualifier == null) {
+ throw new IOException("Cannot store column with null qualifier in "
+ + tableMutator.getName().getNameAsString());
+ }
+
+ byte[] columnQualifier = columnPrefix.getColumnPrefixBytes(qualifier);
+ Attribute[] combinedAttributes =
+ columnPrefix.getCombinedAttrsWithAggr(attributes);
+
+ store(rowKey, tableMutator, columnPrefix.getColumnFamilyBytes(),
+ columnQualifier, timestamp, columnPrefix.supplementCellTimeStamp(),
+ inputValue, columnPrefix.getValueConverter(), combinedAttributes);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
new file mode 100644
index 0000000..f4cd6fb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Query;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A bunch of utility functions used in HBase TimelineService backend.
+ */
+public final class HBaseTimelineStorageUtils {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(HBaseTimelineStorageUtils.class);
+
+ private HBaseTimelineStorageUtils() {
+ }
+
+
+ /**
+ * @param conf YARN configuration. Used to see if there is an explicit config
+ * pointing to the HBase config file to read. It should not be null
+ * or a NullPointerException will be thrown.
+ * @return a configuration with the HBase configuration from the classpath,
+ * optionally overwritten by the timeline service configuration URL if
+ * specified.
+ * @throws MalformedURLException if a timeline service HBase configuration URL
+ * is specified but is a malformed URL.
+ */
+ public static Configuration getTimelineServiceHBaseConf(Configuration conf)
+ throws MalformedURLException {
+ if (conf == null) {
+ throw new NullPointerException();
+ }
+
+ Configuration hbaseConf;
+ String timelineServiceHBaseConfFileURL =
+ conf.get(YarnConfiguration.TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE);
+ if (timelineServiceHBaseConfFileURL != null
+ && timelineServiceHBaseConfFileURL.length() > 0) {
+ LOG.info("Using hbase configuration at " +
+ timelineServiceHBaseConfFileURL);
+ // create a clone so that we don't mess with out input one
+ hbaseConf = new Configuration(conf);
+ Configuration plainHBaseConf = new Configuration(false);
+ URL hbaseSiteXML = new URL(timelineServiceHBaseConfFileURL);
+ plainHBaseConf.addResource(hbaseSiteXML);
+ HBaseConfiguration.merge(hbaseConf, plainHBaseConf);
+ } else {
+ // default to what is on the classpath
+ hbaseConf = HBaseConfiguration.create(conf);
+ }
+ return hbaseConf;
+ }
+
+ /**
+ * Given a row key prefix stored in a byte array, return a byte array for its
+ * immediate next row key.
+ *
+ * @param rowKeyPrefix The provided row key prefix, represented in an array.
+ * @return the closest next row key of the provided row key.
+ */
+ public static byte[] calculateTheClosestNextRowKeyForPrefix(
+ byte[] rowKeyPrefix) {
+ // Essentially we are treating it like an 'unsigned very very long' and
+ // doing +1 manually.
+ // Search for the place where the trailing 0xFFs start
+ int offset = rowKeyPrefix.length;
+ while (offset > 0) {
+ if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
+ break;
+ }
+ offset--;
+ }
+
+ if (offset == 0) {
+ // We got an 0xFFFF... (only FFs) stopRow value which is
+ // the last possible prefix before the end of the table.
+ // So set it to stop at the 'end of the table'
+ return HConstants.EMPTY_END_ROW;
+ }
+
+ // Copy the right length of the original
+ byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
+ // And increment the last one
+ newStopRow[newStopRow.length - 1]++;
+ return newStopRow;
+ }
+
+ public static void setMetricsTimeRange(Query query, byte[] metricsCf,
+ long tsBegin, long tsEnd) {
+ if (tsBegin != 0 || tsEnd != Long.MAX_VALUE) {
+ query.setColumnFamilyTimeRange(metricsCf,
+ tsBegin, ((tsEnd == Long.MAX_VALUE) ? Long.MAX_VALUE : (tsEnd + 1)));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java
new file mode 100644
index 0000000..8e6c259
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * contains the constants used in the context of schema accesses for
+ * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+ * information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class TimelineHBaseSchemaConstants {
+ private TimelineHBaseSchemaConstants() {
+ }
+
+ /**
+ * Used to create a pre-split for tables starting with a username in the
+ * prefix. TODO: this may have to become a config variable (string with
+ * separators) so that different installations can presplit based on their own
+ * commonly occurring names.
+ */
+ private final static byte[][] USERNAME_SPLITS = {
+ Bytes.toBytes("a"), Bytes.toBytes("ad"), Bytes.toBytes("an"),
+ Bytes.toBytes("b"), Bytes.toBytes("ca"), Bytes.toBytes("cl"),
+ Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"),
+ Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i"),
+ Bytes.toBytes("j"), Bytes.toBytes("k"), Bytes.toBytes("l"),
+ Bytes.toBytes("m"), Bytes.toBytes("n"), Bytes.toBytes("o"),
+ Bytes.toBytes("q"), Bytes.toBytes("r"), Bytes.toBytes("s"),
+ Bytes.toBytes("se"), Bytes.toBytes("t"), Bytes.toBytes("u"),
+ Bytes.toBytes("v"), Bytes.toBytes("w"), Bytes.toBytes("x"),
+ Bytes.toBytes("y"), Bytes.toBytes("z")
+ };
+
+ /**
+ * The length at which keys auto-split.
+ */
+ public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4";
+
+ /**
+ * @return splits for splits where a user is a prefix.
+ */
+ public static byte[][] getUsernameSplits() {
+ byte[][] kloon = USERNAME_SPLITS.clone();
+ // Deep copy.
+ for (int row = 0; row < USERNAME_SPLITS.length; row++) {
+ kloon[row] = Bytes.copy(USERNAME_SPLITS[row]);
+ }
+ return kloon;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java
new file mode 100644
index 0000000..29a07e4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Mutation;
+
+/**
+ * To be used to wrap an actual {@link BufferedMutator} in a type safe manner.
+ *
+ * @param <T> The class referring to the table to be written to.
+ */
+public class TypedBufferedMutator<T extends BaseTable<T>> {
+
+ private final BufferedMutator bufferedMutator;
+
+ /**
+ * @param bufferedMutator the mutator to be wrapped for delegation. Shall not
+ * be null.
+ */
+ public TypedBufferedMutator(BufferedMutator bufferedMutator) {
+ this.bufferedMutator = bufferedMutator;
+ }
+
+ public TableName getName() {
+ return bufferedMutator.getName();
+ }
+
+ public Configuration getConfiguration() {
+ return bufferedMutator.getConfiguration();
+ }
+
+ public void mutate(Mutation mutation) throws IOException {
+ bufferedMutator.mutate(mutation);
+ }
+
+ public void mutate(List<? extends Mutation> mutations) throws IOException {
+ bufferedMutator.mutate(mutations);
+ }
+
+ public void close() throws IOException {
+ bufferedMutator.close();
+ }
+
+ public void flush() throws IOException {
+ bufferedMutator.flush();
+ }
+
+ public long getWriteBufferSize() {
+ return bufferedMutator.getWriteBufferSize();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
new file mode 100644
index 0000000..0df5b8a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage.common contains
+ * a set of utility classes used across backend storage reader and writer.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTableRW.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTableRW.java
new file mode 100644
index 0000000..111ae71
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTableRW.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create, read and write to the Entity Table.
+ */
+public class EntityTableRW extends BaseTableRW<EntityTable> {
+ /** entity prefix. */
+ private static final String PREFIX =
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX + "entity";
+
+ /** config param name that specifies the entity table name. */
+ public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+ /**
+ * config param name that specifies the TTL for metrics column family in
+ * entity table.
+ */
+ private static final String METRICS_TTL_CONF_NAME = PREFIX
+ + ".table.metrics.ttl";
+
+ /**
+ * config param name that specifies max-versions for metrics column family in
+ * entity table.
+ */
+ private static final String METRICS_MAX_VERSIONS =
+ PREFIX + ".table.metrics.max-versions";
+
+ /** default value for entity table name. */
+ public static final String DEFAULT_TABLE_NAME = "timelineservice.entity";
+
+ /** default TTL is 30 days for metrics timeseries. */
+ private static final int DEFAULT_METRICS_TTL = 2592000;
+
+ /** default max number of versions. */
+ private static final int DEFAULT_METRICS_MAX_VERSIONS = 10000;
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(EntityTableRW.class);
+
+ public EntityTableRW() {
+ super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTableRW#
+ * createTable(org.apache.hadoop.hbase.client.Admin,
+ * org.apache.hadoop.conf.Configuration)
+ */
+ public void createTable(Admin admin, Configuration hbaseConf)
+ throws IOException {
+
+ TableName table = getTableName(hbaseConf);
+ if (admin.tableExists(table)) {
+ // do not disable / delete existing table
+ // similar to the approach taken by map-reduce jobs when
+ // output directory exists
+ throw new IOException("Table " + table.getNameAsString()
+ + " already exists.");
+ }
+
+ HTableDescriptor entityTableDescp = new HTableDescriptor(table);
+ HColumnDescriptor infoCF =
+ new HColumnDescriptor(EntityColumnFamily.INFO.getBytes());
+ infoCF.setBloomFilterType(BloomType.ROWCOL);
+ entityTableDescp.addFamily(infoCF);
+
+ HColumnDescriptor configCF =
+ new HColumnDescriptor(EntityColumnFamily.CONFIGS.getBytes());
+ configCF.setBloomFilterType(BloomType.ROWCOL);
+ configCF.setBlockCacheEnabled(true);
+ entityTableDescp.addFamily(configCF);
+
+ HColumnDescriptor metricsCF =
+ new HColumnDescriptor(EntityColumnFamily.METRICS.getBytes());
+ entityTableDescp.addFamily(metricsCF);
+ metricsCF.setBlockCacheEnabled(true);
+ // always keep 1 version (the latest)
+ metricsCF.setMinVersions(1);
+ metricsCF.setMaxVersions(
+ hbaseConf.getInt(METRICS_MAX_VERSIONS, DEFAULT_METRICS_MAX_VERSIONS));
+ metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME,
+ DEFAULT_METRICS_TTL));
+ entityTableDescp.setRegionSplitPolicyClassName(
+ "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
+ entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
+ TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+ admin.createTable(entityTableDescp,
+ TimelineHBaseSchemaConstants.getUsernameSplits());
+ LOG.info("Status of table creation for " + table.getNameAsString() + "="
+ + admin.tableExists(table));
+ }
+
+ /**
+ * @param metricsTTL time to live parameter for the metricss in this table.
+ * @param hbaseConf configururation in which to set the metrics TTL config
+ * variable.
+ */
+ public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) {
+ hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java
new file mode 100644
index 0000000..bb0e331
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage.entity
+ * contains classes related to implementation for entity table.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTableRW.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTableRW.java
new file mode 100644
index 0000000..5b9fe13
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTableRW.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create, read and write to the FlowActivity Table.
+ */
+public class FlowActivityTableRW extends BaseTableRW<FlowActivityTable> {
+ /** flow activity table prefix. */
+ private static final String PREFIX =
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowactivity";
+
+ /** config param name that specifies the flowactivity table name. */
+ public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+ /** default value for flowactivity table name. */
+ public static final String DEFAULT_TABLE_NAME =
+ "timelineservice.flowactivity";
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FlowActivityTableRW.class);
+
+ /** default max number of versions. */
+ public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
+
+ public FlowActivityTableRW() {
+ super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTableRW#
+ * createTable(org.apache.hadoop.hbase.client.Admin,
+ * org.apache.hadoop.conf.Configuration)
+ */
+ public void createTable(Admin admin, Configuration hbaseConf)
+ throws IOException {
+
+ TableName table = getTableName(hbaseConf);
+ if (admin.tableExists(table)) {
+ // do not disable / delete existing table
+ // similar to the approach taken by map-reduce jobs when
+ // output directory exists
+ throw new IOException("Table " + table.getNameAsString()
+ + " already exists.");
+ }
+
+ HTableDescriptor flowActivityTableDescp = new HTableDescriptor(table);
+ HColumnDescriptor infoCF =
+ new HColumnDescriptor(FlowActivityColumnFamily.INFO.getBytes());
+ infoCF.setBloomFilterType(BloomType.ROWCOL);
+ flowActivityTableDescp.addFamily(infoCF);
+ infoCF.setMinVersions(1);
+ infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+
+ // TODO: figure the split policy before running in production
+ admin.createTable(flowActivityTableDescp);
+ LOG.info("Status of table creation for " + table.getNameAsString() + "="
+ + admin.tableExists(table));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTableRW.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTableRW.java
new file mode 100644
index 0000000..61c0734
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTableRW.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Coprocessor;
+
+/**
+ * Create, read and write to the FlowRun table.
+ */
+public class FlowRunTableRW extends BaseTableRW<FlowRunTable> {
+ /** entity prefix. */
+ private static final String PREFIX =
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowrun";
+
+ /** config param name that specifies the flowrun table name. */
+ public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+ /** default value for flowrun table name. */
+ public static final String DEFAULT_TABLE_NAME = "timelineservice.flowrun";
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FlowRunTableRW.class);
+
+ /** default max number of versions. */
+ public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
+
+ public FlowRunTableRW() {
+ super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTableRW#
+ * createTable(org.apache.hadoop.hbase.client.Admin,
+ * org.apache.hadoop.conf.Configuration)
+ */
+ public void createTable(Admin admin, Configuration hbaseConf)
+ throws IOException {
+
+ TableName table = getTableName(hbaseConf);
+ if (admin.tableExists(table)) {
+ // do not disable / delete existing table
+ // similar to the approach taken by map-reduce jobs when
+ // output directory exists
+ throw new IOException("Table " + table.getNameAsString()
+ + " already exists.");
+ }
+
+ HTableDescriptor flowRunTableDescp = new HTableDescriptor(table);
+ HColumnDescriptor infoCF =
+ new HColumnDescriptor(FlowRunColumnFamily.INFO.getBytes());
+ infoCF.setBloomFilterType(BloomType.ROWCOL);
+ flowRunTableDescp.addFamily(infoCF);
+ infoCF.setMinVersions(1);
+ infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+
+ // TODO: figure the split policy
+ String coprocessorJarPathStr = hbaseConf.get(
+ YarnConfiguration.FLOW_RUN_COPROCESSOR_JAR_HDFS_LOCATION,
+ YarnConfiguration.DEFAULT_HDFS_LOCATION_FLOW_RUN_COPROCESSOR_JAR);
+
+ Path coprocessorJarPath = new Path(coprocessorJarPathStr);
+ LOG.info("CoprocessorJarPath=" + coprocessorJarPath.toString());
+ flowRunTableDescp.addCoprocessor(
+ "org.apache.hadoop.yarn.server.timelineservice.storage." +
+ "flow.FlowRunCoprocessor", coprocessorJarPath,
+ Coprocessor.PRIORITY_USER, null);
+ admin.createTable(flowRunTableDescp);
+ LOG.info("Status of table creation for " + table.getNameAsString() + "="
+ + admin.tableExists(table));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
new file mode 100644
index 0000000..04963f3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage.flow
+ * contains classes related to implementation for flow related tables, viz. flow
+ * run table and flow activity table.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
new file mode 100644
index 0000000..e78db2a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage contains
+ * classes which define and implement reading and writing to backend storage.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
new file mode 100644
index 0000000..0956f1e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+
+/**
+ * The base class for reading timeline data from the HBase storage. This class
+ * provides basic support to validate and augment reader context.
+ */
+public abstract class AbstractTimelineStorageReader {
+
+ private final TimelineReaderContext context;
+ /**
+ * Used to look up the flow context.
+ */
+ private final AppToFlowTableRW appToFlowTable = new AppToFlowTableRW();
+
+ public AbstractTimelineStorageReader(TimelineReaderContext ctxt) {
+ context = ctxt;
+ }
+
+ protected TimelineReaderContext getContext() {
+ return context;
+ }
+
+ /**
+ * Looks up flow context from AppToFlow table.
+ *
+ * @param appToFlowRowKey to identify Cluster and App Ids.
+ * @param clusterId the cluster id.
+ * @param hbaseConf HBase configuration.
+ * @param conn HBase Connection.
+ * @return flow context information.
+ * @throws IOException if any problem occurs while fetching flow information.
+ */
+ protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
+ String clusterId, Configuration hbaseConf, Connection conn)
+ throws IOException {
+ byte[] rowKey = appToFlowRowKey.getRowKey();
+ Get get = new Get(rowKey);
+ Result result = appToFlowTable.getResult(hbaseConf, conn, get);
+ if (result != null && !result.isEmpty()) {
+ Object flowName = ColumnRWHelper.readResult(
+ result, AppToFlowColumnPrefix.FLOW_NAME, clusterId);
+ Object flowRunId = ColumnRWHelper.readResult(
+ result, AppToFlowColumnPrefix.FLOW_RUN_ID, clusterId);
+ Object userId = ColumnRWHelper.readResult(
+ result, AppToFlowColumnPrefix.USER_ID, clusterId);
+ if (flowName == null || userId == null || flowRunId == null) {
+ throw new NotFoundException(
+ "Unable to find the context flow name, and flow run id, "
+ + "and user id for clusterId=" + clusterId
+ + ", appId=" + appToFlowRowKey.getAppId());
+ }
+ return new FlowContext((String)userId, (String)flowName,
+ ((Number)flowRunId).longValue());
+ } else {
+ throw new NotFoundException(
+ "Unable to find the context flow name, and flow run id, "
+ + "and user id for clusterId=" + clusterId
+ + ", appId=" + appToFlowRowKey.getAppId());
+ }
+ }
+
+ /**
+ * Sets certain parameters to defaults if the values are not provided.
+ *
+ * @param hbaseConf HBase Configuration.
+ * @param conn HBase Connection.
+ * @throws IOException if any exception is encountered while setting params.
+ */
+ protected void augmentParams(Configuration hbaseConf, Connection conn)
+ throws IOException {
+ defaultAugmentParams(hbaseConf, conn);
+ }
+
+ /**
+ * Default behavior for all timeline readers to augment parameters.
+ *
+ * @param hbaseConf HBase Configuration.
+ * @param conn HBase Connection.
+ * @throws IOException if any exception is encountered while setting params.
+ */
+ final protected void defaultAugmentParams(Configuration hbaseConf,
+ Connection conn) throws IOException {
+ // In reality all three should be null or neither should be null
+ if (context.getFlowName() == null || context.getFlowRunId() == null
+ || context.getUserId() == null) {
+ // Get flow context information from AppToFlow table.
+ AppToFlowRowKey appToFlowRowKey =
+ new AppToFlowRowKey(context.getAppId());
+ FlowContext flowContext =
+ lookupFlowContext(appToFlowRowKey, context.getClusterId(), hbaseConf,
+ conn);
+ context.setFlowName(flowContext.flowName);
+ context.setFlowRunId(flowContext.flowRunId);
+ context.setUserId(flowContext.userId);
+ }
+ }
+
+ /**
+ * Validates the required parameters to read the entities.
+ */
+ protected abstract void validateParams();
+
+ /**
+ * Encapsulates flow context information.
+ */
+ protected static class FlowContext {
+ private final String userId;
+ private final String flowName;
+ private final Long flowRunId;
+
+ public FlowContext(String user, String flowName, Long flowRunId) {
+ this.userId = user;
+ this.flowName = flowName;
+ this.flowRunId = flowRunId;
+ }
+
+ protected String getUserId() {
+ return userId;
+ }
+
+ protected String getFlowName() {
+ return flowName;
+ }
+
+ protected Long getFlowRunId() {
+ return flowRunId;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org