You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/02/26 22:31:13 UTC
[04/59] [abbrv] hadoop git commit: YARN-7919. Refactor
timelineservice-hbase module into submodules. Contributed by Haibo Chen.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
deleted file mode 100644
index f521cd7..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.NavigableMap;
-
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
-
-/**
- * Identifies partially qualified columns for the {@link FlowRunTable}.
- */
-public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
-
- /**
- * To store flow run info values.
- */
- METRIC(FlowRunColumnFamily.INFO, "m", null, new LongConverter());
-
- private final ColumnHelper<FlowRunTable> column;
- private final ColumnFamily<FlowRunTable> columnFamily;
-
- /**
- * Can be null for those cases where the provided column qualifier is the
- * entire column name.
- */
- private final String columnPrefix;
- private final byte[] columnPrefixBytes;
-
- private final AggregationOperation aggOp;
-
- /**
- * Private constructor, meant to be used by the enum definition.
- *
- * @param columnFamily that this column is stored in.
- * @param columnPrefix for this column.
- */
- private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
- String columnPrefix, AggregationOperation fra, ValueConverter converter) {
- this(columnFamily, columnPrefix, fra, converter, false);
- }
-
- private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
- String columnPrefix, AggregationOperation fra, ValueConverter converter,
- boolean compoundColQual) {
- column = new ColumnHelper<FlowRunTable>(columnFamily, converter, true);
- this.columnFamily = columnFamily;
- this.columnPrefix = columnPrefix;
- if (columnPrefix == null) {
- this.columnPrefixBytes = null;
- } else {
- // Future-proof by ensuring the right column prefix hygiene.
- this.columnPrefixBytes =
- Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
- }
- this.aggOp = fra;
- }
-
- /**
- * @return the column name value
- */
- public String getColumnPrefix() {
- return columnPrefix;
- }
-
- public byte[] getColumnPrefixBytes() {
- return columnPrefixBytes.clone();
- }
-
- @Override
- public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
- return ColumnHelper.getColumnQualifier(this.columnPrefixBytes,
- qualifierPrefix);
- }
-
- @Override
- public byte[] getColumnPrefixBytes(String qualifierPrefix) {
- return ColumnHelper.getColumnQualifier(this.columnPrefixBytes,
- qualifierPrefix);
- }
-
- @Override
- public byte[] getColumnFamilyBytes() {
- return columnFamily.getBytes();
- }
-
- public AggregationOperation getAttribute() {
- return aggOp;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
- * #store(byte[],
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.
- * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
- */
- public void store(byte[] rowKey,
- TypedBufferedMutator<FlowRunTable> tableMutator, String qualifier,
- Long timestamp, Object inputValue, Attribute... attributes)
- throws IOException {
-
- // Null check
- if (qualifier == null) {
- throw new IOException("Cannot store column with null qualifier in "
- + tableMutator.getName().getNameAsString());
- }
-
- byte[] columnQualifier = getColumnPrefixBytes(qualifier);
- Attribute[] combinedAttributes =
- HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
- column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
- combinedAttributes);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
- * #store(byte[],
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.
- * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
- */
- public void store(byte[] rowKey,
- TypedBufferedMutator<FlowRunTable> tableMutator, byte[] qualifier,
- Long timestamp, Object inputValue, Attribute... attributes)
- throws IOException {
-
- // Null check
- if (qualifier == null) {
- throw new IOException("Cannot store column with null qualifier in "
- + tableMutator.getName().getNameAsString());
- }
-
- byte[] columnQualifier = getColumnPrefixBytes(qualifier);
- Attribute[] combinedAttributes =
- HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
- column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
- combinedAttributes);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
- * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
- */
- public Object readResult(Result result, String qualifier) throws IOException {
- byte[] columnQualifier =
- ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
- return column.readResult(result, columnQualifier);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
- * #readResults(org.apache.hadoop.hbase.client.Result,
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
- */
- public <K> Map<K, Object> readResults(Result result,
- KeyConverter<K> keyConverter) throws IOException {
- return column.readResults(result, columnPrefixBytes, keyConverter);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
- * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
- */
- public <K, V> NavigableMap<K, NavigableMap<Long, V>>
- readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
- throws IOException {
- return column.readResultsWithTimestamps(result, columnPrefixBytes,
- keyConverter);
- }
-
- @Override
- public ValueConverter getValueConverter() {
- return column.getValueConverter();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
deleted file mode 100644
index 96a7cf3..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Coprocessor for flow run table.
- */
-public class FlowRunCoprocessor extends BaseRegionObserver {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(FlowRunCoprocessor.class);
-
- private Region region;
- /**
- * generate a timestamp that is unique per row in a region this is per region.
- */
- private final TimestampGenerator timestampGenerator =
- new TimestampGenerator();
-
- @Override
- public void start(CoprocessorEnvironment e) throws IOException {
- if (e instanceof RegionCoprocessorEnvironment) {
- RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
- this.region = env.getRegion();
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * This method adds the tags onto the cells in the Put. It is presumed that
- * all the cells in one Put have the same set of Tags. The existing cell
- * timestamp is overwritten for non-metric cells and each such cell gets a new
- * unique timestamp generated by {@link TimestampGenerator}
- *
- * @see
- * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache
- * .hadoop.hbase.coprocessor.ObserverContext,
- * org.apache.hadoop.hbase.client.Put,
- * org.apache.hadoop.hbase.regionserver.wal.WALEdit,
- * org.apache.hadoop.hbase.client.Durability)
- */
- @Override
- public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
- WALEdit edit, Durability durability) throws IOException {
- Map<String, byte[]> attributes = put.getAttributesMap();
- // Assumption is that all the cells in a put are the same operation.
- List<Tag> tags = new ArrayList<>();
- if ((attributes != null) && (attributes.size() > 0)) {
- for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
- Tag t = HBaseTimelineStorageUtils.getTagFromAttribute(attribute);
- if (t != null) {
- tags.add(t);
- }
- }
- byte[] tagByteArray = Tag.fromList(tags);
- NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
- Bytes.BYTES_COMPARATOR);
- for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()
- .entrySet()) {
- List<Cell> newCells = new ArrayList<>(entry.getValue().size());
- for (Cell cell : entry.getValue()) {
- // for each cell in the put add the tags
- // Assumption is that all the cells in
- // one put are the same operation
- // also, get a unique cell timestamp for non-metric cells
- // this way we don't inadvertently overwrite cell versions
- long cellTimestamp = getCellTimestamp(cell.getTimestamp(), tags);
- newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell),
- CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
- cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell),
- tagByteArray));
- }
- newFamilyMap.put(entry.getKey(), newCells);
- } // for each entry
- // Update the family map for the Put
- put.setFamilyCellMap(newFamilyMap);
- }
- }
-
- /**
- * Determines if the current cell's timestamp is to be used or a new unique
- * cell timestamp is to be used. The reason this is done is to inadvertently
- * overwrite cells when writes come in very fast. But for metric cells, the
- * cell timestamp signifies the metric timestamp. Hence we don't want to
- * overwrite it.
- *
- * @param timestamp
- * @param tags
- * @return cell timestamp
- */
- private long getCellTimestamp(long timestamp, List<Tag> tags) {
- // if ts not set (hbase sets to HConstants.LATEST_TIMESTAMP by default)
- // then use the generator
- if (timestamp == HConstants.LATEST_TIMESTAMP) {
- return timestampGenerator.getUniqueTimestamp();
- } else {
- return timestamp;
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * Creates a {@link FlowScanner} Scan so that it can correctly process the
- * contents of {@link FlowRunTable}.
- *
- * @see
- * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache
- * .hadoop.hbase.coprocessor.ObserverContext,
- * org.apache.hadoop.hbase.client.Get, java.util.List)
- */
- @Override
- public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
- Get get, List<Cell> results) throws IOException {
- Scan scan = new Scan(get);
- scan.setMaxVersions();
- RegionScanner scanner = null;
- try {
- scanner = new FlowScanner(e.getEnvironment(), scan,
- region.getScanner(scan), FlowScannerOperation.READ);
- scanner.next(results);
- e.bypass();
- } finally {
- if (scanner != null) {
- scanner.close();
- }
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * Ensures that max versions are set for the Scan so that metrics can be
- * correctly aggregated and min/max can be correctly determined.
- *
- * @see
- * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org
- * .apache.hadoop.hbase.coprocessor.ObserverContext,
- * org.apache.hadoop.hbase.client.Scan,
- * org.apache.hadoop.hbase.regionserver.RegionScanner)
- */
- @Override
- public RegionScanner preScannerOpen(
- ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
- RegionScanner scanner) throws IOException {
- // set max versions for scan to see all
- // versions to aggregate for metrics
- scan.setMaxVersions();
- return scanner;
- }
-
- /*
- * (non-Javadoc)
- *
- * Creates a {@link FlowScanner} Scan so that it can correctly process the
- * contents of {@link FlowRunTable}.
- *
- * @see
- * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen(
- * org.apache.hadoop.hbase.coprocessor.ObserverContext,
- * org.apache.hadoop.hbase.client.Scan,
- * org.apache.hadoop.hbase.regionserver.RegionScanner)
- */
- @Override
- public RegionScanner postScannerOpen(
- ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
- RegionScanner scanner) throws IOException {
- return new FlowScanner(e.getEnvironment(), scan,
- scanner, FlowScannerOperation.READ);
- }
-
- @Override
- public InternalScanner preFlush(
- ObserverContext<RegionCoprocessorEnvironment> c, Store store,
- InternalScanner scanner) throws IOException {
- if (LOG.isDebugEnabled()) {
- if (store != null) {
- LOG.debug("preFlush store = " + store.getColumnFamilyName()
- + " flushableSize=" + store.getFlushableSize()
- + " flushedCellsCount=" + store.getFlushedCellsCount()
- + " compactedCellsCount=" + store.getCompactedCellsCount()
- + " majorCompactedCellsCount="
- + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
- + store.getMemstoreFlushSize() + " memstoreSize="
- + store.getMemStoreSize() + " size=" + store.getSize()
- + " storeFilesCount=" + store.getStorefilesCount());
- }
- }
- return new FlowScanner(c.getEnvironment(), scanner,
- FlowScannerOperation.FLUSH);
- }
-
- @Override
- public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
- Store store, StoreFile resultFile) {
- if (LOG.isDebugEnabled()) {
- if (store != null) {
- LOG.debug("postFlush store = " + store.getColumnFamilyName()
- + " flushableSize=" + store.getFlushableSize()
- + " flushedCellsCount=" + store.getFlushedCellsCount()
- + " compactedCellsCount=" + store.getCompactedCellsCount()
- + " majorCompactedCellsCount="
- + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
- + store.getMemstoreFlushSize() + " memstoreSize="
- + store.getMemStoreSize() + " size=" + store.getSize()
- + " storeFilesCount=" + store.getStorefilesCount());
- }
- }
- }
-
- @Override
- public InternalScanner preCompact(
- ObserverContext<RegionCoprocessorEnvironment> e, Store store,
- InternalScanner scanner, ScanType scanType, CompactionRequest request)
- throws IOException {
-
- FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
- if (request != null) {
- requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
- : FlowScannerOperation.MINOR_COMPACTION);
- LOG.info("Compactionrequest= " + request.toString() + " "
- + requestOp.toString() + " RegionName=" + e.getEnvironment()
- .getRegion().getRegionInfo().getRegionNameAsString());
- }
- return new FlowScanner(e.getEnvironment(), scanner, requestOp);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
deleted file mode 100644
index 7ce91cf..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import java.util.List;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverterToString;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-
-/**
- * Represents a rowkey for the flow run table.
- */
-public class FlowRunRowKey {
- private final String clusterId;
- private final String userId;
- private final String flowName;
- private final Long flowRunId;
- private final FlowRunRowKeyConverter flowRunRowKeyConverter =
- new FlowRunRowKeyConverter();
-
- public FlowRunRowKey(String clusterId, String userId, String flowName,
- Long flowRunId) {
- this.clusterId = clusterId;
- this.userId = userId;
- this.flowName = flowName;
- this.flowRunId = flowRunId;
- }
-
- public String getClusterId() {
- return clusterId;
- }
-
- public String getUserId() {
- return userId;
- }
-
- public String getFlowName() {
- return flowName;
- }
-
- public Long getFlowRunId() {
- return flowRunId;
- }
-
- /**
- * Constructs a row key for the entity table as follows: {
- * clusterId!userId!flowName!Inverted Flow Run Id}.
- *
- * @return byte array with the row key
- */
- public byte[] getRowKey() {
- return flowRunRowKeyConverter.encode(this);
- }
-
-
- /**
- * Given the raw row key as bytes, returns the row key as an object.
- * @param rowKey Byte representation of row key.
- * @return A <cite>FlowRunRowKey</cite> object.
- */
- public static FlowRunRowKey parseRowKey(byte[] rowKey) {
- return new FlowRunRowKeyConverter().decode(rowKey);
- }
-
- /**
- * Constructs a row key for the flow run table as follows:
- * {@code clusterId!userId!flowName!Flow Run Id}.
- * @return String representation of row key
- */
- public String getRowKeyAsString() {
- return flowRunRowKeyConverter.encodeAsString(this);
- }
-
- /**
- * Given the encoded row key as string, returns the row key as an object.
- * @param encodedRowKey String representation of row key.
- * @return A <cite>FlowRunRowKey</cite> object.
- */
- public static FlowRunRowKey parseRowKeyFromString(String encodedRowKey) {
- return new FlowRunRowKeyConverter().decodeFromString(encodedRowKey);
- }
-
- /**
- * returns the Flow Key as a verbose String output.
- * @return String
- */
- @Override
- public String toString() {
- StringBuilder flowKeyStr = new StringBuilder();
- flowKeyStr.append("{clusterId=" + clusterId);
- flowKeyStr.append(" userId=" + userId);
- flowKeyStr.append(" flowName=" + flowName);
- flowKeyStr.append(" flowRunId=");
- flowKeyStr.append(flowRunId);
- flowKeyStr.append("}");
- return flowKeyStr.toString();
- }
-
- /**
- * Encodes and decodes row key for flow run table.
- * The row key is of the form : clusterId!userId!flowName!flowrunId.
- * flowrunId is a long and rest are strings.
- * <p>
- */
- final private static class FlowRunRowKeyConverter implements
- KeyConverter<FlowRunRowKey>, KeyConverterToString<FlowRunRowKey> {
-
- private FlowRunRowKeyConverter() {
- }
-
- /**
- * The flow run row key is of the form clusterId!userId!flowName!flowrunId
- * with each segment separated by !. The sizes below indicate sizes of each
- * one of these segments in sequence. clusterId, userId and flowName are
- * strings. flowrunId is a long hence 8 bytes in size. Strings are variable
- * in size (i.e. end whenever separator is encountered). This is used while
- * decoding and helps in determining where to split.
- */
- private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
- Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG };
-
- /*
- * (non-Javadoc)
- *
- * Encodes FlowRunRowKey object into a byte array with each component/field
- * in FlowRunRowKey separated by Separator#QUALIFIERS. This leads to an flow
- * run row key of the form clusterId!userId!flowName!flowrunId If flowRunId
- * in passed FlowRunRowKey object is null (and the fields preceding it i.e.
- * clusterId, userId and flowName are not null), this returns a row key
- * prefix of the form clusterId!userName!flowName! flowRunId is inverted
- * while encoding as it helps maintain a descending order for flow keys in
- * flow run table.
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common
- * .KeyConverter#encode(java.lang.Object)
- */
- @Override
- public byte[] encode(FlowRunRowKey rowKey) {
- byte[] first =
- Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
- Separator.SPACE, Separator.TAB, Separator.QUALIFIERS), Separator
- .encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
- Separator.QUALIFIERS), Separator.encode(rowKey.getFlowName(),
- Separator.SPACE, Separator.TAB, Separator.QUALIFIERS));
- if (rowKey.getFlowRunId() == null) {
- return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
- } else {
- // Note that flowRunId is a long, so we can't encode them all at the
- // same
- // time.
- byte[] second =
- Bytes.toBytes(LongConverter.invertLong(rowKey.getFlowRunId()));
- return Separator.QUALIFIERS.join(first, second);
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * Decodes an flow run row key of the form
- * clusterId!userId!flowName!flowrunId represented in byte format and
- * converts it into an FlowRunRowKey object. flowRunId is inverted while
- * decoding as it was inverted while encoding.
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common
- * .KeyConverter#decode(byte[])
- */
- @Override
- public FlowRunRowKey decode(byte[] rowKey) {
- byte[][] rowKeyComponents =
- Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
- if (rowKeyComponents.length != 4) {
- throw new IllegalArgumentException("the row key is not valid for "
- + "a flow run");
- }
- String clusterId =
- Separator.decode(Bytes.toString(rowKeyComponents[0]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- String userId =
- Separator.decode(Bytes.toString(rowKeyComponents[1]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- String flowName =
- Separator.decode(Bytes.toString(rowKeyComponents[2]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- Long flowRunId =
- LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3]));
- return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
- }
-
- @Override
- public String encodeAsString(FlowRunRowKey key) {
- if (key.clusterId == null || key.userId == null || key.flowName == null
- || key.flowRunId == null) {
- throw new IllegalArgumentException();
- }
- return TimelineReaderUtils.joinAndEscapeStrings(new String[] {
- key.clusterId, key.userId, key.flowName, key.flowRunId.toString()});
- }
-
- @Override
- public FlowRunRowKey decodeFromString(String encodedRowKey) {
- List<String> split = TimelineReaderUtils.split(encodedRowKey);
- if (split == null || split.size() != 4) {
- throw new IllegalArgumentException(
- "Invalid row key for flow run table.");
- }
- Long flowRunId = Long.valueOf(split.get(3));
- return new FlowRunRowKey(split.get(0), split.get(1), split.get(2),
- flowRunId);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
deleted file mode 100644
index 23ebc66..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
-
-/**
- * Represents a partial rowkey (without the flowRunId) for the flow run table.
- */
-public class FlowRunRowKeyPrefix extends FlowRunRowKey implements
- RowKeyPrefix<FlowRunRowKey> {
-
- /**
- * Constructs a row key prefix for the flow run table as follows:
- * {@code clusterId!userI!flowName!}.
- *
- * @param clusterId identifying the cluster
- * @param userId identifying the user
- * @param flowName identifying the flow
- */
- public FlowRunRowKeyPrefix(String clusterId, String userId,
- String flowName) {
- super(clusterId, userId, flowName, null);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.application.
- * RowKeyPrefix#getRowKeyPrefix()
- */
- public byte[] getRowKeyPrefix() {
- // We know we're a FlowRunRowKey with null florRunId, so we can simply
- // delegate
- return super.getRowKey();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
deleted file mode 100644
index a1d32ee..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Coprocessor;
-
-/**
- * The flow run table has column family info
- * Stores per flow run information
- * aggregated across applications.
- *
- * Metrics are also stored in the info column family.
- *
- * Example flow run table record:
- *
- * <pre>
- * flow_run table
- * |-------------------------------------------|
- * | Row key | Column Family |
- * | | info |
- * |-------------------------------------------|
- * | clusterId! | flow_version:version7 |
- * | userName! | |
- * | flowName! | running_apps:1 |
- * | flowRunId | |
- * | | min_start_time:1392995080000 |
- * | | #0:"" |
- * | | |
- * | | min_start_time:1392995081012 |
- * | | #0:appId2 |
- * | | |
- * | | min_start_time:1392993083210 |
- * | | #0:appId3 |
- * | | |
- * | | |
- * | | max_end_time:1392993084018 |
- * | | #0:"" |
- * | | |
- * | | |
- * | | m!mapInputRecords:127 |
- * | | #0:"" |
- * | | |
- * | | m!mapInputRecords:31 |
- * | | #2:appId2 |
- * | | |
- * | | m!mapInputRecords:37 |
- * | | #1:appId3 |
- * | | |
- * | | |
- * | | m!mapOutputRecords:181 |
- * | | #0:"" |
- * | | |
- * | | m!mapOutputRecords:37 |
- * | | #1:appId3 |
- * | | |
- * | | |
- * |-------------------------------------------|
- * </pre>
- */
-public class FlowRunTable extends BaseTable<FlowRunTable> {
- /** entity prefix. */
- private static final String PREFIX =
- YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowrun";
-
- /** config param name that specifies the flowrun table name. */
- public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
-
- /** default value for flowrun table name. */
- public static final String DEFAULT_TABLE_NAME = "timelineservice.flowrun";
-
- private static final Logger LOG =
- LoggerFactory.getLogger(FlowRunTable.class);
-
- /** default max number of versions. */
- public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
-
- public FlowRunTable() {
- super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
- * (org.apache.hadoop.hbase.client.Admin,
- * org.apache.hadoop.conf.Configuration)
- */
- public void createTable(Admin admin, Configuration hbaseConf)
- throws IOException {
-
- TableName table = getTableName(hbaseConf);
- if (admin.tableExists(table)) {
- // do not disable / delete existing table
- // similar to the approach taken by map-reduce jobs when
- // output directory exists
- throw new IOException("Table " + table.getNameAsString()
- + " already exists.");
- }
-
- HTableDescriptor flowRunTableDescp = new HTableDescriptor(table);
- HColumnDescriptor infoCF =
- new HColumnDescriptor(FlowRunColumnFamily.INFO.getBytes());
- infoCF.setBloomFilterType(BloomType.ROWCOL);
- flowRunTableDescp.addFamily(infoCF);
- infoCF.setMinVersions(1);
- infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
-
- // TODO: figure the split policy
- String coprocessorJarPathStr = hbaseConf.get(
- YarnConfiguration.FLOW_RUN_COPROCESSOR_JAR_HDFS_LOCATION,
- YarnConfiguration.DEFAULT_HDFS_LOCATION_FLOW_RUN_COPROCESSOR_JAR);
-
- Path coprocessorJarPath = new Path(coprocessorJarPathStr);
- LOG.info("CoprocessorJarPath=" + coprocessorJarPath.toString());
- flowRunTableDescp.addCoprocessor(
- FlowRunCoprocessor.class.getCanonicalName(), coprocessorJarPath,
- Coprocessor.PRIORITY_USER, null);
- admin.createTable(flowRunTableDescp);
- LOG.info("Status of table creation for " + table.getNameAsString() + "="
- + admin.tableExists(table));
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
deleted file mode 100644
index dbd0484..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ /dev/null
@@ -1,729 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Invoked via the coprocessor when a Get or a Scan is issued for flow run
- * table. Looks through the list of cells per row, checks their tags and does
- * operation on those cells as per the cell tags. Transforms reads of the stored
- * metrics into calculated sums for each column Also, finds the min and max for
- * start and end times in a flow run.
- */
-class FlowScanner implements RegionScanner, Closeable {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(FlowScanner.class);
-
- /**
- * use a special application id to represent the flow id this is needed since
- * TimestampGenerator parses the app id to generate a cell timestamp.
- */
- private static final String FLOW_APP_ID = "application_00000000000_0000";
-
- private final Region region;
- private final InternalScanner flowRunScanner;
- private final int batchSize;
- private final long appFinalValueRetentionThreshold;
- private RegionScanner regionScanner;
- private boolean hasMore;
- private byte[] currentRow;
- private List<Cell> availableCells = new ArrayList<>();
- private int currentIndex;
- private FlowScannerOperation action = FlowScannerOperation.READ;
-
- FlowScanner(RegionCoprocessorEnvironment env, InternalScanner internalScanner,
- FlowScannerOperation action) {
- this(env, null, internalScanner, action);
- }
-
- FlowScanner(RegionCoprocessorEnvironment env, Scan incomingScan,
- InternalScanner internalScanner, FlowScannerOperation action) {
- this.batchSize = incomingScan == null ? -1 : incomingScan.getBatch();
- // TODO initialize other scan attributes like Scan#maxResultSize
- this.flowRunScanner = internalScanner;
- if (internalScanner instanceof RegionScanner) {
- this.regionScanner = (RegionScanner) internalScanner;
- }
- this.action = action;
- if (env == null) {
- this.appFinalValueRetentionThreshold =
- YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD;
- this.region = null;
- } else {
- this.region = env.getRegion();
- Configuration hbaseConf = env.getConfiguration();
- this.appFinalValueRetentionThreshold = hbaseConf.getLong(
- YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
- YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug(" batch size=" + batchSize);
- }
- }
-
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo()
- */
- @Override
- public HRegionInfo getRegionInfo() {
- return region.getRegionInfo();
- }
-
- @Override
- public boolean nextRaw(List<Cell> cells) throws IOException {
- return nextRaw(cells, ScannerContext.newBuilder().build());
- }
-
- @Override
- public boolean nextRaw(List<Cell> cells, ScannerContext scannerContext)
- throws IOException {
- return nextInternal(cells, scannerContext);
- }
-
- @Override
- public boolean next(List<Cell> cells) throws IOException {
- return next(cells, ScannerContext.newBuilder().build());
- }
-
- @Override
- public boolean next(List<Cell> cells, ScannerContext scannerContext)
- throws IOException {
- return nextInternal(cells, scannerContext);
- }
-
- /**
- * Get value converter associated with a column or a column prefix. If nothing
- * matches, generic converter is returned.
- * @param colQualifierBytes
- * @return value converter implementation.
- */
- private static ValueConverter getValueConverter(byte[] colQualifierBytes) {
- // Iterate over all the column prefixes for flow run table and get the
- // appropriate converter for the column qualifier passed if prefix matches.
- for (FlowRunColumnPrefix colPrefix : FlowRunColumnPrefix.values()) {
- byte[] colPrefixBytes = colPrefix.getColumnPrefixBytes("");
- if (Bytes.compareTo(colPrefixBytes, 0, colPrefixBytes.length,
- colQualifierBytes, 0, colPrefixBytes.length) == 0) {
- return colPrefix.getValueConverter();
- }
- }
- // Iterate over all the columns for flow run table and get the
- // appropriate converter for the column qualifier passed if match occurs.
- for (FlowRunColumn column : FlowRunColumn.values()) {
- if (Bytes.compareTo(
- column.getColumnQualifierBytes(), colQualifierBytes) == 0) {
- return column.getValueConverter();
- }
- }
- // Return generic converter if nothing matches.
- return GenericConverter.getInstance();
- }
-
- /**
- * This method loops through the cells in a given row of the
- * {@link FlowRunTable}. It looks at the tags of each cell to figure out how
- * to process the contents. It then calculates the sum or min or max for each
- * column or returns the cell as is.
- *
- * @param cells
- * @param scannerContext
- * @return true if next row is available for the scanner, false otherwise
- * @throws IOException
- */
- private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext)
- throws IOException {
- Cell cell = null;
- startNext();
- // Loop through all the cells in this row
- // For min/max/metrics we do need to scan the entire set of cells to get the
- // right one
- // But with flush/compaction, the number of cells being scanned will go down
- // cells are grouped per column qualifier then sorted by cell timestamp
- // (latest to oldest) per column qualifier
- // So all cells in one qualifier come one after the other before we see the
- // next column qualifier
- ByteArrayComparator comp = new ByteArrayComparator();
- byte[] previousColumnQualifier = Separator.EMPTY_BYTES;
- AggregationOperation currentAggOp = null;
- SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
- Set<String> alreadySeenAggDim = new HashSet<>();
- int addedCnt = 0;
- long currentTimestamp = System.currentTimeMillis();
- ValueConverter converter = null;
- int limit = batchSize;
-
- while (limit <= 0 || addedCnt < limit) {
- cell = peekAtNextCell(scannerContext);
- if (cell == null) {
- break;
- }
- byte[] currentColumnQualifier = CellUtil.cloneQualifier(cell);
- if (previousColumnQualifier == null) {
- // first time in loop
- previousColumnQualifier = currentColumnQualifier;
- }
-
- converter = getValueConverter(currentColumnQualifier);
- if (comp.compare(previousColumnQualifier, currentColumnQualifier) != 0) {
- addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
- converter, currentTimestamp);
- resetState(currentColumnCells, alreadySeenAggDim);
- previousColumnQualifier = currentColumnQualifier;
- currentAggOp = getCurrentAggOp(cell);
- converter = getValueConverter(currentColumnQualifier);
- }
- collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim,
- converter, scannerContext);
- nextCell(scannerContext);
- }
- if ((!currentColumnCells.isEmpty()) && ((limit <= 0 || addedCnt < limit))) {
- addedCnt += emitCells(cells, currentColumnCells, currentAggOp, converter,
- currentTimestamp);
- if (LOG.isDebugEnabled()) {
- if (addedCnt > 0) {
- LOG.debug("emitted cells. " + addedCnt + " for " + this.action
- + " rowKey="
- + FlowRunRowKey.parseRowKey(CellUtil.cloneRow(cells.get(0))));
- } else {
- LOG.debug("emitted no cells for " + this.action);
- }
- }
- }
- return hasMore();
- }
-
- private AggregationOperation getCurrentAggOp(Cell cell) {
- List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
- cell.getTagsLength());
- // We assume that all the operations for a particular column are the same
- return HBaseTimelineStorageUtils.getAggregationOperationFromTagsList(tags);
- }
-
- /**
- * resets the parameters to an initialized state for next loop iteration.
- *
- * @param cell
- * @param currentAggOp
- * @param currentColumnCells
- * @param alreadySeenAggDim
- * @param collectedButNotEmitted
- */
- private void resetState(SortedSet<Cell> currentColumnCells,
- Set<String> alreadySeenAggDim) {
- currentColumnCells.clear();
- alreadySeenAggDim.clear();
- }
-
- private void collectCells(SortedSet<Cell> currentColumnCells,
- AggregationOperation currentAggOp, Cell cell,
- Set<String> alreadySeenAggDim, ValueConverter converter,
- ScannerContext scannerContext) throws IOException {
-
- if (currentAggOp == null) {
- // not a min/max/metric cell, so just return it as is
- currentColumnCells.add(cell);
- return;
- }
-
- switch (currentAggOp) {
- case GLOBAL_MIN:
- if (currentColumnCells.size() == 0) {
- currentColumnCells.add(cell);
- } else {
- Cell currentMinCell = currentColumnCells.first();
- Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp,
- (NumericValueConverter) converter);
- if (!currentMinCell.equals(newMinCell)) {
- currentColumnCells.remove(currentMinCell);
- currentColumnCells.add(newMinCell);
- }
- }
- break;
- case GLOBAL_MAX:
- if (currentColumnCells.size() == 0) {
- currentColumnCells.add(cell);
- } else {
- Cell currentMaxCell = currentColumnCells.first();
- Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp,
- (NumericValueConverter) converter);
- if (!currentMaxCell.equals(newMaxCell)) {
- currentColumnCells.remove(currentMaxCell);
- currentColumnCells.add(newMaxCell);
- }
- }
- break;
- case SUM:
- case SUM_FINAL:
- if (LOG.isTraceEnabled()) {
- LOG.trace("In collect cells "
- + " FlowSannerOperation="
- + this.action
- + " currentAggOp="
- + currentAggOp
- + " cell qualifier="
- + Bytes.toString(CellUtil.cloneQualifier(cell))
- + " cell value= "
- + converter.decodeValue(CellUtil.cloneValue(cell))
- + " timestamp=" + cell.getTimestamp());
- }
-
- // only if this app has not been seen yet, add to current column cells
- List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
- cell.getTagsLength());
- String aggDim = HBaseTimelineStorageUtils
- .getAggregationCompactionDimension(tags);
- if (!alreadySeenAggDim.contains(aggDim)) {
- // if this agg dimension has already been seen,
- // since they show up in sorted order
- // we drop the rest which are older
- // in other words, this cell is older than previously seen cells
- // for that agg dim
- // but when this agg dim is not seen,
- // consider this cell in our working set
- currentColumnCells.add(cell);
- alreadySeenAggDim.add(aggDim);
- }
- break;
- default:
- break;
- } // end of switch case
- }
-
- /*
- * Processes the cells in input param currentColumnCells and populates
- * List<Cell> cells as the output based on the input AggregationOperation
- * parameter.
- */
- private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
- AggregationOperation currentAggOp, ValueConverter converter,
- long currentTimestamp) throws IOException {
- if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
- return 0;
- }
- if (currentAggOp == null) {
- cells.addAll(currentColumnCells);
- return currentColumnCells.size();
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("In emitCells " + this.action + " currentColumnCells size= "
- + currentColumnCells.size() + " currentAggOp" + currentAggOp);
- }
-
- switch (currentAggOp) {
- case GLOBAL_MIN:
- case GLOBAL_MAX:
- cells.addAll(currentColumnCells);
- return currentColumnCells.size();
- case SUM:
- case SUM_FINAL:
- switch (action) {
- case FLUSH:
- case MINOR_COMPACTION:
- cells.addAll(currentColumnCells);
- return currentColumnCells.size();
- case READ:
- Cell sumCell = processSummation(currentColumnCells,
- (NumericValueConverter) converter);
- cells.add(sumCell);
- return 1;
- case MAJOR_COMPACTION:
- List<Cell> finalCells = processSummationMajorCompaction(
- currentColumnCells, (NumericValueConverter) converter,
- currentTimestamp);
- cells.addAll(finalCells);
- return finalCells.size();
- default:
- cells.addAll(currentColumnCells);
- return currentColumnCells.size();
- }
- default:
- cells.addAll(currentColumnCells);
- return currentColumnCells.size();
- }
- }
-
- /*
- * Returns a cell whose value is the sum of all cell values in the input set.
- * The new cell created has the timestamp of the most recent metric cell. The
- * sum of a metric for a flow run is the summation at the point of the last
- * metric update in that flow till that time.
- */
- private Cell processSummation(SortedSet<Cell> currentColumnCells,
- NumericValueConverter converter) throws IOException {
- Number sum = 0;
- Number currentValue = 0;
- long ts = 0L;
- long mostCurrentTimestamp = 0L;
- Cell mostRecentCell = null;
- for (Cell cell : currentColumnCells) {
- currentValue = (Number) converter.decodeValue(CellUtil.cloneValue(cell));
- ts = cell.getTimestamp();
- if (mostCurrentTimestamp < ts) {
- mostCurrentTimestamp = ts;
- mostRecentCell = cell;
- }
- sum = converter.add(sum, currentValue);
- }
- byte[] sumBytes = converter.encodeValue(sum);
- Cell sumCell =
- HBaseTimelineStorageUtils.createNewCell(mostRecentCell, sumBytes);
- return sumCell;
- }
-
-
- /**
- * Returns a list of cells that contains
- *
- * A) the latest cells for applications that haven't finished yet
- * B) summation
- * for the flow, based on applications that have completed and are older than
- * a certain time
- *
- * The new cell created has the timestamp of the most recent metric cell. The
- * sum of a metric for a flow run is the summation at the point of the last
- * metric update in that flow till that time.
- */
- @VisibleForTesting
- List<Cell> processSummationMajorCompaction(
- SortedSet<Cell> currentColumnCells, NumericValueConverter converter,
- long currentTimestamp)
- throws IOException {
- Number sum = 0;
- Number currentValue = 0;
- long ts = 0L;
- boolean summationDone = false;
- List<Cell> finalCells = new ArrayList<Cell>();
- if (currentColumnCells == null) {
- return finalCells;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("In processSummationMajorCompaction,"
- + " will drop cells older than " + currentTimestamp
- + " CurrentColumnCells size=" + currentColumnCells.size());
- }
-
- for (Cell cell : currentColumnCells) {
- AggregationOperation cellAggOp = getCurrentAggOp(cell);
- // if this is the existing flow sum cell
- List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
- cell.getTagsLength());
- String appId = HBaseTimelineStorageUtils
- .getAggregationCompactionDimension(tags);
- if (appId == FLOW_APP_ID) {
- sum = converter.add(sum, currentValue);
- summationDone = true;
- if (LOG.isTraceEnabled()) {
- LOG.trace("reading flow app id sum=" + sum);
- }
- } else {
- currentValue = (Number) converter.decodeValue(CellUtil
- .cloneValue(cell));
- // read the timestamp truncated by the generator
- ts = TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp());
- if ((cellAggOp == AggregationOperation.SUM_FINAL)
- && ((ts + this.appFinalValueRetentionThreshold)
- < currentTimestamp)) {
- sum = converter.add(sum, currentValue);
- summationDone = true;
- if (LOG.isTraceEnabled()) {
- LOG.trace("MAJOR COMPACTION loop sum= " + sum
- + " discarding now: " + " qualifier="
- + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value="
- + converter.decodeValue(CellUtil.cloneValue(cell))
- + " timestamp=" + cell.getTimestamp() + " " + this.action);
- }
- } else {
- // not a final value but it's the latest cell for this app
- // so include this cell in the list of cells to write back
- finalCells.add(cell);
- }
- }
- }
- if (summationDone) {
- Cell anyCell = currentColumnCells.first();
- List<Tag> tags = new ArrayList<Tag>();
- Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
- Bytes.toBytes(FLOW_APP_ID));
- tags.add(t);
- t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
- Bytes.toBytes(FLOW_APP_ID));
- tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
- Cell sumCell = HBaseTimelineStorageUtils.createNewCell(
- CellUtil.cloneRow(anyCell),
- CellUtil.cloneFamily(anyCell),
- CellUtil.cloneQualifier(anyCell),
- TimestampGenerator.getSupplementedTimestamp(
- System.currentTimeMillis(), FLOW_APP_ID),
- converter.encodeValue(sum), tagByteArray);
- finalCells.add(sumCell);
- if (LOG.isTraceEnabled()) {
- LOG.trace("MAJOR COMPACTION final sum= " + sum + " for "
- + Bytes.toString(CellUtil.cloneQualifier(sumCell))
- + " " + this.action);
- }
- LOG.info("After major compaction for qualifier="
- + Bytes.toString(CellUtil.cloneQualifier(sumCell))
- + " with currentColumnCells.size="
- + currentColumnCells.size()
- + " returning finalCells.size=" + finalCells.size()
- + " with sum=" + sum.longValue()
- + " with cell timestamp " + sumCell.getTimestamp());
- } else {
- String qualifier = "";
- LOG.info("After major compaction for qualifier=" + qualifier
- + " with currentColumnCells.size="
- + currentColumnCells.size()
- + " returning finalCells.size=" + finalCells.size()
- + " with zero sum="
- + sum.longValue());
- }
- return finalCells;
- }
-
- /**
- * Determines which cell is to be returned based on the values in each cell
- * and the comparison operation MIN or MAX.
- *
- * @param previouslyChosenCell
- * @param currentCell
- * @param currentAggOp
- * @return the cell which is the min (or max) cell
- * @throws IOException
- */
- private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
- AggregationOperation currentAggOp, NumericValueConverter converter)
- throws IOException {
- if (previouslyChosenCell == null) {
- return currentCell;
- }
- try {
- Number previouslyChosenCellValue = (Number)converter.decodeValue(
- CellUtil.cloneValue(previouslyChosenCell));
- Number currentCellValue = (Number) converter.decodeValue(CellUtil
- .cloneValue(currentCell));
- switch (currentAggOp) {
- case GLOBAL_MIN:
- if (converter.compare(
- currentCellValue, previouslyChosenCellValue) < 0) {
- // new value is minimum, hence return this cell
- return currentCell;
- } else {
- // previously chosen value is minimum, hence return previous min cell
- return previouslyChosenCell;
- }
- case GLOBAL_MAX:
- if (converter.compare(
- currentCellValue, previouslyChosenCellValue) > 0) {
- // new value is max, hence return this cell
- return currentCell;
- } else {
- // previously chosen value is max, hence return previous max cell
- return previouslyChosenCell;
- }
- default:
- return currentCell;
- }
- } catch (IllegalArgumentException iae) {
- LOG.error("caught iae during conversion to long ", iae);
- return currentCell;
- }
- }
-
- @Override
- public void close() throws IOException {
- if (flowRunScanner != null) {
- flowRunScanner.close();
- } else {
- LOG.warn("scanner close called but scanner is null");
- }
- }
-
- /**
- * Called to signal the start of the next() call by the scanner.
- */
- public void startNext() {
- currentRow = null;
- }
-
- /**
- * Returns whether or not the underlying scanner has more rows.
- */
- public boolean hasMore() {
- return currentIndex < availableCells.size() ? true : hasMore;
- }
-
- /**
- * Returns the next available cell for the current row and advances the
- * pointer to the next cell. This method can be called multiple times in a row
- * to advance through all the available cells.
- *
- * @param scannerContext
- * context information for the batch of cells under consideration
- * @return the next available cell or null if no more cells are available for
- * the current row
- * @throws IOException
- */
- public Cell nextCell(ScannerContext scannerContext) throws IOException {
- Cell cell = peekAtNextCell(scannerContext);
- if (cell != null) {
- currentIndex++;
- }
- return cell;
- }
-
- /**
- * Returns the next available cell for the current row, without advancing the
- * pointer. Calling this method multiple times in a row will continue to
- * return the same cell.
- *
- * @param scannerContext
- * context information for the batch of cells under consideration
- * @return the next available cell or null if no more cells are available for
- * the current row
- * @throws IOException if any problem is encountered while grabbing the next
- * cell.
- */
- public Cell peekAtNextCell(ScannerContext scannerContext) throws IOException {
- if (currentIndex >= availableCells.size()) {
- // done with current batch
- availableCells.clear();
- currentIndex = 0;
- hasMore = flowRunScanner.next(availableCells, scannerContext);
- }
- Cell cell = null;
- if (currentIndex < availableCells.size()) {
- cell = availableCells.get(currentIndex);
- if (currentRow == null) {
- currentRow = CellUtil.cloneRow(cell);
- } else if (!CellUtil.matchingRow(cell, currentRow)) {
- // moved on to the next row
- // don't use the current cell
- // also signal no more cells for this row
- return null;
- }
- }
- return cell;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize()
- */
- @Override
- public long getMaxResultSize() {
- if (regionScanner == null) {
- throw new IllegalStateException(
- "RegionScanner.isFilterDone() called when the flow "
- + "scanner's scanner is not a RegionScanner");
- }
- return regionScanner.getMaxResultSize();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint()
- */
- @Override
- public long getMvccReadPoint() {
- if (regionScanner == null) {
- throw new IllegalStateException(
- "RegionScanner.isFilterDone() called when the flow "
- + "scanner's internal scanner is not a RegionScanner");
- }
- return regionScanner.getMvccReadPoint();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone()
- */
- @Override
- public boolean isFilterDone() throws IOException {
- if (regionScanner == null) {
- throw new IllegalStateException(
- "RegionScanner.isFilterDone() called when the flow "
- + "scanner's internal scanner is not a RegionScanner");
- }
- return regionScanner.isFilterDone();
-
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[])
- */
- @Override
- public boolean reseek(byte[] bytes) throws IOException {
- if (regionScanner == null) {
- throw new IllegalStateException(
- "RegionScanner.reseek() called when the flow "
- + "scanner's internal scanner is not a RegionScanner");
- }
- return regionScanner.reseek(bytes);
- }
-
- @Override
- public int getBatch() {
- return batchSize;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
deleted file mode 100644
index 73c666f..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-
-/**
- * Identifies the scanner operation on the {@link FlowRunTable}.
- */
-public enum FlowScannerOperation {
-
- /**
- * If the scanner is opened for reading
- * during preGet or preScan.
- */
- READ,
-
- /**
- * If the scanner is opened during preFlush.
- */
- FLUSH,
-
- /**
- * If the scanner is opened during minor Compaction.
- */
- MINOR_COMPACTION,
-
- /**
- * If the scanner is opened during major Compaction.
- */
- MAJOR_COMPACTION
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
deleted file mode 100644
index 04963f3..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Package org.apache.hadoop.yarn.server.timelineservice.storage.flow
- * contains classes related to implementation for flow related tables, viz. flow
- * run table and flow activity table.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
deleted file mode 100644
index e78db2a..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Package org.apache.hadoop.yarn.server.timelineservice.storage contains
- * classes which define and implement reading and writing to backend storage.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
deleted file mode 100644
index 5bacf66..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
-import org.apache.hadoop.yarn.webapp.NotFoundException;
-
-/**
- * The base class for reading timeline data from the HBase storage. This class
- * provides basic support to validate and augment reader context.
- */
-public abstract class AbstractTimelineStorageReader {
-
- private final TimelineReaderContext context;
- /**
- * Used to look up the flow context.
- */
- private final AppToFlowTable appToFlowTable = new AppToFlowTable();
-
- public AbstractTimelineStorageReader(TimelineReaderContext ctxt) {
- context = ctxt;
- }
-
- protected TimelineReaderContext getContext() {
- return context;
- }
-
- /**
- * Looks up flow context from AppToFlow table.
- *
- * @param appToFlowRowKey to identify Cluster and App Ids.
- * @param clusterId the cluster id.
- * @param hbaseConf HBase configuration.
- * @param conn HBase Connection.
- * @return flow context information.
- * @throws IOException if any problem occurs while fetching flow information.
- */
- protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
- String clusterId, Configuration hbaseConf, Connection conn)
- throws IOException {
- byte[] rowKey = appToFlowRowKey.getRowKey();
- Get get = new Get(rowKey);
- Result result = appToFlowTable.getResult(hbaseConf, conn, get);
- if (result != null && !result.isEmpty()) {
- Object flowName =
- AppToFlowColumnPrefix.FLOW_NAME.readResult(result, clusterId);
- Object flowRunId =
- AppToFlowColumnPrefix.FLOW_RUN_ID.readResult(result, clusterId);
- Object userId =
- AppToFlowColumnPrefix.USER_ID.readResult(result, clusterId);
- if (flowName == null || userId == null || flowRunId == null) {
- throw new NotFoundException(
- "Unable to find the context flow name, and flow run id, "
- + "and user id for clusterId=" + clusterId
- + ", appId=" + appToFlowRowKey.getAppId());
- }
- return new FlowContext((String)userId, (String)flowName,
- ((Number)flowRunId).longValue());
- } else {
- throw new NotFoundException(
- "Unable to find the context flow name, and flow run id, "
- + "and user id for clusterId=" + clusterId
- + ", appId=" + appToFlowRowKey.getAppId());
- }
- }
-
- /**
- * Sets certain parameters to defaults if the values are not provided.
- *
- * @param hbaseConf HBase Configuration.
- * @param conn HBase Connection.
- * @throws IOException if any exception is encountered while setting params.
- */
- protected void augmentParams(Configuration hbaseConf, Connection conn)
- throws IOException {
- defaultAugmentParams(hbaseConf, conn);
- }
-
- /**
- * Default behavior for all timeline readers to augment parameters.
- *
- * @param hbaseConf HBase Configuration.
- * @param conn HBase Connection.
- * @throws IOException if any exception is encountered while setting params.
- */
- final protected void defaultAugmentParams(Configuration hbaseConf,
- Connection conn) throws IOException {
- // In reality all three should be null or neither should be null
- if (context.getFlowName() == null || context.getFlowRunId() == null
- || context.getUserId() == null) {
- // Get flow context information from AppToFlow table.
- AppToFlowRowKey appToFlowRowKey =
- new AppToFlowRowKey(context.getAppId());
- FlowContext flowContext =
- lookupFlowContext(appToFlowRowKey, context.getClusterId(), hbaseConf,
- conn);
- context.setFlowName(flowContext.flowName);
- context.setFlowRunId(flowContext.flowRunId);
- context.setUserId(flowContext.userId);
- }
- }
-
- /**
- * Validates the required parameters to read the entities.
- */
- protected abstract void validateParams();
-
- /**
- * Encapsulates flow context information.
- */
- protected static class FlowContext {
- private final String userId;
- private final String flowName;
- private final Long flowRunId;
-
- public FlowContext(String user, String flowName, Long flowRunId) {
- this.userId = user;
- this.flowName = flowName;
- this.flowRunId = flowRunId;
- }
-
- protected String getUserId() {
- return userId;
- }
-
- protected String getFlowName() {
- return flowName;
- }
-
- protected Long getFlowRunId() {
- return flowRunId;
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org