Posted to commits@hbase.apache.org by te...@apache.org on 2011/08/03 22:25:30 UTC
svn commit: r1153645 [2/3] - in
/hbase/trunk/src/main/java/org/apache/hadoop/hbase: io/ io/hfile/ util/
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java Wed Aug 3 20:25:28 2011
@@ -0,0 +1,1299 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.CompoundBloomFilterWriter;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Provides functionality to write ({@link BlockIndexWriter}) and read
+ * ({@link BlockIndexReader}) single-level and multi-level block indexes.
+ *
+ * Examples of how to use the block index writer can be found in
+ * {@link CompoundBloomFilterWriter} and {@link HFileWriterV2}. Examples of how
+ * to use the reader can be found in {@link HFileReaderV2} and
+ * {@link TestHFileBlockIndex}.
+ */
+public class HFileBlockIndex {
+
+ private static final Log LOG = LogFactory.getLog(HFileBlockIndex.class);
+
+ static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;
+
+ /**
+ * The maximum size guideline for index blocks (leaf, intermediate, and
+ * root). If not specified, {@link #DEFAULT_MAX_CHUNK_SIZE} is used.
+ */
+ public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";
+
+ /**
+ * The number of bytes stored in each "secondary index" entry in addition to
+ * key bytes in the non-root index block format. The first long is the file
+ * offset of the deeper-level block the entry points to, and the int that
+ * follows is that block's on-disk size, not including the header.
+ */
+ static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
+ + Bytes.SIZEOF_LONG;
+
+ /**
+ * Error message when trying to use inline block API in single-level mode.
+ */
+ private static final String INLINE_BLOCKS_NOT_ALLOWED =
+ "Inline blocks are not allowed in the single-level-only mode";
+
+ /**
+ * Configuration key to cache leaf- and intermediate-level index blocks on
+ * write.
+ */
+ public static final String CACHE_INDEX_BLOCKS_ON_WRITE_KEY =
+ "hfile.block.index.cacheonwrite";
+
+ /**
+ * The size of a meta-data record used for finding the mid-key in a
+ * multi-level index. Consists of the middle leaf-level index block offset
+ * (long), its on-disk size excluding the header (int), and the mid-key
+ * entry's zero-based index in that leaf index block.
+ */
+ private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
+ 2 * Bytes.SIZEOF_INT;
+
+ /**
+ * The reader always holds the root-level index in memory. Index blocks at
+ * all other levels are, in practice, kept in the LRU cache, although this
+ * API does not enforce that.
+ *
+ * All non-root (leaf and intermediate) index blocks contain what we call a
+ * "secondary index": an array of offsets to the entries within the block.
+ * This allows us to do binary search for the entry corresponding to the
+ * given key without having to deserialize the block.
+ */
+ public static class BlockIndexReader implements HeapSize {
+ /** Needed when doing lookups on blocks. */
+ private final RawComparator<byte[]> comparator;
+
+ // Root-level data.
+ private byte[][] blockKeys;
+ private long[] blockOffsets;
+ private int[] blockDataSizes;
+ private int rootByteSize = 0;
+ private int rootCount = 0;
+
+ // Mid-key metadata.
+ private long midLeafBlockOffset = -1;
+ private int midLeafBlockOnDiskSize = -1;
+ private int midKeyEntry = -1;
+
+ /** Pre-computed mid-key */
+ private AtomicReference<byte[]> midKey = new AtomicReference<byte[]>();
+
+ /**
+ * The number of levels in the block index tree. One if there is only root
+ * level, two for root and leaf levels, etc.
+ */
+ private int searchTreeLevel;
+
+ /** A way to read {@link HFile} blocks at a given offset */
+ private HFileBlock.BasicReader blockReader;
+
+ public BlockIndexReader(final RawComparator<byte[]> c, final int treeLevel,
+ final HFileBlock.BasicReader blockReader) {
+ this(c, treeLevel);
+ this.blockReader = blockReader;
+ }
+
+ public BlockIndexReader(final RawComparator<byte[]> c, final int treeLevel)
+ {
+ comparator = c;
+ searchTreeLevel = treeLevel;
+ }
+
+ /**
+ * @return true if the block index is empty.
+ */
+ public boolean isEmpty() {
+ return blockKeys.length == 0;
+ }
+
+ /**
+ * Verifies that the block index is non-empty and throws an
+ * {@link IllegalStateException} otherwise.
+ */
+ public void ensureNonEmpty() {
+ if (blockKeys.length == 0) {
+ throw new IllegalStateException("Block index is empty or not loaded");
+ }
+ }
+
+ /**
+ * Returns the data block that contains the given key. This method is only
+ * called when the HFile version is greater than 1.
+ *
+ * @param key the key we are looking for
+ * @param keyOffset the offset of the key in its byte array
+ * @param keyLength the length of the key
+ * @param currentBlock the current block, to avoid re-reading the same
+ * block
+ * @return the data block containing the given key, or null if the key is
+ * before the first key in the file
+ * @throws IOException
+ */
+ public HFileBlock seekToDataBlock(final byte[] key, int keyOffset,
+ int keyLength, HFileBlock currentBlock)
+ throws IOException {
+ int rootLevelIndex = rootBlockContainingKey(key, keyOffset, keyLength);
+ if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
+ return null;
+ }
+
+ // Read the next-level (intermediate or leaf) index block.
+ long currentOffset = blockOffsets[rootLevelIndex];
+ int currentOnDiskSize = blockDataSizes[rootLevelIndex];
+
+ int lookupLevel = 1; // How many levels deep we are in our lookup.
+ HFileBlock block = blockReader.readBlockData(currentOffset,
+ currentOnDiskSize, -1, true);
+ if (block == null) {
+ throw new IOException("Failed to read block at offset " +
+ currentOffset + ", onDiskSize=" + currentOnDiskSize);
+ }
+ while (!block.getBlockType().equals(BlockType.DATA)) {
+ // Read the block. It may be an intermediate-level block, a leaf-level
+ // block or a data block. In any case, we expect the non-root index
+ // block format.
+
+ // We don't allow more than searchTreeLevel iterations of this loop.
+ if (++lookupLevel > searchTreeLevel) {
+ throw new IOException("Search Tree Level overflow: lookupLevel: "+
+ lookupLevel + " searchTreeLevel: " + searchTreeLevel);
+ }
+
+ // read to the byte buffer
+ ByteBuffer buffer = block.getBufferWithoutHeader();
+ if (!locateNonRootIndexEntry(buffer, key, keyOffset, keyLength,
+ comparator)) {
+ throw new IOException("The key "
+ + Bytes.toStringBinary(key, keyOffset, keyLength)
+ + " is before the" + " first key of the non-root index block "
+ + block);
+ }
+
+ currentOffset = buffer.getLong();
+ currentOnDiskSize = buffer.getInt();
+
+ // Located a deeper-level block, now read it.
+ if (currentBlock != null && currentBlock.getOffset() == currentOffset)
+ {
+ // Avoid reading the same block.
+ block = currentBlock;
+ } else {
+ block = blockReader.readBlockData(currentOffset, currentOnDiskSize,
+ -1, true);
+ }
+ }
+
+ if (lookupLevel != searchTreeLevel) {
+ throw new IOException("Reached a data block at level " + lookupLevel +
+ " but the number of levels is " + searchTreeLevel);
+ }
+
+ return block;
+ }
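+
+ /*
+ * Usage sketch (illustrative): once the root level has been loaded via
+ * readRootIndex or readMultiLevelIndexRoot, a caller can locate the data
+ * block for a key as follows. A null result means the key is before the
+ * first key in the file.
+ *
+ * HFileBlock b = index.seekToDataBlock(key, 0, key.length, null);
+ */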
+
+ /**
+ * An approximation to the {@link HFile}'s mid-key. Operates on block
+ * boundaries, and does not go inside blocks. In other words, returns the
+ * first key of the middle block of the file.
+ *
+ * @return the first key of the middle block
+ */
+ public byte[] midkey() throws IOException {
+ if (rootCount == 0)
+ throw new IOException("HFile empty");
+
+ byte[] midKey = this.midKey.get();
+ if (midKey != null)
+ return midKey;
+
+ if (midLeafBlockOffset >= 0) {
+ if (blockReader == null) {
+ throw new IOException("Have to read the middle leaf block but " +
+ "no block reader available");
+ }
+ HFileBlock midLeafBlock = blockReader.readBlockData(
+ midLeafBlockOffset, midLeafBlockOnDiskSize, -1, true);
+ ByteBuffer b = midLeafBlock.getBufferWithoutHeader();
+ int numDataBlocks = b.getInt();
+ int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 1));
+ int keyLen = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 2)) -
+ keyRelOffset;
+ int keyOffset = b.arrayOffset() +
+ Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset +
+ SECONDARY_INDEX_ENTRY_OVERHEAD;
+ midKey = Arrays.copyOfRange(b.array(), keyOffset, keyOffset + keyLen);
+ } else {
+ // The middle of the root-level index.
+ midKey = blockKeys[(rootCount - 1) / 2];
+ }
+
+ this.midKey.set(midKey);
+ return midKey;
+ }
+
+ /**
+ * @param i from 0 to {@link #getRootBlockCount()} - 1
+ */
+ public byte[] getRootBlockKey(int i) {
+ return blockKeys[i];
+ }
+
+ /**
+ * @param i from 0 to {@link #getRootBlockCount()} - 1
+ */
+ public long getRootBlockOffset(int i) {
+ return blockOffsets[i];
+ }
+
+ /**
+ * @param i zero-based index of a root-level block
+ * @return the on-disk size of the root-level block for version 2, or the
+ * uncompressed size for version 1
+ */
+ public int getRootBlockDataSize(int i) {
+ return blockDataSizes[i];
+ }
+
+ /**
+ * @return the number of root-level blocks in this block index
+ */
+ public int getRootBlockCount() {
+ return rootCount;
+ }
+
+ /**
+ * Finds the root-level index block containing the given key.
+ *
+ * @param key
+ * Key to find
+ * @return index of the root-level block containing <code>key</code>
+ * (between 0 and the number of blocks - 1), or -1 if this file does
+ * not contain the given key.
+ */
+ public int rootBlockContainingKey(final byte[] key, int offset,
+ int length) {
+ int pos = Bytes.binarySearch(blockKeys, key, offset, length,
+ comparator);
+ // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
+ // binarySearch's javadoc.
+
+ if (pos >= 0) {
+ // This means this is an exact match with an element of blockKeys.
+ assert pos < blockKeys.length;
+ return pos;
+ }
+
+ // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
+ // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
+ // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
+ // key < blockKeys[0], meaning the file does not contain the given key.
+
+ int i = -pos - 1;
+ assert 0 <= i && i <= blockKeys.length;
+ return i - 1;
+ }
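+
+ // Worked example (illustrative): with root-level keys {"c", "g"}, a
+ // lookup of "a" returns -1 (the key is not in the file), "c" or "e"
+ // returns 0, and "g" or "z" returns 1.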
+
+ /**
+ * Adds a new entry to the root block index. Only used when reading.
+ *
+ * @param key Last key in the block
+ * @param offset file offset where the block is stored
+ * @param dataSize the uncompressed data size
+ */
+ private void add(final byte[] key, final long offset, final int dataSize) {
+ blockOffsets[rootCount] = offset;
+ blockKeys[rootCount] = key;
+ blockDataSizes[rootCount] = dataSize;
+
+ rootCount++;
+ rootByteSize += SECONDARY_INDEX_ENTRY_OVERHEAD + key.length;
+ }
+
+ /**
+ * Performs a binary search over a non-root level index block. Utilizes the
+ * secondary index, which records the offsets of (offset, onDiskSize,
+ * firstKey) tuples of all entries.
+ *
+ * @param key the key we are searching for
+ * @param keyOffset the offset of the key in its byte array
+ * @param keyLength the length of the key
+ * @param nonRootIndex the non-root index block buffer, starting with the
+ * secondary index. The position is ignored.
+ * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
+ * keys[i + 1], if keys is the array of all keys being searched, or
+ * -1 if the given key is before the first key
+ * @throws IOException
+ */
+ static int binarySearchNonRootIndex(byte[] key, int keyOffset,
+ int keyLength, ByteBuffer nonRootIndex,
+ RawComparator<byte[]> comparator) {
+
+ int numEntries = nonRootIndex.getInt(0);
+ int low = 0;
+ int high = numEntries - 1;
+ int mid = 0;
+
+ // Entries start after the number of entries and the secondary index.
+ // The secondary index takes numEntries + 1 ints.
+ int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
+
+ // If we imagine that keys[-1] = -Infinity and
+ // keys[numEntries] = Infinity, then we are maintaining an invariant that
+ // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
+
+ while (low <= high) {
+ mid = (low + high) >>> 1;
+
+ // Midkey's offset relative to the end of secondary index
+ int midKeyRelOffset = nonRootIndex.getInt(
+ Bytes.SIZEOF_INT * (mid + 1));
+
+ // The offset of the middle key in the blockIndex buffer
+ int midKeyOffset = entriesOffset // Skip secondary index
+ + midKeyRelOffset // Skip all entries until mid
+ + SECONDARY_INDEX_ENTRY_OVERHEAD; // Skip offset and on-disk-size
+
+ // We subtract the two consecutive secondary index elements, which
+ // gives us the size of the whole (offset, onDiskSize, key) tuple. We
+ // then need to subtract the overhead of offset and onDiskSize.
+ int midLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (mid + 2)) -
+ midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
+
+ // we have to compare in this order, because the comparator order
+ // has special logic when the 'left side' is a special key.
+ int cmp = comparator.compare(key, keyOffset, keyLength,
+ nonRootIndex.array(), nonRootIndex.arrayOffset() + midKeyOffset,
+ midLength);
+
+ // key lives above the midpoint
+ if (cmp > 0)
+ low = mid + 1; // Maintain the invariant that keys[low - 1] < key
+ // key lives below the midpoint
+ else if (cmp < 0)
+ high = mid - 1; // Maintain the invariant that key < keys[high + 1]
+ else
+ return mid; // exact match
+ }
+
+ // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
+ // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
+ // condition, low >= high + 1. Therefore, low = high + 1.
+
+ if (low != high + 1) {
+ throw new IllegalStateException("Binary search broken: low=" + low
+ + " " + "instead of " + (high + 1));
+ }
+
+ // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
+ // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
+ int i = low - 1;
+
+ // Some extra validation on the result.
+ if (i < -1 || i >= numEntries) {
+ throw new IllegalStateException("Binary search broken: result is " +
+ i + " but expected to be between -1 and (numEntries - 1) = " +
+ (numEntries - 1));
+ }
+
+ return i;
+ }
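+
+ // Worked example (illustrative): for entry keys {"b", "d", "f"} in a
+ // non-root block, this returns -1 for "a" (before the first key), 0 for
+ // "b" and "c", 1 for "d" and "e", and 2 for "f" and anything greater.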
+
+ /**
+ * Search for one key using the secondary index in a non-root block. In case
+ * of success, positions the provided buffer at the entry of interest, where
+ * the file offset and the on-disk-size can be read.
+ *
+ * @param nonRootBlock a non-root block without header. Initial position
+ * does not matter.
+ * @param key the byte array containing the key
+ * @param keyOffset the offset of the key in its byte array
+ * @param keyLength the length of the key
+ * @return true if the index entry containing the given key was found;
+ * false if the given key is before the first key
+ */
+ static boolean locateNonRootIndexEntry(ByteBuffer nonRootBlock, byte[] key,
+ int keyOffset, int keyLength, RawComparator<byte[]> comparator) {
+ int entryIndex = binarySearchNonRootIndex(key, keyOffset, keyLength,
+ nonRootBlock, comparator);
+
+ if (entryIndex == -1) {
+ return false;
+ }
+
+ int numEntries = nonRootBlock.getInt(0);
+
+ // The end of secondary index and the beginning of entries themselves.
+ int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
+
+ // The offset of the entry we are interested in relative to the end of
+ // the secondary index.
+ int entryRelOffset = nonRootBlock.getInt(Bytes.SIZEOF_INT
+ * (1 + entryIndex));
+
+ nonRootBlock.position(entriesOffset + entryRelOffset);
+ return true;
+ }
+
+ /**
+ * Read in the root-level index from the given input stream. Must match
+ * what was written into the root level by
+ * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
+ * offset that function returned.
+ *
+ * @param in the buffered input stream or wrapped byte input stream
+ * @param numEntries the number of root-level index entries
+ * @throws IOException
+ */
+ public void readRootIndex(DataInput in, final int numEntries)
+ throws IOException {
+ blockOffsets = new long[numEntries];
+ blockKeys = new byte[numEntries][];
+ blockDataSizes = new int[numEntries];
+
+ // If index size is zero, no index was written.
+ if (numEntries > 0) {
+ for (int i = 0; i < numEntries; ++i) {
+ long offset = in.readLong();
+ int dataSize = in.readInt();
+ byte[] key = Bytes.readByteArray(in);
+ add(key, offset, dataSize);
+ }
+ }
+ }
+
+ /**
+ * Read the root-level metadata of a multi-level block index. Based on
+ * {@link #readRootIndex(DataInput, int)}, but also reads metadata
+ * necessary to compute the mid-key in a multi-level index.
+ *
+ * @param in the buffered or byte input stream to read from
+ * @param numEntries the number of root-level index entries
+ * @throws IOException
+ */
+ public void readMultiLevelIndexRoot(DataInputStream in,
+ final int numEntries) throws IOException {
+ readRootIndex(in, numEntries);
+ if (in.available() < MID_KEY_METADATA_SIZE) {
+ // No mid-key metadata available.
+ return;
+ }
+
+ midLeafBlockOffset = in.readLong();
+ midLeafBlockOnDiskSize = in.readInt();
+ midKeyEntry = in.readInt();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("size=" + rootCount).append("\n");
+ for (int i = 0; i < rootCount; i++) {
+ sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
+ .append("\n offset=").append(blockOffsets[i])
+ .append(", dataSize=" + blockDataSizes[i]).append("\n");
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public long heapSize() {
+ long heapSize = ClassSize.align(6 * ClassSize.REFERENCE +
+ 3 * Bytes.SIZEOF_INT + ClassSize.OBJECT);
+
+ // Mid-key metadata.
+ heapSize += MID_KEY_METADATA_SIZE;
+
+ // Calculating the size of blockKeys
+ if (blockKeys != null) {
+ // Adding array + references overhead
+ heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length
+ * ClassSize.REFERENCE);
+
+ // Adding bytes
+ for (byte[] key : blockKeys) {
+ heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
+ }
+ }
+
+ if (blockOffsets != null) {
+ heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
+ * Bytes.SIZEOF_LONG);
+ }
+
+ if (blockDataSizes != null) {
+ heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
+ * Bytes.SIZEOF_INT);
+ }
+
+ return ClassSize.align(heapSize);
+ }
+
+ }
+
+ /**
+ * Writes the block index into the output stream. Generates the tree from
+ * the bottom up. The leaf level is written to disk as a sequence of inline
+ * blocks, if it is larger than a certain number of bytes. If the leaf level
+ * is not large enough, we write all entries to the root level instead.
+ *
+ * After all leaf blocks have been written, we end up with an index
+ * referencing the resulting leaf index blocks. If that index is larger than
+ * the allowed root index size, the writer will break it up into
+ * reasonable-size intermediate-level index block chunks, write those
+ * chunks out, and create another index referencing those chunks. This will be
+ * repeated until the remaining index is small enough to become the root
+ * index. However, in most practical cases we will only have leaf-level
+ * blocks and the root index, or just the root index.
+ */
+ public static class BlockIndexWriter implements InlineBlockWriter {
+ /**
+ * While the index is being written, this represents the current block
+ * index referencing all leaf blocks, with one exception. If the file is
+ * being closed and there are not enough blocks to complete even a single
+ * leaf block, no leaf blocks get written and this contains the entire
+ * block index. After all levels of the index have been written by
+ * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
+ * root-level index.
+ */
+ private BlockIndexChunk rootChunk = new BlockIndexChunk();
+
+ /**
+ * Current leaf-level chunk. New entries referencing data blocks get added
+ * to this chunk until it grows large enough to be written to disk.
+ */
+ private BlockIndexChunk curInlineChunk = new BlockIndexChunk();
+
+ /**
+ * The number of block index levels. This is one if there is only the root
+ * level (even if empty), two if there are a leaf level and a root level,
+ * and higher if there are intermediate levels. This is only final after
+ * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
+ * initial value accounts for the root level, and will be increased to two
+ * as soon as we find out there is a leaf-level in
+ * {@link #blockWritten(long, int)}.
+ */
+ private int numLevels = 1;
+
+ private HFileBlock.Writer blockWriter;
+ private byte[] firstKey = null;
+
+ /**
+ * The total number of leaf-level entries, i.e. entries referenced by
+ * leaf-level blocks. For the data block index this is equal to the number
+ * of data blocks.
+ */
+ private long totalNumEntries;
+
+ /** Total compressed size of all index blocks. */
+ private long totalBlockOnDiskSize;
+
+ /** Total uncompressed size of all index blocks. */
+ private long totalBlockUncompressedSize;
+
+ /** The maximum size guideline of all multi-level index blocks. */
+ private int maxChunkSize;
+
+ /** Whether we require this block index to always be single-level. */
+ private boolean singleLevelOnly;
+
+ /** Block cache, or null if cache-on-write is disabled */
+ private BlockCache blockCache;
+
+ /** Name to use for computing cache keys */
+ private String nameForCaching;
+
+ /** Creates a single-level block index writer */
+ public BlockIndexWriter() {
+ this(null, null, null);
+ singleLevelOnly = true;
+ }
+
+ /**
+ * Creates a multi-level block index writer.
+ *
+ * @param blockWriter the block writer to use to write index blocks
+ * @param blockCache if this is not null, index blocks will be cached
+ * on write into this block cache.
+ */
+ public BlockIndexWriter(HFileBlock.Writer blockWriter,
+ BlockCache blockCache, String nameForCaching) {
+ if ((blockCache == null) != (nameForCaching == null)) {
+ throw new IllegalArgumentException("Block cache and file name for " +
+ "caching must be both specified or both null");
+ }
+
+ this.blockWriter = blockWriter;
+ this.blockCache = blockCache;
+ this.nameForCaching = nameForCaching;
+ this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
+ }
+
+ public void setMaxChunkSize(int maxChunkSize) {
+ if (maxChunkSize <= 0) {
+ throw new IllegalArgumentException("Invald maximum index block size");
+ }
+ this.maxChunkSize = maxChunkSize;
+ }
+
+ /**
+ * Writes the root level and intermediate levels of the block index into
+ * the output stream, generating the tree from bottom up. Assumes that the
+ * leaf level has been inline-written to the disk if there is enough data
+ * for more than one leaf block. We iterate by breaking the current level
+ * of the block index, starting with the index of all leaf-level blocks,
+ * into chunks small enough to be written to disk, and generate its parent
+ * level, until we end up with a level small enough to become the root
+ * level.
+ *
+ * If the leaf level is not large enough, there is no inline block index
+ * to begin with, so we only write that level of the block index to disk
+ * as the root level.
+ *
+ * @param out FSDataOutputStream
+ * @return position at which we entered the root-level index.
+ * @throws IOException
+ */
+ public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
+ if (curInlineChunk.getNumEntries() != 0) {
+ throw new IOException("Trying to write a multi-level block index, " +
+ "but are " + curInlineChunk.getNumEntries() + " entries in the " +
+ "last inline chunk.");
+ }
+
+ // We need to get mid-key metadata before we create intermediate
+ // indexes and overwrite the root chunk.
+ byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
+ : null;
+
+ while (rootChunk.getRootSize() > maxChunkSize) {
+ rootChunk = writeIntermediateLevel(out, rootChunk);
+ numLevels += 1;
+ }
+
+ // write the root level
+ long rootLevelIndexPos = out.getPos();
+
+ {
+ DataOutput blockStream = blockWriter.startWriting(BlockType.ROOT_INDEX,
+ false);
+ rootChunk.writeRoot(blockStream);
+ if (midKeyMetadata != null)
+ blockStream.write(midKeyMetadata);
+ blockWriter.writeHeaderAndData(out);
+ }
+
+ // Add root index block size
+ totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
+ totalBlockUncompressedSize +=
+ blockWriter.getUncompressedSizeWithoutHeader();
+
+ LOG.info("Wrote a " + numLevels + "-level index with root level at pos "
+ + out.getPos() + ", " + rootChunk.getNumEntries()
+ + " root-level entries, " + totalNumEntries + " total entries, "
+ + totalBlockOnDiskSize + " bytes total on-disk size, "
+ + totalBlockUncompressedSize + " bytes total uncompressed size.");
+
+ return rootLevelIndexPos;
+ }
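+
+ /*
+ * Writer lifecycle sketch (illustrative; a sketch of how this class is
+ * driven, not a verbatim excerpt of a caller; names like
+ * hfileBlockWriter, blockOffset and out are assumptions):
+ *
+ * BlockIndexWriter w = new BlockIndexWriter(hfileBlockWriter, null, null);
+ * // for each data block written to the file:
+ * w.addEntry(firstKeyOfBlock, blockOffset, blockOnDiskSize);
+ * // leaf chunks are flushed through the InlineBlockWriter callbacks
+ * long rootPos = w.writeIndexBlocks(out); // when closing the file
+ */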
+
+ /**
+ * Writes the block index data as a single level only. Does not do any
+ * block framing.
+ *
+ * @param out the buffered output stream to write the index to. Typically a
+ * stream writing into an {@link HFile} block.
+ * @param description a short description of the index being written. Used
+ * in a log message.
+ * @throws IOException
+ */
+ public void writeSingleLevelIndex(DataOutput out, String description)
+ throws IOException {
+ expectNumLevels(1);
+
+ if (!singleLevelOnly)
+ throw new IOException("Single-level mode is turned off");
+
+ if (rootChunk.getNumEntries() > 0)
+ throw new IOException("Root-level entries already added in " +
+ "single-level mode");
+
+ rootChunk = curInlineChunk;
+ curInlineChunk = new BlockIndexChunk();
+
+ LOG.info("Wrote a single-level " + description + " index with "
+ + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
+ + " bytes");
+ rootChunk.writeRoot(out);
+ }
+
+ /**
+ * Split the current level of the block index into intermediate index
+ * blocks of permitted size and write those blocks to disk. Return the next
+ * level of the block index referencing those intermediate-level blocks.
+ *
+ * @param out
+ * @param currentLevel the current level of the block index, such as a
+ * chunk referencing all leaf-level index blocks
+ * @return the parent level block index, which becomes the root index after
+ * a few (usually zero) iterations
+ * @throws IOException
+ */
+ private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
+ BlockIndexChunk currentLevel) throws IOException {
+ // Entries referencing intermediate-level blocks we are about to create.
+ BlockIndexChunk parent = new BlockIndexChunk();
+
+ // The current intermediate-level block index chunk.
+ BlockIndexChunk curChunk = new BlockIndexChunk();
+
+ for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
+ curChunk.add(currentLevel.getBlockKey(i),
+ currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));
+
+ if (curChunk.getRootSize() >= maxChunkSize)
+ writeIntermediateBlock(out, parent, curChunk);
+ }
+
+ if (curChunk.getNumEntries() > 0) {
+ writeIntermediateBlock(out, parent, curChunk);
+ }
+
+ return parent;
+ }
+
+ private void writeIntermediateBlock(FSDataOutputStream out,
+ BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
+ long beginOffset = out.getPos();
+ DataOutputStream dos = blockWriter.startWriting(
+ BlockType.INTERMEDIATE_INDEX, cacheOnWrite());
+ curChunk.writeNonRoot(dos);
+ byte[] curFirstKey = curChunk.getBlockKey(0);
+ blockWriter.writeHeaderAndData(out);
+
+ if (blockCache != null) {
+ blockCache.cacheBlock(HFile.getBlockCacheKey(nameForCaching,
+ beginOffset), blockWriter.getBlockForCaching());
+ }
+
+ // Add intermediate index block size
+ totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
+ totalBlockUncompressedSize +=
+ blockWriter.getUncompressedSizeWithoutHeader();
+
+ // OFFSET is the beginning offset of the chunk of block index entries.
+ // SIZE is the total byte size of the chunk of block index entries
+ // + the secondary index size
+ // FIRST_KEY is the first key in the chunk of block index
+ // entries.
+ parent.add(curFirstKey, beginOffset,
+ blockWriter.getOnDiskSizeWithHeader());
+
+ // clear current block index chunk
+ curChunk.clear();
+ curFirstKey = null;
+ }
+
+ /**
+ * @return how many block index entries there are in the root level
+ */
+ public final int getNumRootEntries() {
+ return rootChunk.getNumEntries();
+ }
+
+ /**
+ * @return the number of levels in this block index.
+ */
+ public int getNumLevels() {
+ return numLevels;
+ }
+
+ private void expectNumLevels(int expectedNumLevels) {
+ if (numLevels != expectedNumLevels) {
+ throw new IllegalStateException("Number of block index levels is "
+ + numLevels + "but is expected to be " + expectedNumLevels);
+ }
+ }
+
+ /**
+ * Whether there is an inline block ready to be written. In general, we
+ * write a leaf-level index block as an inline block as soon as its size
+ * as serialized in the non-root format reaches a certain threshold.
+ */
+ @Override
+ public boolean shouldWriteBlock(boolean closing) {
+ if (singleLevelOnly)
+ throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
+
+ if (curInlineChunk.getNumEntries() == 0)
+ return false;
+
+ // We do have some entries in the current inline chunk.
+ if (closing) {
+ if (rootChunk.getNumEntries() == 0) {
+ // We did not add any leaf-level blocks yet. Instead of creating a
+ // leaf level with one block, move these entries to the root level.
+
+ expectNumLevels(1);
+ rootChunk = curInlineChunk;
+ curInlineChunk = new BlockIndexChunk();
+ return false;
+ }
+
+ return true;
+ } else {
+ return curInlineChunk.getNonRootSize() >= maxChunkSize;
+ }
+ }
+
+ /**
+ * Write out the current inline index block. Inline blocks are non-root
+ * blocks, so the non-root index format is used.
+ *
+ * @param out the output stream to write the inline block to
+ */
+ @Override
+ public void writeInlineBlock(DataOutput out) throws IOException {
+ if (singleLevelOnly)
+ throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
+
+ // Write the inline block index to the output stream in the non-root
+ // index block format.
+ curInlineChunk.writeNonRoot(out);
+
+ // Save the first key of the inline block so that we can add it to the
+ // parent-level index.
+ firstKey = curInlineChunk.getBlockKey(0);
+
+ // Start a new inline index block
+ curInlineChunk.clear();
+ }
+
+ /**
+ * Called after an inline block has been written so that we can add an
+ * entry referring to that block to the parent-level index.
+ */
+ @Override
+ public void blockWritten(long offset, int onDiskSize, int uncompressedSize)
+ {
+ // Add leaf index block size
+ totalBlockOnDiskSize += onDiskSize;
+ totalBlockUncompressedSize += uncompressedSize;
+
+ if (singleLevelOnly)
+ throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
+
+ if (firstKey == null) {
+ throw new IllegalStateException("Trying to add second-level index " +
+ "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
+ "but the first key was not set in writeInlineBlock");
+ }
+
+ if (rootChunk.getNumEntries() == 0) {
+ // We are writing the first leaf block, so increase index level.
+ expectNumLevels(1);
+ numLevels = 2;
+ }
+
+ // Add another entry to the second-level index. Include the number of
+ // entries in all previous leaf-level chunks for mid-key calculation.
+ rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
+ firstKey = null;
+ }
+
+ @Override
+ public BlockType getInlineBlockType() {
+ return BlockType.LEAF_INDEX;
+ }
+
+ /**
+ * Add one index entry to the current leaf-level block. When the leaf-level
+ * block gets large enough, it will be flushed to disk as an inline block.
+ *
+ * @param firstKey the first key of the data block
+ * @param blockOffset the offset of the data block
+ * @param blockDataSize the on-disk size of the data block ({@link HFile}
+ * format version 2), or the uncompressed size of the data block (
+ * {@link HFile} format version 1).
+ */
+ public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize)
+ {
+ curInlineChunk.add(firstKey, blockOffset, blockDataSize);
+ ++totalNumEntries;
+ }
+
+ /**
+ * @throws IOException if we happened to write a multi-level index.
+ */
+ public void ensureSingleLevel() throws IOException {
+ if (numLevels > 1) {
+ throw new IOException ("Wrote a " + numLevels + "-level index with " +
+ rootChunk.getNumEntries() + " root-level entries, but " +
+ "this is expected to be a single-level block index.");
+ }
+ }
+
+ /**
+ * @return true if we are using cache-on-write. This is configured by the
+ * caller of the constructor by either passing a valid block cache
+ * or null.
+ */
+ @Override
+ public boolean cacheOnWrite() {
+ return blockCache != null;
+ }
+
+ /**
+ * The total uncompressed size of the root index block, intermediate-level
+ * index blocks, and leaf-level index blocks.
+ *
+ * @return the total uncompressed size of all index blocks
+ */
+ public long getTotalUncompressedSize() {
+ return totalBlockUncompressedSize;
+ }
+
+ }
+
+ /**
+ * A single chunk of the block index in the process of writing. The data in
+ * this chunk can become a leaf-level, intermediate-level, or root index
+ * block.
+ */
+ static class BlockIndexChunk {
+
+ /** First keys of the key range corresponding to each index entry. */
+ private final List<byte[]> blockKeys = new ArrayList<byte[]>();
+
+ /** Block offset in backing stream. */
+ private final List<Long> blockOffsets = new ArrayList<Long>();
+
+ /** On-disk data sizes of lower-level data or index blocks. */
+ private final List<Integer> onDiskDataSizes = new ArrayList<Integer>();
+
+ /**
+ * The cumulative number of sub-entries, i.e. entries referenced by
+ * deeper-level blocks. numSubEntriesAt[i] is the number of sub-entries in the
+ * blocks corresponding to this chunk's entries #0 through #i inclusively.
+ */
+ private final List<Long> numSubEntriesAt = new ArrayList<Long>();
+
+ /**
+ * The offset of the next entry to be added, relative to the end of the
+ * "secondary index" in the "non-root" format representation of this index
+ * chunk. This is the next value to be added to the secondary index.
+ */
+ private int curTotalNonRootEntrySize = 0;
+
+ /**
+ * The accumulated size of this chunk if stored in the root index format.
+ */
+ private int curTotalRootSize = 0;
+
+ /**
+ * The "secondary index" used for binary search over variable-length
+ * records in a "non-root" format block. These offsets are relative to the
+ * end of this secondary index.
+ */
+ private final List<Integer> secondaryIndexOffsetMarks =
+ new ArrayList<Integer>();
+
+ /**
+ * Adds a new entry to this block index chunk.
+ *
+ * @param firstKey the first key in the block pointed to by this entry
+ * @param blockOffset the offset of the next-level block pointed to by this
+ * entry
+ * @param onDiskDataSize the on-disk data size of the block pointed to by this
+ * entry, including header size
+ * @param curTotalNumSubEntries if this chunk is the root index chunk under
+ * construction, this specifies the current total number of
+ * sub-entries in all leaf-level chunks, including the one
+ * corresponding to the second-level entry being added.
+ */
+ void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
+ long curTotalNumSubEntries) {
+ // Record the offset for the secondary index
+ secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
+ curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
+ + firstKey.length;
+
+ curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
+ + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
+
+ blockKeys.add(firstKey);
+ blockOffsets.add(blockOffset);
+ onDiskDataSizes.add(onDiskDataSize);
+
+ if (curTotalNumSubEntries != -1) {
+ numSubEntriesAt.add(curTotalNumSubEntries);
+
+ // Make sure the parallel arrays are in sync.
+ if (numSubEntriesAt.size() != blockKeys.size()) {
+ throw new IllegalStateException("Only have key/value count " +
+ "stats for " + numSubEntriesAt.size() + " block index " +
+ "entries out of " + blockKeys.size());
+ }
+ }
+ }
+
+ /**
+ * The same as {@link #add(byte[], long, int, long)} but does not take the
+ * key/value into account. Used for single-level indexes.
+ *
+ * @see #add(byte[], long, int, long)
+ */
+ public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
+ add(firstKey, blockOffset, onDiskDataSize, -1);
+ }
+
+ public void clear() {
+ blockKeys.clear();
+ blockOffsets.clear();
+ onDiskDataSizes.clear();
+ secondaryIndexOffsetMarks.clear();
+ numSubEntriesAt.clear();
+ curTotalNonRootEntrySize = 0;
+ curTotalRootSize = 0;
+ }
+
+ /**
+ * Finds the entry corresponding to the deeper-level index block containing
+ * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
+ * ordering of sub-entries.
+ *
+ * <p>
+ * <i> Implementation note. </i> We are looking for i such that
+ * numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i], because a deeper-level
+ * block #i (0-based) contains sub-entries # numSubEntriesAt[i - 1]'th
+ * through numSubEntriesAt[i] - 1, assuming a global 0-based ordering of
+ * sub-entries. i is by definition the insertion point of k in
+ * numSubEntriesAt.
+ *
+ * @param k sub-entry index, from 0 to the total number of sub-entries - 1
+ * @return the 0-based index of the entry corresponding to the given
+ * sub-entry
+ */
+ public int getEntryBySubEntry(long k) {
+ // We define mid-key as the key corresponding to k'th sub-entry
+ // (0-based).
+
+ int i = Collections.binarySearch(numSubEntriesAt, k);
+
+ // Exact match: numSubEntriesAt[i] = k. This means chunks #0 through
+ // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
+ // is in the (i + 1)'th chunk.
+ if (i >= 0)
+ return i + 1;
+
+ // Inexact match. Return the insertion point.
+ return -i - 1;
+ }
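+
+ // Worked example (illustrative): if numSubEntriesAt = {3, 7, 10}, chunk
+ // #0 holds sub-entries 0..2, chunk #1 holds 3..6, and chunk #2 holds
+ // 7..9. getEntryBySubEntry(5) hits the insertion point between 3 and 7
+ // and returns 1; getEntryBySubEntry(7) is an exact match at i = 1 and
+ // returns i + 1 = 2.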
+
+ /**
+ * Used when writing the root block index of a multi-level block index.
+ * Serializes additional information that allows us to efficiently identify
+ * the mid-key.
+ *
+ * @return a few serialized fields for finding the mid-key
+ * @throws IOException if could not create metadata for computing mid-key
+ */
+ public byte[] getMidKeyMetadata() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(
+ MID_KEY_METADATA_SIZE);
+ DataOutputStream baosDos = new DataOutputStream(baos);
+ long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
+ if (totalNumSubEntries == 0) {
+ throw new IOException("No leaf-level entries, mid-key unavailable");
+ }
+ long midKeySubEntry = (totalNumSubEntries - 1) / 2;
+ int midKeyEntry = getEntryBySubEntry(midKeySubEntry);
+
+ baosDos.writeLong(blockOffsets.get(midKeyEntry));
+ baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));
+
+ long numSubEntriesBefore = midKeyEntry > 0
+ ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
+ long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
+ if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE)
+ {
+ throw new IOException("Could not identify mid-key index within the "
+ + "leaf-level block containing mid-key: out of range ("
+ + subEntryWithinEntry + ", numSubEntriesBefore="
+ + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
+ + ")");
+ }
+
+ baosDos.writeInt((int) subEntryWithinEntry);
+
+ if (baosDos.size() != MID_KEY_METADATA_SIZE) {
+ throw new IOException("Could not write mid-key metadata: size=" +
+ baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
+ }
+
+ // Close just to be good citizens, although this has no effect.
+ baos.close();
+
+ return baos.toByteArray();
+ }
+
+ /**
+ * Writes the block index chunk in the non-root index block format. This
+ * format contains the number of entries, an index of integer offsets
+ * for quick binary search on variable-length records, and tuples of
+ * block offset, on-disk block size, and the first key for each entry.
+ *
+ * @param out
+ * @throws IOException
+ */
+ void writeNonRoot(DataOutput out) throws IOException {
+ // The number of entries in the block.
+ out.writeInt(blockKeys.size());
+
+ if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
+ throw new IOException("Corrupted block index chunk writer: " +
+ blockKeys.size() + " entries but " +
+ secondaryIndexOffsetMarks.size() + " secondary index items");
+ }
+
+ // For each entry, write a "secondary index" of relative offsets to the
+ // entries from the end of the secondary index. This works, because at
+ // read time we read the number of entries and know where the secondary
+ // index ends.
+ for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
+ out.writeInt(currentSecondaryIndex);
+
+ // We include one additional element in the secondary index to make it
+ // easy to calculate the size of each entry by subtracting consecutive
+ // secondary index elements.
+ out.writeInt(curTotalNonRootEntrySize);
+
+ for (int i = 0; i < blockKeys.size(); ++i) {
+ out.writeLong(blockOffsets.get(i));
+ out.writeInt(onDiskDataSizes.get(i));
+ out.write(blockKeys.get(i));
+ }
+ }
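+
+ /*
+ * Resulting non-root block layout (a summary of the format written above;
+ * all sizes in bytes):
+ *
+ * int numEntries
+ * int[numEntries + 1] secondary index: offsets of the entries relative
+ * to the end of the secondary index, plus a final
+ * element equal to the total size of all entries
+ * per entry: long blockOffset, int onDiskDataSize, byte[] firstKey
+ */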
+
+ /**
+ * @return the size of this chunk if stored in the non-root index block
+ * format
+ */
+ int getNonRootSize() {
+ return Bytes.SIZEOF_INT // Number of entries
+ + Bytes.SIZEOF_INT * (blockKeys.size() + 1) // Secondary index
+ + curTotalNonRootEntrySize; // All entries
+ }
+
+ /**
+ * Writes this chunk into the given output stream in the root block index
+ * format. This format is similar to the {@link HFile} version 1 block
+ * index format, except that we store on-disk size of the block instead of
+ * its uncompressed size.
+ *
+ * @param out the data output stream to write the block index to. Typically
+ * a stream writing into an {@link HFile} block.
+ * @throws IOException
+ */
+ void writeRoot(DataOutput out) throws IOException {
+ for (int i = 0; i < blockKeys.size(); ++i) {
+ out.writeLong(blockOffsets.get(i));
+ out.writeInt(onDiskDataSizes.get(i));
+ Bytes.writeByteArray(out, blockKeys.get(i));
+ }
+ }
+
+ /**
+ * @return the size of this chunk if stored in the root index block format
+ */
+ int getRootSize() {
+ return curTotalRootSize;
+ }
+
+ /**
+ * @return the number of entries in this block index chunk
+ */
+ public int getNumEntries() {
+ return blockKeys.size();
+ }
+
+ public byte[] getBlockKey(int i) {
+ return blockKeys.get(i);
+ }
+
+ public long getBlockOffset(int i) {
+ return blockOffsets.get(i);
+ }
+
+ public int getOnDiskDataSize(int i) {
+ return onDiskDataSizes.get(i);
+ }
+
+ public long getCumulativeNumKV(int i) {
+ if (i < 0)
+ return 0;
+ return numSubEntriesAt.get(i);
+ }
+
+ }
+
+ /**
+ * @return true if the given configuration specifies that we should
+ * cache-on-write index blocks
+ */
+ public static boolean shouldCacheOnWrite(Configuration conf) {
+ return conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY, false);
+ }
+
+ public static int getMaxChunkSize(Configuration conf) {
+ return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
+ }
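+
+ /*
+ * Illustrative use of the configuration helpers above (a sketch, not
+ * part of this commit's callers):
+ *
+ * Configuration conf = HBaseConfiguration.create();
+ * conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, 64 * 1024);
+ * conf.setBoolean(HFileBlockIndex.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, true);
+ * int maxChunkSize = HFileBlockIndex.getMaxChunkSize(conf); // 65536
+ * boolean cacheIndexes = HFileBlockIndex.shouldCacheOnWrite(conf); // true
+ */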
+
+}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java Wed Aug 3 20:25:28 2011
@@ -0,0 +1,308 @@
+
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.util.BloomFilter;
+import org.apache.hadoop.hbase.util.BloomFilterFactory;
+import org.apache.hadoop.hbase.util.ByteBloomFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Writables;
+
+/**
+ * Implements pretty-printing functionality for {@link HFile}s.
+ */
+public class HFilePrettyPrinter {
+
+ private static final Log LOG = LogFactory.getLog(HFilePrettyPrinter.class);
+
+ private Options options = new Options();
+
+ private boolean verbose;
+ private boolean printValue;
+ private boolean printKey;
+ private boolean shouldPrintMeta;
+ private boolean printBlocks;
+ private boolean checkRow;
+ private boolean checkFamily;
+
+ private Configuration conf;
+
+ private List<Path> files = new ArrayList<Path>();
+ private int count;
+
+ private static final String FOUR_SPACES = "    ";
+
+ public HFilePrettyPrinter() {
+ options.addOption("v", "verbose", false,
+ "Verbose output; emits file and meta data delimiters");
+ options.addOption("p", "printkv", false, "Print key/value pairs");
+ options.addOption("e", "printkey", false, "Print keys");
+ options.addOption("m", "printmeta", false, "Print meta data of file");
+ options.addOption("b", "printblocks", false, "Print block index meta data");
+ options.addOption("k", "checkrow", false,
+ "Enable row order check; looks for out-of-order keys");
+ options.addOption("a", "checkfamily", false, "Enable family check");
+ options.addOption("f", "file", true,
+ "File to scan. Pass full-path; e.g. hdfs://a:9000/hbase/.META./12/34");
+ options.addOption("r", "region", true,
+ "Region to scan. Pass region name; e.g. '.META.,,1'");
+ }
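+
+ // Example invocation (illustrative; the exact entry point is provided by
+ // the HFile tool that instantiates this class):
+ //
+ // hbase org.apache.hadoop.hbase.io.hfile.HFile \
+ // -v -m -f hdfs://a:9000/hbase/.META./12/34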
+
+ public boolean parseOptions(String args[]) throws ParseException,
+ IOException {
+ if (args.length == 0) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("HFile", options, true);
+ return false;
+ }
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(options, args);
+
+ verbose = cmd.hasOption("v");
+ printValue = cmd.hasOption("p");
+ printKey = cmd.hasOption("e") || printValue;
+ shouldPrintMeta = cmd.hasOption("m");
+ printBlocks = cmd.hasOption("b");
+ checkRow = cmd.hasOption("k");
+ checkFamily = cmd.hasOption("a");
+
+ if (cmd.hasOption("f")) {
+ files.add(new Path(cmd.getOptionValue("f")));
+ }
+
+ if (cmd.hasOption("r")) {
+ String regionName = cmd.getOptionValue("r");
+ byte[] rn = Bytes.toBytes(regionName);
+ byte[][] hri = HRegionInfo.parseRegionName(rn);
+ Path rootDir = FSUtils.getRootDir(conf);
+ Path tableDir = new Path(rootDir, Bytes.toString(hri[0]));
+ String enc = HRegionInfo.encodeRegionName(rn);
+ Path regionDir = new Path(tableDir, enc);
+ if (verbose)
+ System.out.println("region dir -> " + regionDir);
+ List<Path> regionFiles = HFile.getStoreFiles(FileSystem.get(conf),
+ regionDir);
+ if (verbose)
+ System.out.println("Number of region files found -> "
+ + regionFiles.size());
+ if (verbose) {
+ int i = 1;
+ for (Path p : regionFiles) {
+ System.out.println("Found file[" + i++ + "] -> " + p);
+ }
+ }
+ files.addAll(regionFiles);
+ }
+
+ return true;
+ }
+
+ /**
+ * Runs the command-line pretty-printer, and returns the desired command
+ * exit code (zero for success, non-zero for failure).
+ */
+ public int run(String[] args) {
+ conf = HBaseConfiguration.create();
+ conf.set("fs.defaultFS",
+ conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
+ conf.set("fs.default.name",
+ conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
+ try {
+ if (!parseOptions(args))
+ return 1;
+ } catch (IOException ex) {
+ LOG.error("Error parsing command-line options", ex);
+ return 1;
+ } catch (ParseException ex) {
+ LOG.error("Error parsing command-line options", ex);
+ return 1;
+ }
+
+ // iterate over all files found
+ for (Path fileName : files) {
+ try {
+ processFile(fileName);
+ } catch (IOException ex) {
+ LOG.error("Error reading " + fileName, ex);
+ }
+ }
+
+ if (verbose || printKey) {
+ System.out.println("Scanned kv count -> " + count);
+ }
+
+ return 0;
+ }
+
+ private void processFile(Path file) throws IOException {
+ if (verbose)
+ System.out.println("Scanning -> " + file);
+ FileSystem fs = file.getFileSystem(conf);
+ if (!fs.exists(file)) {
+ System.err.println("ERROR, file doesn't exist: " + file);
+ return;
+ }
+
+ HFile.Reader reader = HFile.createReader(fs, file, null, false, false);
+
+ Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
+
+ if (verbose || printKey || checkRow || checkFamily) {
+
+ // scan over file and read key/value's and check if requested
+ HFileScanner scanner = reader.getScanner(false, false, false);
+ scanner.seekTo();
+ scanKeysValues(file, scanner);
+ }
+
+ // print meta data
+ if (shouldPrintMeta) {
+ printMeta(reader, fileInfo);
+ }
+
+ if (printBlocks) {
+ System.out.println("Block Index:");
+ System.out.println(reader.getDataBlockIndexReader());
+ }
+
+ reader.close();
+ }
+
+ private void scanKeysValues(Path file, HFileScanner scanner)
+ throws IOException {
+ KeyValue pkv = null;
+ do {
+ KeyValue kv = scanner.getKeyValue();
+ // dump key value
+ if (printKey) {
+ System.out.print("K: " + kv);
+ if (printValue) {
+ System.out.print(" V: " + Bytes.toStringBinary(kv.getValue()));
+ }
+ System.out.println();
+ }
+ // check if rows are in order
+ if (checkRow && pkv != null) {
+ if (Bytes.compareTo(pkv.getRow(), kv.getRow()) > 0) {
+ System.err.println("WARNING, previous row is greater then"
+ + " current row\n\tfilename -> " + file + "\n\tprevious -> "
+ + Bytes.toStringBinary(pkv.getKey()) + "\n\tcurrent -> "
+ + Bytes.toStringBinary(kv.getKey()));
+ }
+ }
+ // check if families are consistent
+ if (checkFamily) {
+ String fam = Bytes.toString(kv.getFamily());
+ if (!file.toString().contains(fam)) {
+ System.err.println("WARNING, filename does not match kv family,"
+ + "\n\tfilename -> " + file + "\n\tkeyvalue -> "
+ + Bytes.toStringBinary(kv.getKey()));
+ }
+ if (pkv != null
+ && !Bytes.equals(pkv.getFamily(), kv.getFamily())) {
+ System.err.println("WARNING, previous kv has different family"
+ + " compared to current key\n\tfilename -> " + file
+ + "\n\tprevious -> " + Bytes.toStringBinary(pkv.getKey())
+ + "\n\tcurrent -> " + Bytes.toStringBinary(kv.getKey()));
+ }
+ }
+ pkv = kv;
+ ++count;
+ } while (scanner.next());
+ }
+
+ /**
+ * Format a string of the form "k1=v1, k2=v2, ..." into separate lines
+ * with a four-space indentation.
+ */
+ private static String asSeparateLines(String keyValueStr) {
+ return keyValueStr.replaceAll(", ([a-zA-Z]+=)",
+ ",\n" + FOUR_SPACES + "$1");
+ }
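+
+ // Worked example (illustrative): asSeparateLines("a=1, b=2, c=3") returns
+ // "a=1,\n    b=2,\n    c=3".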
+
+ private void printMeta(HFile.Reader reader, Map<byte[], byte[]> fileInfo)
+ throws IOException {
+ System.out.println("Block index size as per heapsize: "
+ + reader.indexSize());
+ System.out.println(asSeparateLines(reader.toString()));
+ System.out.println("Trailer:\n "
+ + asSeparateLines(reader.getTrailer().toString()));
+ System.out.println("Fileinfo:");
+ for (Map.Entry<byte[], byte[]> e : fileInfo.entrySet()) {
+ System.out.print(FOUR_SPACES + Bytes.toString(e.getKey()) + " = ");
+ if (Bytes.compareTo(e.getKey(), Bytes.toBytes("MAX_SEQ_ID_KEY")) == 0) {
+ long seqid = Bytes.toLong(e.getValue());
+ System.out.println(seqid);
+ } else if (Bytes.compareTo(e.getKey(), Bytes.toBytes("TIMERANGE")) == 0) {
+ TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
+ Writables.copyWritable(e.getValue(), timeRangeTracker);
+ System.out.println(timeRangeTracker.getMinimumTimestamp() + "...."
+ + timeRangeTracker.getMaximumTimestamp());
+ } else if (Bytes.compareTo(e.getKey(), FileInfo.AVG_KEY_LEN) == 0
+ || Bytes.compareTo(e.getKey(), FileInfo.AVG_VALUE_LEN) == 0) {
+ System.out.println(Bytes.toInt(e.getValue()));
+ } else {
+ System.out.println(Bytes.toStringBinary(e.getValue()));
+ }
+ }
+
+ System.out.println("Mid-key: " + Bytes.toStringBinary(reader.midkey()));
+
+ // Printing bloom information
+ DataInput bloomMeta = reader.getBloomFilterMetadata();
+ BloomFilter bloomFilter = null;
+ if (bloomMeta != null)
+ bloomFilter = BloomFilterFactory.createFromMeta(bloomMeta, reader);
+
+ System.out.println("Bloom filter:");
+ if (bloomFilter != null) {
+ System.out.println(FOUR_SPACES + bloomFilter.toString().replaceAll(
+ ByteBloomFilter.STATS_RECORD_SEP, "\n" + FOUR_SPACES));
+ } else {
+ System.out.println(FOUR_SPACES + "Not present");
+ }
+ }
+
+}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/InlineBlockWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/InlineBlockWriter.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/InlineBlockWriter.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/InlineBlockWriter.java Wed Aug 3 20:25:28 2011
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A way to write "inline" blocks into an {@link HFile}. Inline blocks are
+ * interspersed with data blocks. For example, Bloom filter chunks and
+ * leaf-level blocks of a multi-level block index are stored as inline blocks.
+ */
+public interface InlineBlockWriter {
+
+ /**
+ * Determines whether there is a new block to be written out.
+ *
+ * @param closing
+ * whether the file is being closed, in which case we need to write
+ * out all available data and not wait to accumulate another block
+ */
+ boolean shouldWriteBlock(boolean closing);
+
+ /**
+ * Writes the block to the provided stream. Must not write any magic records.
+ * Called only if {@link #shouldWriteBlock(boolean)} returned true.
+ *
+ * @param out
+ * a stream (usually a compressing stream) to write the block to
+ */
+ void writeInlineBlock(DataOutput out) throws IOException;
+
+ /**
+ * Called after a block has been written, and its offset, on-disk size,
+ * and uncompressed size have been determined. Can be used to add an entry
+ * to a block index. If this type of inline block needs a block index, the
+ * inline block writer is responsible for maintaining it.
+ *
+ * @param offset the offset of the block in the stream
+ * @param onDiskSize the on-disk size of the block
+ * @param uncompressedSize the uncompressed size of the block
+ */
+ void blockWritten(long offset, int onDiskSize, int uncompressedSize);
+
+ /**
+ * The type of blocks this block writer produces.
+ */
+ BlockType getInlineBlockType();
+
+ /**
+ * @return true if inline blocks produced by this writer should be cached
+ */
+ boolean cacheOnWrite();
+
+}
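
For illustration, a minimal InlineBlockWriter might buffer opaque bytes and flush a chunk whenever it crosses a size threshold. The sketch below is hypothetical; the class, its add() method, and the reuse of BlockType.META are assumptions, not part of this patch:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.hbase.io.hfile.BlockType;
    import org.apache.hadoop.hbase.io.hfile.InlineBlockWriter;

    public class ExampleChunkWriter implements InlineBlockWriter {

      private final int maxChunkSize;
      private final ByteArrayOutputStream curChunk = new ByteArrayOutputStream();

      public ExampleChunkWriter(int maxChunkSize) {
        this.maxChunkSize = maxChunkSize;
      }

      /** Called by client code to accumulate data between data blocks. */
      public void add(byte[] data) {
        curChunk.write(data, 0, data.length);
      }

      @Override
      public boolean shouldWriteBlock(boolean closing) {
        // Flush a full chunk, or whatever remains when the file is closing.
        return curChunk.size() >= maxChunkSize
            || (closing && curChunk.size() > 0);
      }

      @Override
      public void writeInlineBlock(DataOutput out) throws IOException {
        out.write(curChunk.toByteArray());  // no magic record, per the contract
        curChunk.reset();
      }

      @Override
      public void blockWritten(long offset, int onDiskSize,
          int uncompressedSize) {
        // A real writer would record (offset, onDiskSize) in its own index.
      }

      @Override
      public BlockType getInlineBlockType() {
        return BlockType.META;  // assumption: reusing the existing META type
      }

      @Override
      public boolean cacheOnWrite() {
        return false;  // the opaque chunks in this sketch are not cached
      }
    }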
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/BloomFilterBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/BloomFilterBase.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/BloomFilterBase.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/BloomFilterBase.java Wed Aug 3 20:25:28 2011
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Common Bloom filter methods required at read and write time.
+ */
+public interface BloomFilterBase {
+
+ /**
+ * @return The number of keys added to the bloom
+ */
+ long getKeyCount();
+
+ /**
+ * @return The max number of keys that can be inserted
+ * to maintain the desired error rate
+ */
+ long getMaxKeys();
+
+ /**
+ * @return Size of the bloom, in bytes
+ */
+ long getByteSize();
+
+ /**
+ * Create a key for a row-column Bloom filter.
+ */
+ byte[] createBloomKey(byte[] rowBuf, int rowOffset, int rowLen,
+ byte[] qualBuf, int qualOffset, int qualLen);
+
+ /**
+ * @return Bloom key comparator
+ */
+ RawComparator<byte[]> getComparator();
+
+}
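
As a rough sketch of how these accessors might be used together (the helper class below is hypothetical, not part of this patch), a diagnostic tool could summarize a filter's occupancy:

    import org.apache.hadoop.hbase.util.BloomFilterBase;

    public class BloomStatsExample {
      /** Hypothetical helper: summarize the occupancy of any Bloom filter. */
      public static String describe(BloomFilterBase bloom) {
        return String.format("keys=%d/%d (%.1f%% full), bytes=%d",
            bloom.getKeyCount(), bloom.getMaxKeys(),
            100.0 * bloom.getKeyCount() / Math.max(1, bloom.getMaxKeys()),
            bloom.getByteSize());
      }
    }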
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java Wed Aug 3 20:25:28 2011
@@ -0,0 +1,208 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+
+/**
+ * Handles Bloom filter initialization based on configuration and serialized
+ * metadata in the reader and writer of {@link StoreFile}.
+ */
+public final class BloomFilterFactory {
+
+ private static final Log LOG =
+ LogFactory.getLog(BloomFilterFactory.class.getName());
+
+ /** This class should not be instantiated. */
+ private BloomFilterFactory() {}
+
+ /**
+ * Specifies the target error rate to use when selecting the number of keys
+ * per Bloom filter.
+ */
+ public static final String IO_STOREFILE_BLOOM_ERROR_RATE =
+ "io.storefile.bloom.error.rate";
+
+ /**
+ * Maximum folding factor allowed. If the Bloom filter is oversized
+ * initially, it will be shrunk by a factor of up to 2 ** this value.
+ */
+ public static final String IO_STOREFILE_BLOOM_MAX_FOLD =
+ "io.storefile.bloom.max.fold";
+
+ /**
+ * For default (single-block) Bloom filters this specifies the maximum number
+ * of keys.
+ */
+ public static final String IO_STOREFILE_BLOOM_MAX_KEYS =
+ "io.storefile.bloom.max.keys";
+
+ /** Master switch to enable Bloom filters */
+ public static final String IO_STOREFILE_BLOOM_ENABLED =
+ "io.storefile.bloom.enabled";
+
+ /**
+ * Target Bloom block size. Bloom filter blocks of approximately this size
+ * are interleaved with data blocks.
+ */
+ public static final String IO_STOREFILE_BLOOM_BLOCK_SIZE =
+ "io.storefile.bloom.block.size";
+
+ /** Whether to cache compound Bloom filter blocks on write */
+ public static final String IO_STOREFILE_BLOOM_CACHE_ON_WRITE =
+ "io.storefile.bloom.cacheonwrite";
+
+ /** Maximum number of times a Bloom filter can be "folded" if oversized */
+ private static final int MAX_ALLOWED_FOLD_FACTOR = 7;
+
+ /**
+ * Instantiates the correct Bloom filter class based on the version provided
+ * in the meta block data.
+ *
+ * @param meta the byte array holding the Bloom filter's metadata, including
+ * version information
+ * @param reader the {@link HFile} reader to use to lazily load Bloom filter
+ * blocks
+ * @return an instance of the correct type of Bloom filter
+ * @throws IllegalArgumentException
+ */
+ public static BloomFilter
+ createFromMeta(DataInput meta, HFile.Reader reader)
+ throws IllegalArgumentException, IOException {
+ int version = meta.readInt();
+ switch (version) {
+ case ByteBloomFilter.VERSION:
+ // This is only possible in a version 1 HFile. No comparator is needed
+ // because raw byte comparators are always used in version 1 Bloom
+ // filters.
+ return new ByteBloomFilter(meta);
+
+ case CompoundBloomFilterBase.VERSION:
+ return new CompoundBloomFilter(meta, reader);
+
+ default:
+ throw new IllegalArgumentException(
+ "Bad bloom filter format version " + version
+ );
+ }
+ }
+
+ /**
+ * @return true if Bloom filters are enabled in the given configuration
+ */
+ public static boolean isBloomEnabled(Configuration conf) {
+ return conf.getBoolean(IO_STOREFILE_BLOOM_ENABLED, true);
+ }
+
+ public static float getErrorRate(Configuration conf) {
+ return conf.getFloat(IO_STOREFILE_BLOOM_ERROR_RATE, (float) 0.01);
+ }
+
+ /**
+ * Creates a new Bloom filter at the time of
+ * {@link org.apache.hadoop.hbase.regionserver.StoreFile} writing.
+ *
+ * @param conf the configuration to read Bloom filter settings from
+ * @param bloomType the Bloom filter type for the column family
+ * @param maxKeys an estimate of the number of keys we expect to insert.
+ * Irrelevant if compound Bloom filters are enabled.
+ * @param writer the HFile writer
+ * @return the new Bloom filter, or null if Bloom filters are disabled
+ * or one could not be created
+ */
+ public static BloomFilterWriter createBloomAtWrite(Configuration conf,
+ BloomType bloomType, int maxKeys, HFile.Writer writer) {
+ if (!isBloomEnabled(conf)) {
+ LOG.info("Bloom filters are disabled by configuration for "
+ + writer.getPath()
+ + (conf == null ? " (configuration is null)" : ""));
+ return null;
+ } else if (bloomType == BloomType.NONE) {
+ LOG.info("Bloom filter is turned off for the column family");
+ return null;
+ }
+
+ float err = getErrorRate(conf);
+
+ // In case of row/column Bloom filter lookups, each lookup is an OR of two
+ // separate lookups. Therefore, if each lookup's false positive rate is p,
+ // the resulting false positive rate is err = 1 - (1 - p)^2, and
+ // p = 1 - sqrt(1 - err).
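+ // For example, with a target err of 0.01, each of the two lookups is
+ // configured for p = 1 - sqrt(0.99) =~ 0.005, about half the overall rate.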
+ if (bloomType == BloomType.ROWCOL) {
+ err = (float) (1 - Math.sqrt(1 - err));
+ }
+
+ int maxFold = conf.getInt(IO_STOREFILE_BLOOM_MAX_FOLD,
+ MAX_ALLOWED_FOLD_FACTOR);
+
+ if (HFile.getFormatVersion(conf) > HFile.MIN_FORMAT_VERSION) {
+ // In case of compound Bloom filters we ignore the maxKeys hint.
+ CompoundBloomFilterWriter bloomWriter = new CompoundBloomFilterWriter(
+ getBloomBlockSize(conf), err, Hash.getHashType(conf), maxFold,
+ cacheChunksOnWrite(conf), bloomType == BloomType.ROWCOL
+ ? KeyValue.KEY_COMPARATOR : Bytes.BYTES_RAWCOMPARATOR);
+ writer.addInlineBlockWriter(bloomWriter);
+ return bloomWriter;
+ } else {
+ // A single-block Bloom filter. Only used when testing HFile format
+ // version 1.
+ int tooBig = conf.getInt(IO_STOREFILE_BLOOM_MAX_KEYS,
+ 128 * 1000 * 1000);
+
+ if (maxKeys <= 0) {
+ LOG.warn("Invalid maximum number of keys specified: " + maxKeys
+ + ", not using Bloom filter");
+ return null;
+ } else if (maxKeys < tooBig) {
+ BloomFilterWriter bloom = new ByteBloomFilter((int) maxKeys, err,
+ Hash.getHashType(conf), maxFold);
+ bloom.allocBloom();
+ return bloom;
+ } else {
+ LOG.debug("Skipping bloom filter because max keysize too large: "
+ + maxKeys);
+ }
+ }
+ return null;
+ }
+
+ /** @return the compound Bloom filter block size from the configuration */
+ public static int getBloomBlockSize(Configuration conf) {
+ return conf.getInt(IO_STOREFILE_BLOOM_BLOCK_SIZE, 128 * 1024);
+ }
+
+ /** @return whether to cache compound Bloom filter chunks on write */
+ public static boolean cacheChunksOnWrite(Configuration conf) {
+ return conf.getBoolean(IO_STOREFILE_BLOOM_CACHE_ON_WRITE, false);
+ }
+
+}
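
As a usage sketch, the configuration keys above can be tuned before store files are written; the class name and the specific values below are illustrative assumptions, not recommendations from this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.util.BloomFilterFactory;

    public class BloomConfigExample {
      /** Hypothetical tuning: tighter error rate, smaller Bloom blocks. */
      public static Configuration tunedConf() {
        Configuration conf = new Configuration();
        // Halve the default 1% false positive rate.
        conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, 0.005f);
        // Interleave smaller Bloom chunks between data blocks.
        conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, 64 * 1024);
        // Keep freshly written Bloom chunks in the block cache.
        conf.setBoolean(
            BloomFilterFactory.IO_STOREFILE_BLOOM_CACHE_ON_WRITE, true);
        return conf;
      }
    }

A writer would then pass such a configuration to createBloomAtWrite, which selects the compound or single-block implementation based on the HFile format version, as shown above.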
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java Wed Aug 3 20:25:28 2011
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * A Bloom filter implementation built on top of {@link ByteBloomFilter},
+ * encapsulating a set of fixed-size Bloom filters written out at the time of
+ * {@link org.apache.hadoop.hbase.io.hfile.HFile} generation into the data
+ * block stream, and loaded on demand at query time. This class only provides
+ * reading capabilities.
+ */
+public class CompoundBloomFilter extends CompoundBloomFilterBase
+ implements BloomFilter {
+
+ /** Used to load chunks on demand */
+ private HFile.Reader reader;
+
+ private HFileBlockIndex.BlockIndexReader index;
+
+ private int hashCount;
+ private Hash hash;
+
+ private long[] numQueriesPerChunk;
+ private long[] numPositivesPerChunk;
+
+ /**
+ * De-serialization for compound Bloom filter metadata. Must be consistent
+ * with what {@link CompoundBloomFilterWriter} does.
+ *
+ * @param meta serialized Bloom filter metadata without any magic blocks
+ * @throws IOException
+ */
+ public CompoundBloomFilter(DataInput meta, HFile.Reader reader)
+ throws IOException {
+ this.reader = reader;
+
+ totalByteSize = meta.readLong();
+ hashCount = meta.readInt();
+ hashType = meta.readInt();
+ totalKeyCount = meta.readLong();
+ totalMaxKeys = meta.readLong();
+ numChunks = meta.readInt();
+ comparator = FixedFileTrailer.createComparator(
+ Bytes.toString(Bytes.readByteArray(meta)));
+
+ hash = Hash.getInstance(hashType);
+ if (hash == null) {
+ throw new IllegalArgumentException("Invalid hash type: " + hashType);
+ }
+
+ index = new HFileBlockIndex.BlockIndexReader(comparator, 1);
+ index.readRootIndex(meta, numChunks);
+ }
+
+ @Override
+ public boolean contains(byte[] key, int keyOffset, int keyLength,
+ ByteBuffer bloom) {
+ // We store the result in this variable so we can update per-chunk stats
+ // for testing; if loading a Bloom block fails, we throw instead.
+ boolean result;
+
+ int block = index.rootBlockContainingKey(key, keyOffset, keyLength);
+ if (block < 0) {
+ result = false; // This key is not in the file.
+ } else {
+ HFileBlock bloomBlock;
+ try {
+ // We cache the block and use a positional read.
+ bloomBlock = reader.readBlock(index.getRootBlockOffset(block),
+ index.getRootBlockDataSize(block), true, true, false);
+ } catch (IOException ex) {
+ // The Bloom filter is broken; surface the failure to the caller.
+ throw new IllegalArgumentException(
+ "Failed to load Bloom block for key "
+ + Bytes.toStringBinary(key, keyOffset, keyLength), ex);
+ }
+
+ ByteBuffer bloomBuf = bloomBlock.getBufferReadOnly();
+ result = ByteBloomFilter.contains(key, keyOffset, keyLength,
+ bloomBuf.array(), bloomBuf.arrayOffset() + HFileBlock.HEADER_SIZE,
+ bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount);
+ }
+
+ if (numQueriesPerChunk != null && block >= 0) {
+ // Update statistics. Only used in unit tests.
+ ++numQueriesPerChunk[block];
+ if (result)
+ ++numPositivesPerChunk[block];
+ }
+
+ return result;
+ }
+
+ public boolean supportsAutoLoading() {
+ return true;
+ }
+
+ public int getNumChunks() {
+ return numChunks;
+ }
+
+ @Override
+ public RawComparator<byte[]> getComparator() {
+ return comparator;
+ }
+
+ public void enableTestingStats() {
+ numQueriesPerChunk = new long[numChunks];
+ numPositivesPerChunk = new long[numChunks];
+ }
+
+ public String formatTestingStats() {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < numChunks; ++i) {
+ sb.append("chunk #");
+ sb.append(i);
+ sb.append(": queries=");
+ sb.append(numQueriesPerChunk[i]);
+ sb.append(", positives=");
+ sb.append(numPositivesPerChunk[i]);
+ sb.append(", positiveRatio=");
+ sb.append(numPositivesPerChunk[i] * 1.0 / numQueriesPerChunk[i]);
+ sb.append(";\n");
+ }
+ return sb.toString();
+ }
+
+ public long getNumQueriesForTesting(int chunk) {
+ return numQueriesPerChunk[chunk];
+ }
+
+ public long getNumPositivesForTesting(int chunk) {
+ return numPositivesPerChunk[chunk];
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(ByteBloomFilter.formatStats(this));
+ sb.append(ByteBloomFilter.STATS_RECORD_SEP +
+ "Number of chunks: " + numChunks);
+ sb.append(ByteBloomFilter.STATS_RECORD_SEP +
+ "Comparator: " + comparator.getClass().getSimpleName());
+ return sb.toString();
+ }
+
+}
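
On the read side, the pieces fit together roughly as follows. This is a hedged sketch (the helper class is hypothetical); it mirrors the printMeta code earlier in this patch and adds a membership query. Since CompoundBloomFilter supports auto-loading, the caller passes a null buffer and lets the filter fetch the right chunk through the reader:

    import java.io.DataInput;
    import java.io.IOException;

    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.util.BloomFilter;
    import org.apache.hadoop.hbase.util.BloomFilterFactory;

    public class BloomQueryExample {
      /** Check a ROW Bloom filter; ROWCOL keys are built with createBloomKey. */
      public static boolean mightContain(HFile.Reader reader, byte[] row)
          throws IOException {
        DataInput meta = reader.getBloomFilterMetadata();
        if (meta == null) {
          return true;  // no Bloom filter, so the row cannot be ruled out
        }
        BloomFilter bloom = BloomFilterFactory.createFromMeta(meta, reader);
        // The buffer argument is unused by CompoundBloomFilter.contains(),
        // which loads the relevant chunk on demand instead.
        return bloom.contains(row, 0, row.length, null);
      }
    }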
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilterBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilterBase.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilterBase.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilterBase.java Wed Aug 3 20:25:28 2011
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.io.RawComparator;
+
+public class CompoundBloomFilterBase implements BloomFilterBase {
+
+ /**
+ * At read time, the total number of chunks. At write time, the number of
+ * chunks created so far. The first chunk has an ID of 0, and the current
+ * chunk has the ID of numChunks - 1.
+ */
+ protected int numChunks;
+
+ /**
+ * The Bloom filter version. There used to be a DynamicByteBloomFilter which
+ * had version 2.
+ */
+ public static final int VERSION = 3;
+
+ /** Target error rate for configuring the filter and for information */
+ protected float errorRate;
+
+ /** The total number of keys in all chunks */
+ protected long totalKeyCount;
+ protected long totalByteSize;
+ protected long totalMaxKeys;
+
+ /** Hash function type to use, as defined in {@link Hash} */
+ protected int hashType;
+
+ /** Comparator used to compare Bloom filter keys */
+ protected RawComparator<byte[]> comparator;
+
+ @Override
+ public long getMaxKeys() {
+ return totalMaxKeys;
+ }
+
+ @Override
+ public long getKeyCount() {
+ return totalKeyCount;
+ }
+
+ @Override
+ public long getByteSize() {
+ return totalByteSize;
+ }
+
+ private static final byte[] DUMMY = new byte[0];
+
+ /**
+ * Prepare an ordered pair of row and qualifier to be compared using
+ * {@link KeyValue.KeyComparator}. This is only used for row-column Bloom
+ * filters.
+ */
+ @Override
+ public byte[] createBloomKey(byte[] row, int roffset, int rlength,
+ byte[] qualifier, int qoffset, int qlength) {
+ if (qualifier == null)
+ qualifier = DUMMY;
+
+ // Make sure this does not specify a timestamp so that the default maximum
+ // (most recent) timestamp is used.
+ KeyValue kv = KeyValue.createFirstOnRow(row, roffset, rlength, DUMMY, 0, 0,
+ qualifier, qoffset, qlength);
+ return kv.getKey();
+ }
+
+ @Override
+ public RawComparator<byte[]> getComparator() {
+ return comparator;
+ }
+
+}
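
As a small illustration of the row-column Bloom key construction above (the class and the values are hypothetical):

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.CompoundBloomFilterBase;

    public class BloomKeyExample {
      public static void main(String[] args) {
        CompoundBloomFilterBase base = new CompoundBloomFilterBase();
        byte[] row = Bytes.toBytes("row1");
        byte[] qual = Bytes.toBytes("qual1");
        // Builds the first-on-row key for (row, qual): empty family and the
        // maximum timestamp, comparable with KeyValue.KEY_COMPARATOR.
        byte[] bloomKey = base.createBloomKey(row, 0, row.length,
            qual, 0, qual.length);
        System.out.println(bloomKey.length + " key bytes");
      }
    }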