You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:19:47 UTC

svn commit: r1181556 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/metrics/ test/java/org/apa...

Author: nspiegelberg
Date: Tue Oct 11 02:19:47 2011
New Revision: 1181556

URL: http://svn.apache.org/viewvc?rev=1181556&view=rev
Log:
Multi-level block index

Summary:
This is a version of Liyin's original implementation of multi-level block
indexes with a significant amount of refactoring, documentation, and bug fixes
to the point that it can run smoothly in HBaseTest for at least a few days. I
also added a unit test, which currently only tests one- and two-level block
indexes. There is a discussion of multi-level block indexes at
https://issues.apache.org/jira/browse/HBASE-3857, and the relevant JIRA these
changes will be part of is https://issues.apache.org/jira/browse/HBASE-3856.

Test Plan:
Unit tests. Load test (HBaseTest). Dark launch.

Reviewed By: kannan
Reviewers: liyintang, kranganathan, kannan, nspiegelberg, gqchen, aaiyer
Commenters: jgray, liyintang
CC: hbase@lists, , mbautin, jgray, kenny, liyintang, kannan
Revert Plan:
The new block index implementation is capable of reading version 1 HFile block
indexes and does that as part of HFile reader v1 in D251875. Howver, multi-level
block indexes introduce non-reverse-compatible changes to the block index
format.

Differential Revision: 251949

Added:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HServerLoad.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HServerLoad.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HServerLoad.java?rev=1181556&r1=1181555&r2=1181556&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HServerLoad.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HServerLoad.java Tue Oct 11 02:19:47 2011
@@ -63,9 +63,27 @@ public class HServerLoad implements Writ
     private int storefileSizeMB;
     /** the current size of the memstore for the region, in MB */
     private int memstoreSizeMB;
-    /** the current total size of storefile indexes for the region, in MB */
+
+    /**
+     * The current total size of root-level store file indexes for the region,
+     * in MB. The same as {@link #rootIndexSizeKB} but in MB.
+     */
     private int storefileIndexSizeMB;
 
+    /** The current total size of root-level indexes for the region, in KB */
+    private int rootIndexSizeKB;
+
+    /**
+     * The total size of all index blocks, not just the root level, in KB.
+     */
+    private int totalStaticIndexSizeKB;
+
+    /**
+     * The total size of all Bloom filter blocks, not just loaded into the
+     * block cache, in KB.
+     */
+    private int totalStaticBloomSizeKB;
+
     /**
      * Constructor, for Writable
      */
@@ -83,13 +101,18 @@ public class HServerLoad implements Writ
      */
     public RegionLoad(final byte[] name, final int stores,
         final int storefiles, final int storefileSizeMB,
-        final int memstoreSizeMB, final int storefileIndexSizeMB) {
+        final int memstoreSizeMB, final int storefileIndexSizeMB,
+        final int rootIndexSizeKB, final int totalStaticIndexSizeKB,
+        final int totalStaticBloomSizeKB) {
       this.name = name;
       this.stores = stores;
       this.storefiles = storefiles;
       this.storefileSizeMB = storefileSizeMB;
       this.memstoreSizeMB = memstoreSizeMB;
       this.storefileIndexSizeMB = storefileIndexSizeMB;
+      this.rootIndexSizeKB = rootIndexSizeKB;
+      this.totalStaticIndexSizeKB = totalStaticIndexSizeKB;
+      this.totalStaticBloomSizeKB = totalStaticBloomSizeKB;
     }
 
     // Getters
@@ -191,6 +214,9 @@ public class HServerLoad implements Writ
       this.storefileSizeMB = in.readInt();
       this.memstoreSizeMB = in.readInt();
       this.storefileIndexSizeMB = in.readInt();
+      this.rootIndexSizeKB = in.readInt();
+      this.totalStaticIndexSizeKB = in.readInt();
+      this.totalStaticBloomSizeKB = in.readInt();
     }
 
     public void write(DataOutput out) throws IOException {
@@ -201,6 +227,9 @@ public class HServerLoad implements Writ
       out.writeInt(storefileSizeMB);
       out.writeInt(memstoreSizeMB);
       out.writeInt(storefileIndexSizeMB);
+      out.writeInt(rootIndexSizeKB);
+      out.writeInt(totalStaticIndexSizeKB);
+      out.writeInt(totalStaticBloomSizeKB);
     }
 
     /**
@@ -218,6 +247,12 @@ public class HServerLoad implements Writ
         Integer.valueOf(this.memstoreSizeMB));
       sb = Strings.appendKeyValue(sb, "storefileIndexSizeMB",
         Integer.valueOf(this.storefileIndexSizeMB));
+      sb = Strings.appendKeyValue(sb, "rootIndexSizeKB",
+          Integer.valueOf(this.rootIndexSizeKB));
+      sb = Strings.appendKeyValue(sb, "totalStaticIndexSizeKB",
+          Integer.valueOf(this.totalStaticIndexSizeKB));
+      sb = Strings.appendKeyValue(sb, "totalStaticBloomSizeKB",
+          Integer.valueOf(this.totalStaticBloomSizeKB));
       return sb.toString();
     }
   }
@@ -457,9 +492,12 @@ public class HServerLoad implements Writ
   @Deprecated
   public void addRegionInfo(final byte[] name, final int stores,
       final int storefiles, final int storefileSizeMB,
-      final int memstoreSizeMB, final int storefileIndexSizeMB) {
+      final int memstoreSizeMB, final int storefileIndexSizeMB,
+      final int rootIndexSizeKB, final int totalStaticIndexSizeKB,
+      final int totalStaticBloomSizeKB) {
     this.regionLoad.add(new HServerLoad.RegionLoad(name, stores, storefiles,
-      storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB));
+      storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, rootIndexSizeKB,
+      totalStaticIndexSizeKB, totalStaticBloomSizeKB));
   }
 
   // Writable

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java?rev=1181556&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java Tue Oct 11 02:19:47 2011
@@ -0,0 +1,1288 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.CompoundBloomFilterWriter;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Provides functionality to write ({@link BlockIndexWriter}) and read
+ * ({@link BlockIndexReader}) single-level and multi-level block indexes.
+ *
+ * Examples of how to use the block index writer can be found in
+ * {@link CompoundBloomFilterWriter} and {@link HFileWriterV2}. Examples of how
+ * to use the reader can be found in {@link HFileReaderV2} and
+ * {@link TestHFileBlockIndex}.
+ */
+public class HFileBlockIndex {
+
+  private static final Log LOG = LogFactory.getLog(HFileBlockIndex.class);
+
+  static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;
+
+  /**
+   * The maximum size guideline for index blocks (both leaf, intermediate, and
+   * root). If not specified, {@link #DEFAULT_MAX_CHUNK_SIZE} is used.
+   */
+  public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";
+
+  /**
+   * The number of bytes stored in each "secondary index" entry in addition to
+   * key bytes in the non-root index block format. The first long is the file
+   * offset of the deeper-level block the entry points to, and the int that
+   * follows is that block's on-disk size without including header.
+   */
+  static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
+      + Bytes.SIZEOF_LONG;
+
+  /**
+   * Error message when trying to use inline block API in single-level mode.
+   */
+  private static final String INLINE_BLOCKS_NOT_ALLOWED =
+      "Inline blocks are not allowed in the single-level-only mode";
+
+  /**
+   * Configuration key to cache leaf- and intermediate-level index blocks on
+   * write.
+   */
+  public static final String CACHE_INDEX_BLOCKS_ON_WRITE_KEY =
+      "hfile.block.index.cacheonwrite";
+
+  /**
+   * The size of a meta-data record used for finding the mid-key in a
+   * multi-level index. Consists of the middle leaf-level index block offset
+   * (long), its on-disk size without header included (int), and the mid-key
+   * entry's zero-based index in that leaf index block.
+   */
+  private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
+      2 * Bytes.SIZEOF_INT;
+
+  /**
+   * The reader will always hold the root level index in the memory. Index
+   * blocks at all other levels will be cached in the LRU cache in practice,
+   * although this API does not enforce that.
+   *
+   * All non-root (leaf and intermediate) index blocks contain what we call a
+   * "secondary index": an array of offsets to the entries within the block.
+   * This allows us to do binary search for the entry corresponding to the
+   * given key without having to deserialize the block.
+   */
+  public static class BlockIndexReader implements HeapSize {
+    /** Needed doing lookup on blocks. */
+    private final RawComparator<byte[]> comparator;
+
+    // Root-level data.
+    private byte[][] blockKeys;
+    private long[] blockOffsets;
+    private int[] blockDataSizes;
+    private int rootByteSize = 0;
+    private int rootCount = 0;
+
+    // Mid-key metadata.
+    private long midLeafBlockOffset = -1;
+    private int midLeafBlockOnDiskSize = -1;
+    private int midKeyEntry = -1;
+
+    /** Pre-computed mid-key */
+    private AtomicReference<byte[]> midKey = new AtomicReference<byte[]>();
+
+    /**
+     * The number of levels in the block index tree. One if there is only root
+     * level, two for root and leaf levels, etc.
+     */
+    private int searchTreeLevel;
+
+    /** A way to read {@link HFile} blocks at a given offset */
+    private HFileBlock.BasicReader blockReader;
+
+    public BlockIndexReader(final RawComparator<byte[]> c, final int treeLevel,
+        final HFileBlock.BasicReader blockReader) {
+      this(c, treeLevel);
+      this.blockReader = blockReader;
+    }
+
+    public BlockIndexReader(final RawComparator<byte[]> c, final int treeLevel)
+    {
+      comparator = c;
+      searchTreeLevel = treeLevel;
+    }
+
+    /**
+     * @return true if the block index is empty.
+     */
+    public boolean isEmpty() {
+      return blockKeys.length == 0;
+    }
+
+    /**
+     * Verifies that the block index is non-empty and throws an
+     * {@link IllegalStateException} otherwise.
+     */
+    public void ensureNonEmpty() {
+      if (blockKeys.length == 0) {
+        throw new IllegalStateException("Block index is empty or not loaded");
+      }
+    }
+
+    /**
+     * Return the data block which contains this key. This function will only
+     * be called when the HFile version is larger than 1.
+     *
+     * @param key the key we are looking for
+     * @param keyOffset the offset of the key in its byte array
+     * @param keyLength the length of the key
+     * @param currentBlock the current block, to avoid re-reading the same
+     *          block
+     * @return reader a basic way to load blocks
+     * @throws IOException
+     */
+    public HFileBlock seekToDataBlock(final byte[] key, int keyOffset,
+        int keyLength, HFileBlock currentBlock)
+        throws IOException {
+      int rootLevelIndex = rootBlockContainingKey(key, keyOffset, keyLength);
+      if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
+        return null;
+      }
+
+      // Read the next-level (intermediate or leaf) index block.
+      long currentOffset = blockOffsets[rootLevelIndex];
+      int currentOnDiskSize = blockDataSizes[rootLevelIndex];
+
+      int lookupLevel = 1; // How many levels deep we are in our lookup.
+      HFileBlock block = blockReader.readBlockData(currentOffset,
+          currentOnDiskSize, -1, true);
+      if (block == null) {
+        throw new IOException("Failed to read block at offset " +
+            currentOffset + ", onDiskSize=" + currentOnDiskSize);
+      }
+      while (!block.getBlockType().equals(BlockType.DATA)) {
+        // Read the block. It may be intermediate level block, leaf level block
+        // or data block. In any case, we expect non-root index block format.
+
+        // We don't allow more than searchTreeLevel iterations of this loop.
+        if (++lookupLevel > searchTreeLevel) {
+          throw new IOException("Search Tree Level overflow: lookupLevel: "+
+              lookupLevel + " searchTreeLevel: " + searchTreeLevel);
+        }
+
+        // read to the byte buffer
+        ByteBuffer buffer = block.getBufferWithoutHeader();
+        if (!locateNonRootIndexEntry(buffer, key, keyOffset, keyLength,
+            comparator)) {
+          throw new IOException("The key "
+              + Bytes.toStringBinary(key, keyOffset, keyLength)
+              + " is before the" + " first key of the non-root index block "
+              + block);
+        }
+
+        currentOffset = buffer.getLong();
+        currentOnDiskSize = buffer.getInt();
+
+        // Located a deeper-level block, now read it.
+        if (currentBlock != null && currentBlock.getOffset() == currentOffset)
+        {
+          // Avoid reading the same block.
+          block = currentBlock;
+        } else {
+          block = blockReader.readBlockData(currentOffset, currentOnDiskSize,
+              -1, true);
+        }
+      }
+
+      if (lookupLevel != searchTreeLevel) {
+        throw new IOException("Reached a data block at level " + lookupLevel +
+            " but the number of levels is " + searchTreeLevel);
+      }
+
+      return block;
+    }
+
+    /**
+     * An approximation to the {@link HFile}'s mid-key. Operates on block
+     * boundaries, and does not go inside blocks. In other words, returns the
+     * first key of the middle block of the file.
+     *
+     * @return the first key of the middle block
+     */
+    public byte[] midkey() throws IOException {
+      if (rootCount == 0)
+        throw new IOException("HFile empty");
+
+      byte[] midKey = this.midKey.get();
+      if (midKey != null)
+        return midKey;
+
+      if (midLeafBlockOffset >= 0) {
+        if (blockReader == null) {
+          throw new IOException("Have to read the middle leaf block but " +
+              "no block reader available");
+        }
+        HFileBlock midLeafBlock = blockReader.readBlockData(
+            midLeafBlockOffset, midLeafBlockOnDiskSize, -1, true);
+        ByteBuffer b = midLeafBlock.getBufferWithoutHeader();
+        int numDataBlocks = b.getInt();
+        int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 1));
+        int keyLen = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 2)) -
+            keyRelOffset;
+        int keyOffset = b.arrayOffset() +
+            Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset +
+            SECONDARY_INDEX_ENTRY_OVERHEAD;
+        midKey = Arrays.copyOfRange(b.array(), keyOffset, keyOffset + keyLen);
+      } else {
+        // The middle of the root-level index.
+        midKey = blockKeys[(rootCount - 1) / 2];
+      }
+
+      this.midKey.set(midKey);
+      return midKey;
+    }
+
+    /**
+     * @param i from 0 to {@link #getRootBlockCount() - 1}
+     */
+    public byte[] getRootBlockKey(int i) {
+      return blockKeys[i];
+    }
+
+    /**
+     * @param i from 0 to {@link #getRootBlockCount() - 1}
+     */
+    public long getRootBlockOffset(int i) {
+      return blockOffsets[i];
+    }
+
+    /**
+     * @param i zero-based index of a root-level block
+     * @return the on-disk size of the root-level block for version 2, or the
+     *         uncompressed size for version 1
+     */
+    public int getRootBlockDataSize(int i) {
+      return blockDataSizes[i];
+    }
+
+    /**
+     * @return the number of root-level blocks in this block index
+     */
+    public int getRootBlockCount() {
+      return rootCount;
+    }
+
+    /**
+     * Finds the root-level index block containing the given key.
+     *
+     * @param key
+     *          Key to find
+     * @return Offset of block containing <code>key</code> (between 0 and the
+     *         number of blocks - 1) or -1 if this file does not contain the
+     *         request.
+     */
+    public int rootBlockContainingKey(final byte[] key, int offset,
+        int length) {
+      int pos = Bytes.binarySearch(blockKeys, key, offset, length,
+          comparator);
+      // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
+      // binarySearch's javadoc.
+
+      if (pos >= 0) {
+        // This means this is an exact match with an element of blockKeys.
+        assert pos < blockKeys.length;
+        return pos;
+      }
+
+      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
+      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
+      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
+      // key < blockKeys[0], meaning the file does not contain the given key.
+
+      int i = -pos - 1;
+      assert 0 <= i && i <= blockKeys.length;
+      return i - 1;
+    }
+
+    /**
+     * Adds a new entry in the root block index. Only used when reading.
+     *
+     * @param key Last key in the block
+     * @param offset file offset where the block is stored
+     * @param dataSize the uncompressed data size
+     */
+    private void add(final byte[] key, final long offset, final int dataSize) {
+      blockOffsets[rootCount] = offset;
+      blockKeys[rootCount] = key;
+      blockDataSizes[rootCount] = dataSize;
+
+      rootCount++;
+      rootByteSize += SECONDARY_INDEX_ENTRY_OVERHEAD + key.length;
+    }
+
+    /**
+     * Performs a binary search over a non-root level index block. Utilizes the
+     * secondary index, which records the offsets of (offset, onDiskSize,
+     * firstKey) tuples of all entries.
+     *
+     * @param key the key we are searching for offsets to individual entries in
+     *          the blockIndex buffer
+     * @param keyOffset the offset of the key in its byte array
+     * @param keyLength the length of the key
+     * @param nonRootIndex the non-root index block buffer, starting with the
+     *          secondary index. The position is ignored.
+     * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
+     *         keys[i + 1], if keys is the array of all keys being searched, or
+     *         -1 otherwise
+     * @throws IOException
+     */
+    static int binarySearchNonRootIndex(byte[] key, int keyOffset,
+        int keyLength, ByteBuffer nonRootIndex,
+        RawComparator<byte[]> comparator) {
+
+      int numEntries = nonRootIndex.getInt(0);
+      int low = 0;
+      int high = numEntries - 1;
+      int mid = 0;
+
+      // Entries start after the number of entries and the secondary index.
+      // The secondary index takes numEntries + 1 ints.
+      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
+
+      // If we imagine that keys[-1] = -Infinity and
+      // keys[numEntries] = Infinity, then we are maintaining an invariant that
+      // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
+
+      while (low <= high) {
+        mid = (low + high) >>> 1;
+
+        // Midkey's offset relative to the end of secondary index
+        int midKeyRelOffset = nonRootIndex.getInt(
+            Bytes.SIZEOF_INT * (mid + 1));
+
+        // The offset of the middle key in the blockIndex buffer
+        int midKeyOffset = entriesOffset       // Skip secondary index
+            + midKeyRelOffset                  // Skip all entries until mid
+            + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
+
+        // We subtract the two consecutive secondary index elements, which
+        // gives us the size of the whole (offset, onDiskSize, key) tuple. We
+        // then need to subtract the overhead of offset and onDiskSize.
+        int midLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (mid + 2)) -
+            midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
+
+        // we have to compare in this order, because the comparator order
+        // has special logic when the 'left side' is a special key.
+        int cmp = comparator.compare(key, keyOffset, keyLength,
+            nonRootIndex.array(), nonRootIndex.arrayOffset() + midKeyOffset,
+            midLength);
+
+        // key lives above the midpoint
+        if (cmp > 0)
+          low = mid + 1; // Maintain the invariant that keys[low - 1] < key
+        // key lives below the midpoint
+        else if (cmp < 0)
+          high = mid - 1; // Maintain the invariant that key < keys[high + 1]
+        else
+          return mid; // exact match
+      }
+
+      // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
+      // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
+      // condition, low >= high + 1. Therefore, low = high + 1.
+
+      if (low != high + 1) {
+        throw new IllegalStateException("Binary search broken: low=" + low
+            + " " + "instead of " + (high + 1));
+      }
+
+      // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
+      // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
+      int i = low - 1;
+
+      // Some extra validation on the result.
+      if (i < -1 || i >= numEntries) {
+        throw new IllegalStateException("Binary search broken: result is " +
+            i + " but expected to be between -1 and (numEntries - 1) = " +
+            (numEntries - 1));
+      }
+
+      return i;
+    }
+
+    /**
+     * Search for one key using the secondary index in a non-root block. In case
+     * of success, positions the provided buffer at the entry of interest, where
+     * the file offset and the on-disk-size can be read.
+     *
+     * @param nonRootBlock a non-root block without header. Initial position
+     *          does not matter.
+     * @param key the byte array containing the key
+     * @param keyOffset the offset of the key in its byte array
+     * @param keyLength the length of the key
+     * @return true in the case the index entry containing the given key was
+     *         found, false in the case the given key is before the first key
+     *
+     */
+    static boolean locateNonRootIndexEntry(ByteBuffer nonRootBlock, byte[] key,
+        int keyOffset, int keyLength, RawComparator<byte[]> comparator) {
+      int entryIndex = binarySearchNonRootIndex(key, keyOffset, keyLength,
+          nonRootBlock, comparator);
+
+      if (entryIndex == -1) {
+        return false;
+      }
+
+      int numEntries = nonRootBlock.getInt(0);
+
+      // The end of secondary index and the beginning of entries themselves.
+      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
+
+      // The offset of the entry we are interested in relative to the end of
+      // the secondary index.
+      int entryRelOffset = nonRootBlock.getInt(Bytes.SIZEOF_INT
+          * (1 + entryIndex));
+
+      nonRootBlock.position(entriesOffset + entryRelOffset);
+      return true;
+    }
+
+    /**
+     * Read in the root-level index from the given input stream. Must match
+     * what was written into the root level by
+     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
+     * offset that function returned.
+     *
+     * @param in the buffered input stream or wrapped byte input stream
+     * @param numEntries the number of root-level index entries
+     * @throws IOException
+     */
+    public void readRootIndex(DataInput in, final int numEntries)
+        throws IOException {
+      blockOffsets = new long[numEntries];
+      blockKeys = new byte[numEntries][];
+      blockDataSizes = new int[numEntries];
+
+      // If index size is zero, no index was written.
+      if (numEntries > 0) {
+        for (int i = 0; i < numEntries; ++i) {
+          long offset = in.readLong();
+          int dataSize = in.readInt();
+          byte[] key = Bytes.readByteArray(in);
+          add(key, offset, dataSize);
+        }
+      }
+    }
+
+    /**
+     * Read the root-level metadata of a multi-level block index. Based on
+     * {@link #readRootIndex(DataInput, int)}, but also reads metadata
+     * necessary to compute the mid-key in a multi-level index.
+     *
+     * @param in the buffered or byte input stream to read from
+     * @param numEntries the number of root-level index entries
+     * @throws IOException
+     */
+    public void readMultiLevelIndexRoot(DataInputStream in,
+        final int numEntries) throws IOException {
+      readRootIndex(in, numEntries);
+      if (in.available() < MID_KEY_METADATA_SIZE) {
+        // No mid-key metadata available.
+        return;
+      }
+
+      midLeafBlockOffset = in.readLong();
+      midLeafBlockOnDiskSize = in.readInt();
+      midKeyEntry = in.readInt();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("size=" + rootCount).append("\n");
+      for (int i = 0; i < rootCount; i++) {
+        sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
+            .append("\n  offset=").append(blockOffsets[i])
+            .append(", dataSize=" + blockDataSizes[i]).append("\n");
+      }
+      return sb.toString();
+    }
+
+    @Override
+    public long heapSize() {
+      long heapSize = ClassSize.align(6 * ClassSize.REFERENCE +
+          3 * Bytes.SIZEOF_INT + ClassSize.OBJECT);
+
+      // Mid-key metadata.
+      heapSize += MID_KEY_METADATA_SIZE;
+
+      // Calculating the size of blockKeys
+      if (blockKeys != null) {
+        // Adding array + references overhead
+        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length
+            * ClassSize.REFERENCE);
+
+        // Adding bytes
+        for (byte[] key : blockKeys) {
+          heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
+        }
+      }
+
+      if (blockOffsets != null) {
+        heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
+            * Bytes.SIZEOF_LONG);
+      }
+
+      if (blockDataSizes != null) {
+        heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
+            * Bytes.SIZEOF_INT);
+      }
+
+      return ClassSize.align(heapSize);
+    }
+
+  }
+
+  /**
+   * Writes the block index into the output stream. Generate the tree from
+   * bottom up. The leaf level is written to disk as a sequence of inline
+   * blocks, if it is larger than a certain number of bytes. If the leaf level
+   * is not large enough, we write all entries to the root level instead.
+   *
+   * After all leaf blocks have been written, we end up with an index
+   * referencing the resulting leaf index blocks. If that index is larger than
+   * the allowed root index size, the writer will break it up into
+   * reasonable-size intermediate-level index block chunks write those chunks
+   * out, and create another index referencing those chunks. This will be
+   * repeated until the remaining index is small enough to become the root
+   * index. However, in most practical cases we will only have leaf-level
+   * blocks and the root index, or just the root index.
+   */
+  public static class BlockIndexWriter implements InlineBlockWriter {
+    /**
+     * While the index is being written, this represents the current block
+     * index referencing all leaf blocks, with one exception. If the file is
+     * being closed and there are not enough blocks to complete even a single
+     * leaf block, no leaf blocks get written and this contains the entire
+     * block index. After all levels of the index were written by
+     * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
+     * root-level index.
+     */
+    private BlockIndexChunk rootChunk = new BlockIndexChunk();
+
+    /**
+     * Current leaf-level chunk. New entries referencing data blocks get added
+     * to this chunk until it grows large enough to be written to disk.
+     */
+    private BlockIndexChunk curInlineChunk = new BlockIndexChunk();
+
+    /**
+     * The number of block index levels. This is one if there is only root
+     * level (even empty), two if there a leaf level and root level, and is
+     * higher if there are intermediate levels. This is only final after
+     * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
+     * initial value accounts for the root level, and will be increased to two
+     * as soon as we find out there is a leaf-level in
+     * {@link #blockWritten(long, int)}.
+     */
+    private int numLevels = 1;
+
+    private HFileBlock.Writer blockWriter;
+    private byte[] firstKey = null;
+
+    /**
+     * The total number of leaf-level entries, i.e. entries referenced by
+     * leaf-level blocks. For the data block index this is equal to the number
+     * of data blocks.
+     */
+    private long totalNumEntries;
+
+    /** Total compressed size of all index blocks. */
+    private long totalBlockOnDiskSize;
+
+    /** Total uncompressed size of all index blocks. */
+    private long totalBlockUncompressedSize;
+
+    /** The maximum size guideline of all multi-level index blocks. */
+    private int maxChunkSize;
+
+    /** Whether we require this block index to always be single-level. */
+    private boolean singleLevelOnly;
+
+    /** Block cache, or null if cache-on-write is disabled */
+    private BlockCache blockCache;
+
+    /** Name to use for computing cache keys */
+    private String nameForCaching;
+
+    /** Creates a single-level block index writer */
+    public BlockIndexWriter() {
+      this(null, null, null);
+      singleLevelOnly = true;
+    }
+
+    /**
+     * Creates a multi-level block index writer.
+     *
+     * @param blockWriter the block writer to use to write index blocks
+     */
+    public BlockIndexWriter(HFileBlock.Writer blockWriter,
+        BlockCache blockCache, String nameForCaching) {
+      if ((blockCache == null) != (nameForCaching == null)) {
+        throw new IllegalArgumentException("Block cache and file name for " +
+            "caching must be both specified or both null");
+      }
+
+      this.blockWriter = blockWriter;
+      this.blockCache = blockCache;
+      this.nameForCaching = nameForCaching;
+      this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
+    }
+
+    public void setMaxChunkSize(int maxChunkSize) {
+      if (maxChunkSize <= 0) {
+        throw new IllegalArgumentException("Invald maximum index block size");
+      }
+      this.maxChunkSize = maxChunkSize;
+    }
+
+    /**
+     * Writes the root level and intermediate levels of the block index into
+     * the output stream, generating the tree from bottom up. Assumes that the
+     * leaf level has been inline-written to the disk if there is enough data
+     * for more than one leaf block. We iterate by breaking the current level
+     * of the block index, starting with the index of all leaf-level blocks,
+     * into chunks small enough to be written to disk, and generate its parent
+     * level, until we end up with a level small enough to become the root
+     * level.
+     *
+     * If the leaf level is not large enough, there is no inline block index
+     * anymore, so we only write that level of block index to disk as the root
+     * level.
+     *
+     * @param out FSDataOutputStream
+     * @return position at which we entered the root-level index.
+     * @throws IOException
+     */
+    public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
+      if (curInlineChunk.getNumEntries() != 0) {
+        throw new IOException("Trying to write a multi-level block index, " +
+            "but are " + curInlineChunk.getNumEntries() + " entries in the " +
+            "last inline chunk.");
+      }
+
+      // We need to get mid-key metadata before we create intermediate
+      // indexes and overwrite the root chunk.
+      byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
+          : null;
+
+      while (rootChunk.getRootSize() > maxChunkSize) {
+        rootChunk = writeIntermediateLevel(out, rootChunk);
+        numLevels += 1;
+      }
+
+      // write the root level
+      long rootLevelIndexPos = out.getPos();
+
+      {
+        DataOutput blockStream = blockWriter.startWriting(BlockType.ROOT_INDEX,
+            false);
+        rootChunk.writeRoot(blockStream);
+        if (midKeyMetadata != null)
+          blockStream.write(midKeyMetadata);
+        blockWriter.writeHeaderAndData(out);
+      }
+
+      // Add root index block size
+      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
+      totalBlockUncompressedSize +=
+          blockWriter.getUncompressedSizeWithoutHeader();
+
+      LOG.info("Wrote a " + numLevels + "-level index with root level at pos "
+          + out.getPos() + ", " + rootChunk.getNumEntries()
+          + " root-level entries, " + totalNumEntries + " total entries, "
+          + totalBlockOnDiskSize + " bytes total on-disk size, "
+          + totalBlockUncompressedSize + " bytes total uncompressed size.");
+
+      return rootLevelIndexPos;
+    }
+
+    /**
+     * Writes the block index data as a single level only. Does not do any
+     * block framing.
+     *
+     * @param out the buffered output stream to write the index to. Typically a
+     *          stream writing into an {@link HFile} block.
+     * @param description a short description of the index being written. Used
+     *          in a log message.
+     * @throws IOException
+     */
+    public void writeSingleLevelIndex(DataOutput out, String description)
+        throws IOException {
+      expectNumLevels(1);
+
+      if (!singleLevelOnly)
+        throw new IOException("Single-level mode is turned off");
+
+      if (rootChunk.getNumEntries() > 0)
+        throw new IOException("Root-level entries already added in " +
+			"single-level mode");
+
+      rootChunk = curInlineChunk;
+      curInlineChunk = new BlockIndexChunk();
+
+      LOG.info("Wrote a single-level " + description + " index with "
+          + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
+          + " bytes");
+      rootChunk.writeRoot(out);
+    }
+
+    /**
+     * Split the current level of the block index into intermediate index
+     * blocks of permitted size and write those blocks to disk. Return the next
+     * level of the block index referencing those intermediate-level blocks.
+     *
+     * @param out
+     * @param currentLevel the current level of the block index, such as the a
+     *          chunk referencing all leaf-level index blocks
+     * @return the parent level block index, which becomes the root index after
+     *         a few (usually zero) iterations
+     * @throws IOException
+     */
+    private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
+        BlockIndexChunk currentLevel) throws IOException {
+      // Entries referencing intermediate-level blocks we are about to create.
+      BlockIndexChunk parent = new BlockIndexChunk();
+
+      // The current intermediate-level block index chunk.
+      BlockIndexChunk curChunk = new BlockIndexChunk();
+
+      for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
+        curChunk.add(currentLevel.getBlockKey(i),
+            currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));
+
+        if (curChunk.getRootSize() >= maxChunkSize)
+          writeIntermediateBlock(out, parent, curChunk);
+      }
+
+      if (curChunk.getNumEntries() > 0) {
+        writeIntermediateBlock(out, parent, curChunk);
+      }
+
+      return parent;
+    }
+
+    private void writeIntermediateBlock(FSDataOutputStream out,
+        BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
+      long beginOffset = out.getPos();
+      DataOutputStream dos = blockWriter.startWriting(
+          BlockType.INTERMEDIATE_INDEX, cacheOnWrite());
+      curChunk.writeNonRoot(dos);
+      byte[] curFirstKey = curChunk.getBlockKey(0);
+      blockWriter.writeHeaderAndData(out);
+
+      if (blockCache != null) {
+        blockCache.cacheBlock(HFile.getBlockCacheKey(nameForCaching,
+            beginOffset), blockWriter.getBlockForCaching());
+      }
+
+      // Add intermediate index block size
+      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
+      totalBlockUncompressedSize +=
+          blockWriter.getUncompressedSizeWithoutHeader();
+
+      // OFFSET is the beginning offset the chunk of block index entries.
+      // SIZE is the total byte size of the chunk of block index entries
+      // + the secondary index size
+      // FIRST_KEY is the first key in the chunk of block index
+      // entries.
+      parent.add(curFirstKey, beginOffset,
+          blockWriter.getOnDiskSizeWithHeader());
+
+      // clear current block index chunk
+      curChunk.clear();
+      curFirstKey = null;
+    }
+
+    /**
+     * @return how many block index entries there are in the root level
+     */
+    public final int getNumRootEntries() {
+      return rootChunk.getNumEntries();
+    }
+
+    /**
+     * @return the number of levels in this block index.
+     */
+    public int getNumLevels() {
+      return numLevels;
+    }
+
+    private void expectNumLevels(int expectedNumLevels) {
+      if (numLevels != expectedNumLevels) {
+        throw new IllegalStateException("Number of block index levels is "
+            + numLevels + "but is expected to be " + expectedNumLevels);
+      }
+    }
+
+    /**
+     * Whether there is an inline block ready to be written. In general, we
+     * write an leaf-level index block as an inline block as soon as its size
+     * as serialized in the non-root format reaches a certain threshold.
+     */
+    @Override
+    public boolean shouldWriteBlock(boolean closing) {
+      if (singleLevelOnly)
+        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
+
+      if (curInlineChunk.getNumEntries() == 0)
+        return false;
+
+      // We do have some entries in the current inline chunk.
+      if (closing) {
+        if (rootChunk.getNumEntries() == 0) {
+          // We did not add any leaf-level blocks yet. Instead of creating a
+          // leaf level with one block, move these entries to the root level.
+
+          expectNumLevels(1);
+          rootChunk = curInlineChunk;
+          curInlineChunk = new BlockIndexChunk();
+          return false;
+        }
+
+        return true;
+      } else {
+        return curInlineChunk.getNonRootSize() >= maxChunkSize;
+      }
+    }
+
+    /**
+     * Write out the current inline index block. Inline blocks are non-root
+     * blocks, so the non-root index format is used.
+     *
+     * @param out
+     * @param position The beginning offset of the inline block in the file not
+     *          include the header.
+     */
+    @Override
+    public void writeInlineBlock(DataOutput out) throws IOException {
+      if (singleLevelOnly)
+        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
+
+      // Write the inline block index to the output stream in the non-root
+      // index block format.
+      curInlineChunk.writeNonRoot(out);
+
+      // Save the first key of the inline block so that we can add it to the
+      // parent-level index.
+      firstKey = curInlineChunk.getBlockKey(0);
+
+      // Start a new inline index block
+      curInlineChunk.clear();
+    }
+
+    /**
+     * Called after an inline block has been written so that we can add an
+     * entry referring to that block to the parent-level index.
+     */
+    @Override
+    public void blockWritten(long offset, int onDiskSize, int uncompressedSize)
+    {
+      // Add leaf index block size
+      totalBlockOnDiskSize += onDiskSize;
+      totalBlockUncompressedSize += uncompressedSize;
+
+      if (singleLevelOnly)
+        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
+
+      if (firstKey == null) {
+        throw new IllegalStateException("Trying to add second-level index " +
+            "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
+            "but the first key was not set in writeInlineBlock");
+      }
+
+      if (rootChunk.getNumEntries() == 0) {
+        // We are writing the first leaf block, so increase index level.
+        expectNumLevels(1);
+        numLevels = 2;
+      }
+
+      // Add another entry to the second-level index. Include the number of
+      // entries in all previous leaf-level chunks for mid-key calculation.
+      rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
+      firstKey = null;
+    }
+
+    @Override
+    public BlockType getInlineBlockType() {
+      return BlockType.LEAF_INDEX;
+    }
+
+    /**
+     * Add one index entry to the current leaf-level block. When the leaf-level
+     * block gets large enough, it will be flushed to disk as an inline block.
+     *
+     * @param firstKey the first key of the data block
+     * @param blockOffset the offset of the data block
+     * @param blockDataSize the on-disk size of the data block ({@link HFile}
+     *          format version 2), or the uncompressed size of the data block (
+     *          {@link HFile} format version 1).
+     */
+    public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize)
+    {
+      curInlineChunk.add(firstKey, blockOffset, blockDataSize);
+      ++totalNumEntries;
+    }
+
+    /**
+     * @throws IOException if we happened to write a multi-level index.
+     */
+    public void ensureSingeLevel() throws IOException {
+      if (numLevels > 1) {
+        throw new IOException ("Wrote a " + numLevels + "-level index with " +
+            rootChunk.getNumEntries() + " root-level entries, but " +
+            "this is expected to be a single-level block index.");
+      }
+    }
+
+    @Override
+    public boolean cacheOnWrite() {
+      return blockCache != null;
+    }
+
+    /**
+     * The total uncompressed size of the root index block, intermediate-level
+     * index blocks, and leaf-level index blocks.
+     *
+     * @return the total uncompressed size of all index blocks
+     */
+    public long getTotalUncompressedSize() {
+      return totalBlockUncompressedSize;
+    }
+
+  }
+
+  /**
+   * A single chunk of the block index in the process of writing. The data in
+   * this chunk can become a leaf-level, intermediate-level, or root index
+   * block.
+   */
+  static class BlockIndexChunk {
+
+    /** First keys of the key range corresponding to each index entry. */
+    private List<byte[]> blockKeys = new ArrayList<byte[]>();
+
+    /** Block offset in backing stream. */
+    private List<Long> blockOffsets = new ArrayList<Long>();
+
+    /** On-disk data sizes of lower-level data or index blocks. */
+    private List<Integer> onDiskDataSizes = new ArrayList<Integer>();
+
+    /**
+     * The cumulative number of sub-entries, i.e. entries on deeper-level block
+     * index entries. numSubEntriesAt[i] is the number of sub-entries in the
+     * blocks corresponding to this chunk's entries #0 through #i inclusively.
+     */
+    private List<Long> numSubEntriesAt = new ArrayList<Long>();
+
+    /**
+     * The offset of the next entry to be added, relative to the end of the
+     * "secondary index" in the "non-root" format representation of this index
+     * chunk. This is the next value to be added to the secondary index.
+     */
+    private int curTotalNonRootEntrySize = 0;
+
+    /**
+     * The accumulated size of this chunk if stored in the root index format.
+     */
+    private int curTotalRootSize = 0;
+
+    /**
+     * The "secondary index" used for binary search over variable-length
+     * records in a "non-root" format block. These offsets are relative to the
+     * end of this secondary index.
+     */
+    private List<Integer> secondaryIndexOffsetMarks = new ArrayList<Integer>();
+
+    /**
+     * Adds a new entry to this block index chunk.
+     *
+     * @param firstKey the first key in the block pointed to by this entry
+     * @param blockOffset the offset of the next-level block pointed to by this
+     *          entry
+     * @param onDiskDataSize the on-disk data of the block pointed to by this
+     *          entry, including header size
+     * @param curTotalNumSubEntries if this chunk is the root index chunk under
+     *          construction, this specifies the current total number of
+     *          sub-entries in all leaf-level chunks, including the one
+     *          corresponding to the second-level entry being added.
+     */
+    void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
+        long curTotalNumSubEntries) {
+      // Record the offset for the secondary index
+      secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
+      curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
+          + firstKey.length;
+
+      curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
+          + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
+
+      blockKeys.add(firstKey);
+      blockOffsets.add(blockOffset);
+      onDiskDataSizes.add(onDiskDataSize);
+
+      if (curTotalNumSubEntries != -1) {
+        numSubEntriesAt.add(curTotalNumSubEntries);
+
+        // Make sure the parallel arrays are in sync.
+        if (numSubEntriesAt.size() != blockKeys.size()) {
+          throw new IllegalStateException("Only have key/value count " +
+              "stats for " + numSubEntriesAt.size() + " block index " +
+              "entries out of " + blockKeys.size());
+        }
+      }
+    }
+
+    /**
+     * The same as {@link #add(byte[], long, int, long)} but does not take the
+     * key/value into account. Used for single-level indexes.
+     *
+     * @see {@link #add(byte[], long, int, long)}
+     */
+    public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
+      add(firstKey, blockOffset, onDiskDataSize, -1);
+    }
+
+    public void clear() {
+      blockKeys.clear();
+      blockOffsets.clear();
+      onDiskDataSizes.clear();
+      secondaryIndexOffsetMarks.clear();
+      numSubEntriesAt.clear();
+      curTotalNonRootEntrySize = 0;
+      curTotalRootSize = 0;
+    }
+
+    /**
+     * Finds the entry corresponding to the deeper-level index block containing
+     * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
+     * ordering of sub-entries.
+     *
+     * <p>
+     * <i> Implementation note. </i> We are looking for i such that
+     * numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i], because a deeper-level
+     * block #i (0-based) contains sub-entries # numSubEntriesAt[i - 1]'th
+     * through numSubEntriesAt[i] - 1, assuming a global 0-based ordering of
+     * sub-entries. i is by definition the insertion point of k in
+     * numSubEntriesAt.
+     *
+     * @param k sub-entry index, from 0 to the total number sub-entries - 1
+     * @return the 0-based index of the entry corresponding to the given
+     *         sub-entry
+     */
+    public int getEntryBySubEntry(long k) {
+      // We define mid-key as the key corresponding to k'th sub-entry
+      // (0-based).
+
+      int i = Collections.binarySearch(numSubEntriesAt, k);
+
+      // Exact match: cumulativeWeight[i] = k. This means chunks #0 through
+      // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
+      // is in the (i + 1)'th chunk.
+      if (i >= 0)
+        return i + 1;
+
+      // Inexact match. Return the insertion point.
+      return -i - 1;
+    }
+
+    /**
+     * Used when writing the root block index of a multi-level block index.
+     * Serializes additional information allowing to efficiently identify the
+     * mid-key.
+     *
+     * @return a few serialized fields for finding the mid-key
+     * @throws IOException if could not create metadata for computing mid-key
+     */
+    public byte[] getMidKeyMetadata() throws IOException {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream(
+          MID_KEY_METADATA_SIZE);
+      DataOutputStream baosDos = new DataOutputStream(baos);
+      long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
+      if (totalNumSubEntries == 0) {
+        throw new IOException("No leaf-level entries, mid-key unavailable");
+      }
+      long midKeySubEntry = (totalNumSubEntries - 1) / 2;
+      int midKeyEntry = getEntryBySubEntry(midKeySubEntry);
+
+      baosDos.writeLong(blockOffsets.get(midKeyEntry));
+      baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));
+
+      long numSubEntriesBefore = midKeyEntry > 0
+          ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
+      long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
+      if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE)
+      {
+        throw new IOException("Could not identify mid-key index within the "
+            + "leaf-level block containing mid-key: out of range ("
+            + subEntryWithinEntry + ", numSubEntriesBefore="
+            + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
+            + ")");
+      }
+
+      baosDos.writeInt((int) subEntryWithinEntry);
+
+      if (baosDos.size() != MID_KEY_METADATA_SIZE) {
+        throw new IOException("Could not write mid-key metadata: size=" +
+            baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
+      }
+
+      return baos.toByteArray();
+    }
+
+    /**
+     * Writes the block index chunk in the non-root index block format. This
+     * format contains the number of entries, an index of integer offsets
+     * for quick binary search on variable-length records, and tuples of
+     * block offset, on-disk block size, and the first key for each entry.
+     *
+     * @param out
+     * @throws IOException
+     */
+    void writeNonRoot(DataOutput out) throws IOException {
+      // The number of entries in the block.
+      out.writeInt(blockKeys.size());
+
+      if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
+        throw new IOException("Corrupted block index chunk writer: " +
+            blockKeys.size() + " entries but " +
+            secondaryIndexOffsetMarks.size() + " secondary index items");
+      }
+
+      // For each entry, write a "secondary index" of relative offsets to the
+      // entries from the end of the secondary index. This works, because at
+      // read time we read the number of entries and know where the secondary
+      // index ends.
+      for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
+        out.writeInt(currentSecondaryIndex);
+
+      // We include one other element in the secondary index to calculate the
+      // size of each entry more easily by subtracting secondary index elements.
+      out.writeInt(curTotalNonRootEntrySize);
+
+      for (int i = 0; i < blockKeys.size(); ++i) {
+        out.writeLong(blockOffsets.get(i));
+        out.writeInt(onDiskDataSizes.get(i));
+        out.write(blockKeys.get(i));
+      }
+    }
+
+    /**
+     * @return the size of this chunk if stored in the non-root index block
+     *         format
+     */
+    int getNonRootSize() {
+      return Bytes.SIZEOF_INT                          // Number of entries
+          + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
+          + curTotalNonRootEntrySize;                  // All entries
+    }
+
+    /**
+     * Writes this chunk into the given output stream in the root block index
+     * format. This format is similar to the {@link HFile} version 1 block
+     * index format, except that we store on-disk size of the block instead of
+     * its uncompressed size.
+     *
+     * @param out the data output stream to write the block index to. Typically
+     *          a stream writing into an {@link HFile} block.
+     * @throws IOException
+     */
+    void writeRoot(DataOutput out) throws IOException {
+      for (int i = 0; i < blockKeys.size(); ++i) {
+        out.writeLong(blockOffsets.get(i));
+        out.writeInt(onDiskDataSizes.get(i));
+        Bytes.writeByteArray(out, blockKeys.get(i));
+      }
+    }
+
+    /**
+     * @return the size of this chunk if stored in the root index block format
+     */
+    int getRootSize() {
+      return curTotalRootSize;
+    }
+
+    /**
+     * @return the number of entries in this block index chunk
+     */
+    public int getNumEntries() {
+      return blockKeys.size();
+    }
+
+    public byte[] getBlockKey(int i) {
+      return blockKeys.get(i);
+    }
+
+    public long getBlockOffset(int i) {
+      return blockOffsets.get(i);
+    }
+
+    public int getOnDiskDataSize(int i) {
+      return onDiskDataSizes.get(i);
+    }
+
+    public long getCumulativeNumKV(int i) {
+      if (i < 0)
+        return 0;
+      return numSubEntriesAt.get(i);
+    }
+
+  }
+
+  /**
+   * @return true if the given configuration specifies that we should
+   *         cache-on-write index blocks
+   */
+  public static boolean shouldCacheOnWrite(Configuration conf) {
+    return conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY, false);
+  }
+
+  public static int getMaxChunkSize(Configuration conf) {
+    return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
+  }
+
+}

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1181556&r1=1181555&r2=1181556&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Oct 11 02:19:47 2011
@@ -770,18 +770,33 @@ public class HRegionServer implements HR
     int storefileSizeMB = 0;
     int memstoreSizeMB = (int)(r.memstoreSize.get()/1024/1024);
     int storefileIndexSizeMB = 0;
+    int rootIndexSizeKB = 0;
+    int totalStaticIndexSizeKB = 0;
+    int totalStaticBloomSizeKB = 0;
     synchronized (r.stores) {
       stores += r.stores.size();
       for (Store store: r.stores.values()) {
         storefiles += store.getStorefilesCount();
+
         storefileSizeMB +=
           (int)(store.getStorefilesSize()/1024/1024);
+
         storefileIndexSizeMB +=
           (int)(store.getStorefilesIndexSize()/1024/1024);
+
+        rootIndexSizeKB +=
+            (int) (store.getStorefilesIndexSize() / 1024);
+
+        totalStaticIndexSizeKB +=
+          (int) (store.getTotalStaticIndexSize() / 1024);
+
+        totalStaticBloomSizeKB +=
+          (int) (store.getTotalStaticBloomSize() / 1024);
       }
     }
     return new HServerLoad.RegionLoad(name, stores, storefiles,
-      storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB);
+      storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, rootIndexSizeKB,
+      totalStaticIndexSizeKB, totalStaticBloomSizeKB);
   }
 
   /**
@@ -987,6 +1002,8 @@ public class HRegionServer implements HR
     int storefiles = 0;
     long memstoreSize = 0;
     long storefileIndexSize = 0;
+    long totalStaticIndexSize = 0;
+    long totalStaticBloomSize = 0;
     synchronized (this.onlineRegions) {
       for (Map.Entry<Integer, HRegion> e: this.onlineRegions.entrySet()) {
         HRegion r = e.getValue();
@@ -997,6 +1014,8 @@ public class HRegionServer implements HR
             Store store = ee.getValue();
             storefiles += store.getStorefilesCount();
             storefileIndexSize += store.getStorefilesIndexSize();
+            totalStaticIndexSize += store.getTotalStaticIndexSize();
+            totalStaticBloomSize += store.getTotalStaticBloomSize();
           }
         }
       }
@@ -1004,7 +1023,14 @@ public class HRegionServer implements HR
     this.metrics.stores.set(stores);
     this.metrics.storefiles.set(storefiles);
     this.metrics.memstoreSizeMB.set((int)(memstoreSize/(1024*1024)));
-    this.metrics.storefileIndexSizeMB.set((int)(storefileIndexSize/(1024*1024)));
+    this.metrics.storefileIndexSizeMB.set(
+        (int) (storefileIndexSize / (1024 * 1024)));
+    this.metrics.rootIndexSizeKB.set(
+        (int) (storefileIndexSize / 1024));
+    this.metrics.totalStaticIndexSizeKB.set(
+        (int) (totalStaticIndexSize / 1024));
+    this.metrics.totalStaticBloomSizeKB.set(
+        (int) (totalStaticBloomSize / 1024));
     this.metrics.compactionQueueSize.set(compactSplitThread.
       getCompactionQueueSize());
 

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java?rev=1181556&r1=1181555&r2=1181556&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java Tue Oct 11 02:19:47 2011
@@ -109,11 +109,25 @@ public class RegionServerMetrics impleme
   public final MetricsIntValue storefiles = new MetricsIntValue("storefiles", registry);
 
   /**
-   * Sum of all the storefile index sizes in this regionserver in MB
+   * Sum of all the storefile index sizes in this regionserver in MB. This is
+   * a legacy metric to be phased out as we fully transition to multi-level
+   * block indexes.
    */
   public final MetricsIntValue storefileIndexSizeMB =
     new MetricsIntValue("storefileIndexSizeMB", registry);
 
+  /** The total size of block index root levels in this regionserver in KB. */
+  public final MetricsIntValue rootIndexSizeKB =
+    new MetricsIntValue("rootIndexSizeKB", registry);
+
+  /** Total size of all block indexes (not necessarily loaded in memory) */
+  public final MetricsIntValue totalStaticIndexSizeKB =
+    new MetricsIntValue("totalStaticIndexSizeKB", registry);
+
+  /** Total size of all Bloom filters (not necessarily loaded in memory) */
+  public final MetricsIntValue totalStaticBloomSizeKB =
+    new MetricsIntValue("totalStaticBloomSizeKB", registry);
+
   /**
    * Sum of all the memstore sizes in this regionserver in MB
    */
@@ -241,6 +255,9 @@ public class RegionServerMetrics impleme
       this.stores.pushMetric(this.metricsRecord);
       this.storefiles.pushMetric(this.metricsRecord);
       this.storefileIndexSizeMB.pushMetric(this.metricsRecord);
+      this.rootIndexSizeKB.pushMetric(this.metricsRecord);
+      this.totalStaticIndexSizeKB.pushMetric(this.metricsRecord);
+      this.totalStaticBloomSizeKB.pushMetric(this.metricsRecord);
       this.memstoreSizeMB.pushMetric(this.metricsRecord);
       this.regions.pushMetric(this.metricsRecord);
       this.requests.pushMetric(this.metricsRecord);
@@ -371,6 +388,12 @@ public class RegionServerMetrics impleme
       Integer.valueOf(this.storefiles.get()));
     sb = Strings.appendKeyValue(sb, "storefileIndexSize",
       Integer.valueOf(this.storefileIndexSizeMB.get()));
+    sb = Strings.appendKeyValue(sb, "rootIndexSizeKB",
+        Integer.valueOf(this.rootIndexSizeKB.get()));
+    sb = Strings.appendKeyValue(sb, "totalStaticIndexSizeKB",
+        Integer.valueOf(this.totalStaticIndexSizeKB.get()));
+    sb = Strings.appendKeyValue(sb, "totalStaticBloomSizeKB",
+        Integer.valueOf(this.totalStaticBloomSizeKB.get()));
     sb = Strings.appendKeyValue(sb, "memstoreSize",
       Integer.valueOf(this.memstoreSizeMB.get()));
     sb = Strings.appendKeyValue(sb, "compactionQueueSize",

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java?rev=1181556&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java Tue Oct 11 02:19:47 2011
@@ -0,0 +1,603 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader;
+import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexChunk;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestHFileBlockIndex {
+
+  @Parameters
+  public static Collection<Object[]> compressionAlgorithms() {
+    return HBaseTestingUtility.COMPRESSION_ALGORITHMS_PARAMETERIZED;
+  }
+
+  public TestHFileBlockIndex(Compression.Algorithm compr) {
+    this.compr = compr;
+  }
+
+  private static final Log LOG = LogFactory.getLog(TestHFileBlockIndex.class);
+
+  private static final int NUM_DATA_BLOCKS = 1000;
+  private static final HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+
+  private static final int SMALL_BLOCK_SIZE = 4096;
+  private static final int NUM_KV = 10000;
+
+  private static FileSystem fs;
+  private Path path;
+  private Random rand;
+  private long rootIndexOffset;
+  private int numRootEntries;
+  private int numLevels;
+  private static final List<byte[]> keys = new ArrayList<byte[]>();
+  private final Compression.Algorithm compr;
+  private byte[] firstKeyInFile;
+  private Configuration conf;
+
+  private static final int[] INDEX_CHUNK_SIZES = { 4096, 512, 384 };
+  private static final int[] EXPECTED_NUM_LEVELS = { 2, 3, 4 };
+  private static final int[] UNCOMPRESSED_INDEX_SIZES =
+      { 19187, 21813, 23086 };
+
+  static {
+    assert INDEX_CHUNK_SIZES.length == EXPECTED_NUM_LEVELS.length;
+    assert INDEX_CHUNK_SIZES.length == UNCOMPRESSED_INDEX_SIZES.length;
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    keys.clear();
+    rand = new Random(2389757);
+    firstKeyInFile = null;
+    conf = TEST_UTIL.getConfiguration();
+
+    // This test requires at least HFile format version 2.
+    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
+
+    fs = FileSystem.get(conf);
+  }
+
+  @Test
+  public void testBlockIndex() throws IOException {
+    path = new Path(HBaseTestingUtility.getTestDir(), "block_index_" + compr);
+    writeWholeIndex();
+    readIndex();
+  }
+
+  /**
+   * A wrapper around a block reader which only caches the results of the last
+   * operation. Not thread-safe.
+   */
+  private static class BlockReaderWrapper implements HFileBlock.BasicReader {
+
+    private HFileBlock.BasicReader realReader;
+    private long prevOffset;
+    private long prevOnDiskSize;
+    private long prevUncompressedSize;
+    private boolean prevPread;
+    private HFileBlock prevBlock;
+
+    public int hitCount = 0;
+    public int missCount = 0;
+
+    public BlockReaderWrapper(HFileBlock.BasicReader realReader) {
+      this.realReader = realReader;
+    }
+
+    @Override
+    public HFileBlock readBlockData(long offset, long onDiskSize,
+        int uncompressedSize, boolean pread) throws IOException {
+      if (offset == prevOffset && onDiskSize == prevOnDiskSize &&
+          uncompressedSize == prevUncompressedSize && pread == prevPread) {
+        hitCount += 1;
+        return prevBlock;
+      }
+
+      missCount += 1;
+      prevBlock = realReader.readBlockData(offset, onDiskSize,
+          uncompressedSize, pread);
+      prevOffset = offset;
+      prevOnDiskSize = onDiskSize;
+      prevUncompressedSize = uncompressedSize;
+      prevPread = pread;
+
+      return prevBlock;
+    }
+  }
+
+  public void readIndex() throws IOException {
+    long fileSize = fs.getFileStatus(path).getLen();
+    LOG.info("Size of " + path + ": " + fileSize);
+
+    FSDataInputStream istream = fs.open(path);
+    HFileBlock.FSReader blockReader = new HFileBlock.FSReaderV2(istream,
+        compr, fs.getFileStatus(path).getLen());
+
+    BlockReaderWrapper brw = new BlockReaderWrapper(blockReader);
+    HFileBlockIndex.BlockIndexReader indexReader =
+        new HFileBlockIndex.BlockIndexReader(
+            Bytes.BYTES_RAWCOMPARATOR, numLevels, brw);
+
+    indexReader.readRootIndex(blockReader.blockRange(rootIndexOffset,
+        fileSize).nextBlockAsStream(BlockType.ROOT_INDEX), numRootEntries);
+
+    long prevOffset = -1;
+    int i = 0;
+    int expectedHitCount = 0;
+    int expectedMissCount = 0;
+    LOG.info("Total number of keys: " + keys.size());
+    for (byte[] key : keys) {
+      assertTrue(key != null);
+      assertTrue(indexReader != null);
+      HFileBlock b = indexReader.seekToDataBlock(key, 0, key.length, null);
+      if (Bytes.BYTES_RAWCOMPARATOR.compare(key, firstKeyInFile) < 0) {
+        assertTrue(b == null);
+        ++i;
+        continue;
+      }
+
+      String keyStr = "key #" + i + ", " + Bytes.toStringBinary(key);
+
+      assertTrue("seekToDataBlock failed for " + keyStr, b != null);
+
+      if (prevOffset == b.getOffset()) {
+        assertEquals(++expectedHitCount, brw.hitCount);
+      } else {
+        LOG.info("First key in a new block: " + keyStr + ", block offset: "
+            + b.getOffset() + ")");
+        assertTrue(b.getOffset() > prevOffset);
+        assertEquals(++expectedMissCount, brw.missCount);
+        prevOffset = b.getOffset();
+      }
+      ++i;
+    }
+
+    istream.close();
+  }
+
+  private void writeWholeIndex() throws IOException {
+    assertEquals(0, keys.size());
+    HFileBlock.Writer hbw = new HFileBlock.Writer(compr);
+    FSDataOutputStream outputStream = fs.create(path);
+    HFileBlockIndex.BlockIndexWriter biw =
+        new HFileBlockIndex.BlockIndexWriter(hbw, null, null);
+
+    for (int i = 0; i < NUM_DATA_BLOCKS; ++i) {
+      hbw.startWriting(BlockType.DATA, false).write(
+          String.valueOf(rand.nextInt(1000)).getBytes());
+      long blockOffset = outputStream.getPos();
+      hbw.writeHeaderAndData(outputStream);
+
+      byte[] firstKey = null;
+      for (int j = 0; j < 16; ++j) {
+        byte[] k = TestHFileWriterV2.randomOrderedKey(rand, i * 16 + j);
+        keys.add(k);
+        if (j == 8)
+          firstKey = k;
+      }
+      assertTrue(firstKey != null);
+      if (firstKeyInFile == null)
+        firstKeyInFile = firstKey;
+      biw.addEntry(firstKey, blockOffset, hbw.getOnDiskSizeWithHeader());
+
+      writeInlineBlocks(hbw, outputStream, biw, false);
+    }
+    writeInlineBlocks(hbw, outputStream, biw, true);
+    rootIndexOffset = biw.writeIndexBlocks(outputStream);
+    outputStream.close();
+
+    numLevels = biw.getNumLevels();
+    numRootEntries = biw.getNumRootEntries();
+
+    LOG.info("Index written: numLevels=" + numLevels + ", numRootEntries=" +
+        numRootEntries + ", rootIndexOffset=" + rootIndexOffset);
+  }
+
+  private void writeInlineBlocks(HFileBlock.Writer hbw,
+      FSDataOutputStream outputStream, HFileBlockIndex.BlockIndexWriter biw,
+      boolean isClosing) throws IOException {
+    while (biw.shouldWriteBlock(isClosing)) {
+      long offset = outputStream.getPos();
+      biw.writeInlineBlock(hbw.startWriting(biw.getInlineBlockType(), false));
+      hbw.writeHeaderAndData(outputStream);
+      biw.blockWritten(offset, hbw.getOnDiskSizeWithHeader(),
+          hbw.getUncompressedSizeWithoutHeader());
+      LOG.info("Wrote an inline index block at " + offset + ", size " +
+          hbw.getOnDiskSizeWithHeader());
+    }
+  }
+
+  private static final long getDummyFileOffset(int i) {
+    return i * 185 + 379;
+  }
+
+  private static final int getDummyOnDiskSize(int i) {
+    return i * i * 37 + i * 19 + 13;
+  }
+
+  @Test
+  public void testSecondaryIndexBinarySearch() throws IOException {
+    int numTotalKeys = 99;
+    assertTrue(numTotalKeys % 2 == 1); // Ensure no one made this even.
+
+    // We only add odd-index keys into the array that we will binary-search.
+    int numSearchedKeys = (numTotalKeys - 1) / 2;
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+
+    dos.writeInt(numSearchedKeys);
+    int curAllEntriesSize = 0;
+    int numEntriesAdded = 0;
+
+    // Only odd-index elements of this array are used to keep the secondary
+    // index entries of the corresponding keys.
+    int secondaryIndexEntries[] = new int[numTotalKeys];
+
+    for (int i = 0; i < numTotalKeys; ++i) {
+      byte[] k = TestHFileWriterV2.randomOrderedKey(rand, i * 2);
+      keys.add(k);
+      String msgPrefix = "Key #" + i + " (" + Bytes.toStringBinary(k) + "): ";
+      StringBuilder padding = new StringBuilder();
+      while (msgPrefix.length() + padding.length() < 70)
+        padding.append(' ');
+      msgPrefix += padding;
+      if (i % 2 == 1) {
+        dos.writeInt(curAllEntriesSize);
+        secondaryIndexEntries[i] = curAllEntriesSize;
+        LOG.info(msgPrefix + "secondary index entry #" + ((i - 1) / 2) +
+            ", offset " + curAllEntriesSize);
+        curAllEntriesSize += k.length
+            + HFileBlockIndex.SECONDARY_INDEX_ENTRY_OVERHEAD;
+        ++numEntriesAdded;
+      } else {
+        secondaryIndexEntries[i] = -1;
+        LOG.info(msgPrefix + "not in the searched array");
+      }
+    }
+
+    // Make sure the keys are increasing.
+    for (int i = 0; i < keys.size() - 1; ++i)
+      assertTrue(Bytes.BYTES_RAWCOMPARATOR.compare(keys.get(i),
+          keys.get(i + 1)) < 0);
+
+    dos.writeInt(curAllEntriesSize);
+    assertEquals(numSearchedKeys, numEntriesAdded);
+    int secondaryIndexOffset = dos.size();
+    assertEquals(Bytes.SIZEOF_INT * (numSearchedKeys + 2),
+        secondaryIndexOffset);
+
+    for (int i = 1; i <= numTotalKeys - 1; i += 2) {
+      assertEquals(dos.size(),
+          secondaryIndexOffset + secondaryIndexEntries[i]);
+      long dummyFileOffset = getDummyFileOffset(i);
+      int dummyOnDiskSize = getDummyOnDiskSize(i);
+      LOG.debug("Storing file offset=" + dummyFileOffset + " and onDiskSize=" +
+          dummyOnDiskSize + " at offset " + dos.size());
+      dos.writeLong(dummyFileOffset);
+      dos.writeInt(dummyOnDiskSize);
+      LOG.debug("Stored key " + ((i - 1) / 2) +" at offset " + dos.size());
+      dos.write(keys.get(i));
+    }
+
+    dos.writeInt(curAllEntriesSize);
+
+    ByteBuffer nonRootIndex = ByteBuffer.wrap(baos.toByteArray());
+    for (int i = 0; i < numTotalKeys; ++i) {
+      byte[] searchKey = keys.get(i);
+      byte[] arrayHoldingKey = new byte[searchKey.length +
+                                        searchKey.length / 2];
+
+      // To make things a bit more interesting, store the key we are looking
+      // for at a non-zero offset in a new array.
+      System.arraycopy(searchKey, 0, arrayHoldingKey, searchKey.length / 2,
+            searchKey.length);
+
+      int searchResult = BlockIndexReader.binarySearchNonRootIndex(
+          arrayHoldingKey, searchKey.length / 2, searchKey.length, nonRootIndex,
+          Bytes.BYTES_RAWCOMPARATOR);
+      String lookupFailureMsg = "Failed to look up key #" + i + " ("
+          + Bytes.toStringBinary(searchKey) + ")";
+
+      int expectedResult;
+      int referenceItem;
+
+      if (i % 2 == 1) {
+        // This key is in the array we search as the element (i - 1) / 2. Make
+        // sure we find it.
+        expectedResult = (i - 1) / 2;
+        referenceItem = i;
+      } else {
+        // This key is not in the array but between two elements on the array,
+        // in the beginning, or in the end. The result should be the previous
+        // key in the searched array, or -1 for i = 0.
+        expectedResult = i / 2 - 1;
+        referenceItem = i - 1;
+      }
+
+      assertEquals(lookupFailureMsg, expectedResult, searchResult);
+
+      // Now test we can get the offset and the on-disk-size using a
+      // higher-level API function.s
+      boolean locateBlockResult =
+        BlockIndexReader.locateNonRootIndexEntry(nonRootIndex, arrayHoldingKey,
+            searchKey.length / 2, searchKey.length, Bytes.BYTES_RAWCOMPARATOR);
+
+      if (i == 0) {
+        assertFalse(locateBlockResult);
+      } else {
+        assertTrue(locateBlockResult);
+        String errorMsg = "i=" + i + ", position=" + nonRootIndex.position();
+        assertEquals(errorMsg, getDummyFileOffset(referenceItem),
+            nonRootIndex.getLong());
+        assertEquals(errorMsg, getDummyOnDiskSize(referenceItem),
+            nonRootIndex.getInt());
+      }
+    }
+
+  }
+
+  @Test
+  public void testBlockIndexChunk() throws IOException {
+    BlockIndexChunk c = new BlockIndexChunk();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    int N = 1000;
+    int[] numSubEntriesAt = new int[N];
+    int numSubEntries = 0;
+    for (int i = 0; i < N; ++i) {
+      baos.reset();
+      DataOutputStream dos = new DataOutputStream(baos);
+      c.writeNonRoot(dos);
+      assertEquals(c.getNonRootSize(), dos.size());
+
+      baos.reset();
+      dos = new DataOutputStream(baos);
+      c.writeRoot(dos);
+      assertEquals(c.getRootSize(), dos.size());
+
+      byte[] k = TestHFileWriterV2.randomOrderedKey(rand, i);
+      numSubEntries += rand.nextInt(5) + 1;
+      keys.add(k);
+      c.add(k, getDummyFileOffset(i), getDummyOnDiskSize(i), numSubEntries);
+    }
+
+    // Test the ability to look up the entry that contains a particular
+    // deeper-level index block's entry ("sub-entry"), assuming a global
+    // 0-based ordering of sub-entries. This is needed for mid-key calculation.
+    for (int i = 0; i < N; ++i) {
+      for (int j = i == 0 ? 0 : numSubEntriesAt[i - 1];
+           j < numSubEntriesAt[i];
+           ++j) {
+        assertEquals(i, c.getEntryBySubEntry(j));
+      }
+    }
+  }
+
+  /** Checks if the HeapSize calculator is within reason */
+  @Test
+  public void testHeapSizeForBlockIndex() throws IOException {
+    Class<HFileBlockIndex.BlockIndexReader> cl =
+        HFileBlockIndex.BlockIndexReader.class;
+    long expected = ClassSize.estimateBase(cl, false);
+
+    HFileBlockIndex.BlockIndexReader bi =
+        new HFileBlockIndex.BlockIndexReader(Bytes.BYTES_RAWCOMPARATOR, 1);
+    long actual = bi.heapSize();
+
+    // Since the arrays in BlockIndex(byte [][] blockKeys, long [] blockOffsets,
+    // int [] blockDataSizes) are all null they are not going to show up in the
+    // HeapSize calculation, so need to remove those array costs from expected.
+    expected -= ClassSize.align(3 * ClassSize.ARRAY);
+
+    if (expected != actual) {
+      ClassSize.estimateBase(cl, true);
+      assertEquals(expected, actual);
+    }
+  }
+
+  /**
+   * Testing block index through the HFile writer/reader APIs. Allows to test
+   * setting index block size through configuration, intermediate-level index
+   * blocks, and caching index blocks on write.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testHFileWriterAndReader() throws IOException {
+    Path hfilePath = new Path(HBaseTestingUtility.getTestDir(),
+        "hfile_for_block_index");
+    BlockCache blockCache = StoreFile.getBlockCache(conf);
+
+    for (int testI = 0; testI < INDEX_CHUNK_SIZES.length; ++testI) {
+      int indexBlockSize = INDEX_CHUNK_SIZES[testI];
+      int expectedNumLevels = EXPECTED_NUM_LEVELS[testI];
+      LOG.info("Index block size: " + indexBlockSize + ", compression: "
+          + compr);
+      // Evict all blocks that were cached-on-write by the previous invocation.
+      blockCache.evictBlocksByPrefix(hfilePath.getName()
+          + HFile.CACHE_KEY_SEPARATOR);
+
+      conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, indexBlockSize);
+      Set<String> keyStrSet = new HashSet<String>();
+      byte[][] keys = new byte[NUM_KV][];
+      byte[][] values = new byte[NUM_KV][];
+
+      // Write the HFile
+      {
+        HFile.Writer writer = HFile.getWriterFactory(conf).createWriter(fs,
+            hfilePath, SMALL_BLOCK_SIZE, HFile.DEFAULT_BYTES_PER_CHECKSUM,
+            compr, KeyValue.KEY_COMPARATOR);
+        Random rand = new Random(19231737);
+
+        for (int i = 0; i < NUM_KV; ++i) {
+          byte[] row = TestHFileWriterV2.randomOrderedKey(rand, i);
+
+          // Key will be interpreted by KeyValue.KEY_COMPARATOR
+          byte[] k = KeyValue.createFirstOnRow(row, 0, row.length, row, 0, 0,
+              row, 0, 0).getKey();
+
+          byte[] v = TestHFileWriterV2.randomValue(rand);
+          writer.append(k, v);
+          keys[i] = k;
+          values[i] = v;
+          keyStrSet.add(Bytes.toStringBinary(k));
+
+          if (i > 0) {
+            assertTrue(KeyValue.KEY_COMPARATOR.compare(keys[i - 1],
+                keys[i]) < 0);
+          }
+        }
+
+        writer.close();
+      }
+
+      // Read the HFile
+      HFile.Reader reader = HFile.createReader(fs, hfilePath, blockCache,
+          false, true);
+      assertEquals(expectedNumLevels,
+          reader.getTrailer().getNumDataIndexLevels());
+
+      assertTrue(Bytes.equals(keys[0], reader.getFirstKey()));
+      assertTrue(Bytes.equals(keys[NUM_KV - 1], reader.getLastKey()));
+      LOG.info("Last key: " + Bytes.toStringBinary(keys[NUM_KV - 1]));
+
+      for (boolean pread : new boolean[] { false, true }) {
+        HFileScanner scanner = reader.getScanner(true, pread);
+        for (int i = 0; i < NUM_KV; ++i) {
+          checkSeekTo(keys, scanner, i);
+          checkKeyValue("i=" + i, keys[i], values[i], scanner.getKey(),
+              scanner.getValue());
+        }
+        assertTrue(scanner.seekTo());
+        for (int i = NUM_KV - 1; i >= 0; --i) {
+          checkSeekTo(keys, scanner, i);
+          checkKeyValue("i=" + i, keys[i], values[i], scanner.getKey(),
+              scanner.getValue());
+        }
+      }
+
+      // Manually compute the mid-key and validate it.
+      HFileReaderV2 reader2 = (HFileReaderV2) reader;
+      HFileBlock.FSReader fsReader = reader2.getUncachedBlockReader();
+
+      HFileBlock.BlockIterator iter = fsReader.blockRange(0,
+          reader.getTrailer().getLoadOnOpenDataOffset());
+      HFileBlock block;
+      List<byte[]> blockKeys = new ArrayList<byte[]>();
+      while ((block = iter.nextBlock()) != null) {
+        if (block.getBlockType() != BlockType.LEAF_INDEX)
+          return;
+        ByteBuffer b = block.getBufferReadOnly();
+        int n = b.getInt();
+        // One int for the number of items, and n + 1 for the secondary index.
+        int entriesOffset = Bytes.SIZEOF_INT * (n + 2);
+
+        // Get all the keys from the leaf index block. S
+        for (int i = 0; i < n; ++i) {
+          int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (i + 1));
+          int nextKeyRelOffset = b.getInt(Bytes.SIZEOF_INT * (i + 2));
+          int keyLen = nextKeyRelOffset - keyRelOffset;
+          int keyOffset = b.arrayOffset() + entriesOffset + keyRelOffset +
+              HFileBlockIndex.SECONDARY_INDEX_ENTRY_OVERHEAD;
+          byte[] blockKey = Arrays.copyOfRange(b.array(), keyOffset, keyOffset
+              + keyLen);
+          String blockKeyStr = Bytes.toString(blockKey);
+          blockKeys.add(blockKey);
+
+          // If the first key of the block is not among the keys written, we
+          // are not parsing the non-root index block format correctly.
+          assertTrue("Invalid block key from leaf-level block: " + blockKeyStr,
+              keyStrSet.contains(blockKeyStr));
+        }
+      }
+
+      // Validate the mid-key.
+      assertEquals(
+          Bytes.toStringBinary(blockKeys.get((blockKeys.size() - 1) / 2)),
+          Bytes.toStringBinary(reader.midkey()));
+
+      assertEquals(UNCOMPRESSED_INDEX_SIZES[testI],
+          reader.getTrailer().getUncompressedDataIndexSize());
+
+      reader.close();
+    }
+  }
+
+  private void checkSeekTo(byte[][] keys, HFileScanner scanner, int i)
+      throws IOException {
+    assertEquals("Failed to seek to key #" + i + " ("
+        + Bytes.toStringBinary(keys[i]) + ")", 0, scanner.seekTo(keys[i]));
+  }
+
+  private void assertArrayEqualsBuffer(String msgPrefix, byte[] arr,
+      ByteBuffer buf) {
+    assertEquals(msgPrefix + ": expected " + Bytes.toStringBinary(arr)
+        + ", actual " + Bytes.toStringBinary(buf), 0, Bytes.compareTo(arr, 0,
+        arr.length, buf.array(), buf.arrayOffset(), buf.limit()));
+  }
+
+  /** Check a key/value pair after it was read by the reader */
+  private void checkKeyValue(String msgPrefix, byte[] expectedKey,
+      byte[] expectedValue, ByteBuffer keyRead, ByteBuffer valueRead) {
+    if (!msgPrefix.isEmpty())
+      msgPrefix += ". ";
+
+    assertArrayEqualsBuffer(msgPrefix + "Invalid key", expectedKey, keyRead);
+    assertArrayEqualsBuffer(msgPrefix + "Invalid value", expectedValue,
+        valueRead);
+  }
+
+}