Posted to commits@apex.apache.org by hs...@apache.org on 2015/11/07 01:57:02 UTC

[3/8] incubator-apex-malhar git commit: MLHR-1877 #resolve #comment moved DTFile implementation from contrib to lib

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/main/java/org/apache/hadoop/io/file/tfile/DTFile.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/hadoop/io/file/tfile/DTFile.java b/library/src/main/java/org/apache/hadoop/io/file/tfile/DTFile.java
new file mode 100644
index 0000000..f1c87ba
--- /dev/null
+++ b/library/src/main/java/org/apache/hadoop/io/file/tfile/DTFile.java
@@ -0,0 +1,2399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.file.tfile.ByteArray;
+import org.apache.hadoop.io.file.tfile.MetaBlockAlreadyExists;
+import org.apache.hadoop.io.file.tfile.MetaBlockDoesNotExist;
+import org.apache.hadoop.io.file.tfile.RawComparable;
+import org.apache.hadoop.io.file.tfile.Utils;
+import org.apache.hadoop.io.file.tfile.DTBCFile.Reader.BlockReader;
+import org.apache.hadoop.io.file.tfile.DTBCFile.Writer.BlockAppender;
+import org.apache.hadoop.io.file.tfile.Utils.Version;
+import org.apache.hadoop.io.file.tfile.Chunk.ChunkDecoder;
+import org.apache.hadoop.io.file.tfile.Chunk.ChunkEncoder;
+import org.apache.hadoop.io.file.tfile.CompareUtils.BytesComparator;
+import org.apache.hadoop.io.file.tfile.CompareUtils.MemcmpRawComparator;
+import org.apache.hadoop.io.serializer.JavaSerializationComparator;
+
+/**
+ * <ul>
+ * <li>The file format of DTFile is the same as {@link TFile}, with a different reader implementation.
+ * It reads data block by block and caches the binary block data in memory to speed up random reads.
+ *
+ * <li>The public API of {@link Reader} is the same as in the {@link TFile} {@link org.apache.hadoop.io.file.tfile.TFile.Reader} implementation.
+ * In addition, it provides the getBlockBuffer(), getKeyOffset(), getKeyLength(), getValueOffset() and getValueLength() methods
+ * to expose the raw block, key and value data to the user and avoid unnecessary internal/external data copies.
+ *
+ * <li>In performance tests, it shows no difference for sequential reads and is 20x faster for random reads (if most of them hit memory).
+ * </ul>
+ *
+ * A TFile is a container of key-value pairs. Both keys and values are type-less
+ * bytes. Keys are restricted to 64KB, value length is not restricted
+ * (practically limited to the available disk storage). TFile further provides
+ * the following features:
+ * <ul>
+ * <li>Block Compression.
+ * <li>Named meta data blocks.
+ * <li>Sorted or unsorted keys.
+ * <li>Seek by key or by file offset.
+ * </ul>
+ * The memory footprint of a TFile includes the following:
+ * <ul>
+ * <li>Some constant overhead of reading or writing a compressed block.
+ * <ul>
+ * <li>Each compressed block requires one compression/decompression codec for
+ * I/O.
+ * <li>Temporary space to buffer the key.
+ * <li>Temporary space to buffer the value (for TFile.Writer only). Values are
+ * chunk encoded, so that we buffer at most one chunk of user data. By default,
+ * the chunk buffer is 1MB. Reading chunked value does not require additional
+ * memory.
+ * </ul>
+ * <li>TFile index, which is proportional to the total number of Data Blocks.
+ * The total amount of memory needed to hold the index can be estimated as
+ * (56+AvgKeySize)*NumBlocks.
+ * <li>MetaBlock index, which is proportional to the total number of Meta
+ * Blocks. The total amount of memory needed to hold the index for Meta Blocks
+ * can be estimated as (40+AvgMetaBlockName)*NumMetaBlock.
+ * </ul>
+ * <p>
+ * The behavior of TFile can be customized by the following variables through
+ * Configuration:
+ * <ul>
+ * <li><b>tfile.io.chunk.size</b>: Value chunk size. Integer (in bytes). Defaults
+ * to 1MB. Values with a length less than the chunk size are guaranteed to have a
+ * known value length at read time (see
+ * {@link DTFile.Reader.Scanner.Entry#isValueLengthKnown()}).
+ * <li><b>tfile.fs.output.buffer.size</b>: Buffer size used for
+ * FSDataOutputStream. Integer (in bytes). Defaults to 256KB.
+ * <li><b>tfile.fs.input.buffer.size</b>: Buffer size used for
+ * FSDataInputStream. Integer (in bytes). Defaults to 256KB.
+ * </ul>
+ * <p>
+ * Suggestions on performance optimization.
+ * <ul>
+ * <li>Minimum block size. We recommend a minimum block size setting between
+ * 256KB and 1MB for general usage. A larger block size is preferred if files are
+ * primarily for sequential access. However, it would lead to inefficient random
+ * access (because there is more data to decompress). Smaller blocks are good
+ * for random access, but require more memory to hold the block index, and may
+ * be slower to create (because we must flush the compressor stream at the
+ * conclusion of each data block, which leads to an FS I/O flush). Further, due
+ * to the internal caching in the compression codec, the smallest possible block
+ * size is around 20KB-30KB.
+ * <li>The current implementation does not offer true multi-threading for
+ * reading. The implementation uses FSDataInputStream seek()+read(), which is
+ * shown to be much faster than positioned-read calls in single-thread mode.
+ * However, it also means that if multiple threads attempt to access the same
+ * TFile (using multiple scanners) simultaneously, the actual I/O is carried out
+ * sequentially even if they access different DFS blocks.
+ * <li>Compression codec. Use "none" if the data is not very compressible (by
+ * compressible, we mean a compression ratio of at least 2:1). Generally, use "lzo"
+ * as the starting point for experimenting. "gz" offers a slightly better
+ * compression ratio than "lzo" but requires 4x the CPU to compress and 2x the CPU
+ * to decompress, compared to "lzo".
+ * <li>File system buffering. If the underlying FSDataInputStream and
+ * FSDataOutputStream are already adequately buffered, or if applications
+ * read/write keys and values in large buffers, we can reduce the sizes of the
+ * input/output buffering in the TFile layer by setting the configuration parameters
+ * "tfile.fs.input.buffer.size" and "tfile.fs.output.buffer.size".
+ * </ul>
+ *
+ * Some design rationale behind TFile can be found at <a
+ * href=https://issues.apache.org/jira/browse/HADOOP-3315>Hadoop-3315</a>.
+ *
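+ * <p>
+ * A minimal usage sketch (illustrative only, not part of the original
+ * documentation; the file system, path and block size below are placeholder
+ * assumptions):
+ * <pre>{@code
+ * Configuration conf = new Configuration();
+ * FileSystem fs = FileSystem.get(conf);
+ * Path path = new Path("/tmp/example.dtfile"); // hypothetical path
+ *
+ * // write a sorted DTFile
+ * FSDataOutputStream out = fs.create(path);
+ * DTFile.Writer writer = new DTFile.Writer(out, 256 * 1024,
+ *     DTFile.COMPRESSION_NONE, DTFile.COMPARATOR_MEMCMP, conf);
+ * writer.append("key1".getBytes(), "value1".getBytes());
+ * writer.close();
+ * out.close();
+ *
+ * // read it back
+ * FSDataInputStream in = fs.open(path);
+ * DTFile.Reader reader = new DTFile.Reader(in, fs.getFileStatus(path).getLen(), conf);
+ * }</pre>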
+ * @since 2.0.0
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class DTFile {
+  static final Log LOG = LogFactory.getLog(DTFile.class);
+
+  private static final String CHUNK_BUF_SIZE_ATTR = "tfile.io.chunk.size";
+  private static final String FS_INPUT_BUF_SIZE_ATTR =
+      "tfile.fs.input.buffer.size";
+  private static final String FS_OUTPUT_BUF_SIZE_ATTR =
+      "tfile.fs.output.buffer.size";
+
+  public static final int DEFAULT_INPUT_FS_BUF_SIZE = 256 * 1024;
+
+  static int getChunkBufferSize(Configuration conf) {
+    int ret = conf.getInt(CHUNK_BUF_SIZE_ATTR, 1024 * 1024);
+    return (ret > 0) ? ret : 1024 * 1024;
+  }
+
+  static int getFSInputBufferSize(Configuration conf) {
+    int bufferSize = conf.getInt(FS_INPUT_BUF_SIZE_ATTR, DEFAULT_INPUT_FS_BUF_SIZE);
+    if (bufferSize <= 0)
+      bufferSize = DEFAULT_INPUT_FS_BUF_SIZE;
+    return bufferSize;
+  }
+
+  static int getFSOutputBufferSize(Configuration conf) {
+    return conf.getInt(FS_OUTPUT_BUF_SIZE_ATTR, 256 * 1024);
+  }
+
+  private static final int MAX_KEY_SIZE = 64 * 1024; // 64KB
+  static final Version API_VERSION = new Version((short) 1, (short) 0);
+
+  /** compression: gzip */
+  public static final String COMPRESSION_GZ = "gz";
+  /** compression: lzo */
+  public static final String COMPRESSION_LZO = "lzo";
+  /** compression: none */
+  public static final String COMPRESSION_NONE = "none";
+  /** comparator: memcmp */
+  public static final String COMPARATOR_MEMCMP = "memcmp";
+  /** comparator prefix: java class */
+  public static final String COMPARATOR_JCLASS = "jclass:";
+
+  /**
+   * Make a raw comparator from a string name.
+   *
+   * @param name
+   *          Comparator name
+   * @return A RawComparable comparator.
+   */
+  static public Comparator<RawComparable> makeComparator(String name) {
+    return TFileMeta.makeComparator(name);
+  }
+
+  // Prevent the instantiation of TFiles
+  private DTFile() {
+    // nothing
+  }
+
+  /**
+   * Get names of supported compression algorithms. The names are acceptable by
+   * TFile.Writer.
+   *
+   * @return Array of strings, each represents a supported compression
+   *         algorithm. Currently, the following compression algorithms are
+   *         supported.
+   *         <ul>
+   *         <li>"none" - No compression.
+   *         <li>"lzo" - LZO compression.
+   *         <li>"gz" - GZIP compression.
+   *         </ul>
+   */
+  public static String[] getSupportedCompressionAlgorithms() {
+    return Compression.getSupportedAlgorithms();
+  }
+
+  /**
+   * TFile Writer.
+   */
+  @InterfaceStability.Evolving
+  public static class Writer implements Closeable {
+    // minimum compressed size for a block.
+    private final int sizeMinBlock;
+
+    // Meta blocks.
+    final TFileIndex tfileIndex;
+    final TFileMeta tfileMeta;
+
+    // reference to the underlying BCFile.
+    private DTBCFile.Writer writerBCF;
+
+    // current data block appender.
+    BlockAppender blkAppender;
+    long blkRecordCount;
+
+    // buffers for caching the key.
+    BoundedByteArrayOutputStream currentKeyBufferOS;
+    BoundedByteArrayOutputStream lastKeyBufferOS;
+
+    // buffer used by chunk codec
+    private byte[] valueBuffer;
+
+    /**
+     * Writer states. The state always transitions in a cycle: READY -> IN_KEY ->
+     * END_KEY -> IN_VALUE -> READY.
+     */
+    private enum State {
+      READY, // Ready to start a new key-value pair insertion.
+      IN_KEY, // In the middle of key insertion.
+      END_KEY, // Key insertion complete, ready to insert value.
+      IN_VALUE, // In value insertion.
+      // ERROR, // Error encountered, cannot continue.
+      CLOSED, // TFile already closed.
+    };
+
+    // current state of Writer.
+    State state = State.READY;
+    Configuration conf;
+    long errorCount = 0;
+
+    /**
+     * Constructor
+     *
+     * @param fsdos
+     *          output stream for writing. Must be at position 0.
+     * @param minBlockSize
+     *          Minimum compressed block size in bytes. A compression block will
+     *          not be closed until it reaches this size except for the last
+     *          block.
+     * @param compressName
+     *          Name of the compression algorithm. Must be one of the strings
+     *          returned by {@link DTFile#getSupportedCompressionAlgorithms()}.
+     * @param comparator
+     *          Leave comparator as null or empty string if TFile is not sorted.
+     *          Otherwise, provide the string name for the comparison algorithm
+     *          for keys. Two kinds of comparators are supported.
+     *          <ul>
+     *          <li>Algorithmic comparator: binary comparators that are language
+     *          independent. Currently, only "memcmp" is supported.
+     *          <li>Language-specific comparator: binary comparators that can
+     *          only be constructed in a specific language. For Java, the syntax
+     *          is "jclass:", followed by the class name of the RawComparator.
+     *          Currently, we only support RawComparators that can be
+     *          constructed through the default constructor (with no
+     *          parameters). Parameterized RawComparators such as
+     *          {@link WritableComparator} or
+     *          {@link JavaSerializationComparator} may not be directly used.
+     *          One should write a wrapper class that inherits from such classes
+     *          and use its default constructor to perform proper
+     *          initialization.
+     *          </ul>
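+     *          For illustration (not from the original documentation), a
+     *          sorted TFile could be created with the comparator "memcmp", or
+     *          with a hypothetical user-supplied class such as
+     *          "jclass:org.example.MyRawComparator".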
+     * @param conf
+     *          The configuration object.
+     * @throws IOException
+     */
+    public Writer(FSDataOutputStream fsdos, int minBlockSize,
+        String compressName, String comparator, Configuration conf)
+        throws IOException {
+      sizeMinBlock = minBlockSize;
+      tfileMeta = new TFileMeta(comparator);
+      tfileIndex = new TFileIndex(tfileMeta.getComparator());
+
+      writerBCF = new DTBCFile.Writer(fsdos, compressName, conf);
+      currentKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
+      lastKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
+      this.conf = conf;
+    }
+
+    /**
+     * Close the Writer. Resources will be released regardless of the exceptions
+     * being thrown. Future close calls will have no effect.
+     *
+     * The underlying FSDataOutputStream is not closed.
+     */
+    @Override
+    public void close() throws IOException {
+      if ((state == State.CLOSED)) {
+        return;
+      }
+      try {
+        // First try the normal finish.
+        // Terminate upon the first Exception.
+        if (errorCount == 0) {
+          if (state != State.READY) {
+            throw new IllegalStateException(
+                "Cannot close TFile in the middle of key-value insertion.");
+          }
+
+          finishDataBlock(true);
+
+          // first, write out data:TFile.meta
+          BlockAppender outMeta =
+              writerBCF
+                  .prepareMetaBlock(TFileMeta.BLOCK_NAME, COMPRESSION_NONE);
+          try {
+            tfileMeta.write(outMeta);
+          } finally {
+            outMeta.close();
+          }
+
+          // second, write out data:TFile.index
+          BlockAppender outIndex =
+              writerBCF.prepareMetaBlock(TFileIndex.BLOCK_NAME);
+          try {
+            tfileIndex.write(outIndex);
+          } finally {
+            outIndex.close();
+          }
+
+          writerBCF.close();
+        }
+      } finally {
+        IOUtils.cleanup(LOG, blkAppender, writerBCF);
+        blkAppender = null;
+        writerBCF = null;
+        state = State.CLOSED;
+      }
+    }
+
+    /**
+     * Add a new key-value pair to the TFile. This is synonymous with
+     * append(key, 0, key.length, value, 0, value.length).
+     *
+     * @param key
+     *          Buffer for key.
+     * @param value
+     *          Buffer for value.
+     * @throws IOException
+     */
+    public void append(byte[] key, byte[] value) throws IOException {
+      append(key, 0, key.length, value, 0, value.length);
+    }
+
+    /**
+     * Add a new key-value pair to the TFile.
+     *
+     * @param key
+     *          buffer for key.
+     * @param koff
+     *          offset in key buffer.
+     * @param klen
+     *          length of key.
+     * @param value
+     *          buffer for value.
+     * @param voff
+     *          offset in value buffer.
+     * @param vlen
+     *          length of value.
+     * @throws IOException
+     *           Upon IO errors.
+     *           <p>
+     *           If an exception is thrown, the TFile will be in an inconsistent
+     *           state. The only legitimate call after that would be close().
+     */
+    public void append(byte[] key, int koff, int klen, byte[] value, int voff,
+        int vlen) throws IOException {
+      if ((koff | klen | (koff + klen) | (key.length - (koff + klen))) < 0) {
+        throw new IndexOutOfBoundsException(
+            "Bad key buffer offset-length combination.");
+      }
+
+      if ((voff | vlen | (voff + vlen) | (value.length - (voff + vlen))) < 0) {
+        throw new IndexOutOfBoundsException(
+            "Bad value buffer offset-length combination.");
+      }
+
+      try {
+        DataOutputStream dosKey = prepareAppendKey(klen);
+        try {
+          ++errorCount;
+          dosKey.write(key, koff, klen);
+          --errorCount;
+        } finally {
+          dosKey.close();
+        }
+
+        DataOutputStream dosValue = prepareAppendValue(vlen);
+        try {
+          ++errorCount;
+          dosValue.write(value, voff, vlen);
+          --errorCount;
+        } finally {
+          dosValue.close();
+        }
+      } finally {
+        state = State.READY;
+      }
+    }
+
+    /**
+     * Helper class to register key after close call on key append stream.
+     */
+    private class KeyRegister extends DataOutputStream {
+      private final int expectedLength;
+      private boolean closed = false;
+
+      public KeyRegister(int len) {
+        super(currentKeyBufferOS);
+        if (len >= 0) {
+          currentKeyBufferOS.reset(len);
+        } else {
+          currentKeyBufferOS.reset();
+        }
+        expectedLength = len;
+      }
+
+      @Override
+      public void close() throws IOException {
+        if (closed == true) {
+          return;
+        }
+
+        try {
+          ++errorCount;
+          byte[] key = currentKeyBufferOS.getBuffer();
+          int len = currentKeyBufferOS.size();
+          /**
+           * verify length.
+           */
+          if (expectedLength >= 0 && expectedLength != len) {
+            throw new IOException("Incorrect key length: expected="
+                + expectedLength + " actual=" + len);
+          }
+
+          Utils.writeVInt(blkAppender, len);
+          blkAppender.write(key, 0, len);
+          if (tfileIndex.getFirstKey() == null) {
+            tfileIndex.setFirstKey(key, 0, len);
+          }
+
+          if (tfileMeta.isSorted() && tfileMeta.getRecordCount()>0) {
+            byte[] lastKey = lastKeyBufferOS.getBuffer();
+            int lastLen = lastKeyBufferOS.size();
+            if (tfileMeta.getComparator().compare(key, 0, len, lastKey, 0,
+                lastLen) < 0) {
+              throw new IOException("Keys are not added in sorted order");
+            }
+          }
+
+          BoundedByteArrayOutputStream tmp = currentKeyBufferOS;
+          currentKeyBufferOS = lastKeyBufferOS;
+          lastKeyBufferOS = tmp;
+          --errorCount;
+        } finally {
+          closed = true;
+          state = State.END_KEY;
+        }
+      }
+    }
+
+    /**
+     * Helper class to register value after close call on value append stream.
+     */
+    private class ValueRegister extends DataOutputStream {
+      private boolean closed = false;
+
+      public ValueRegister(OutputStream os) {
+        super(os);
+      }
+
+      // Avoid forwarding flush calls to the downstream stream.
+      @Override
+      public void flush() {
+        // do nothing
+      }
+
+      @Override
+      public void close() throws IOException {
+        if (closed == true) {
+          return;
+        }
+
+        try {
+          ++errorCount;
+          super.close();
+          blkRecordCount++;
+          // bump up the total record count in the whole file
+          tfileMeta.incRecordCount();
+          finishDataBlock(false);
+          --errorCount;
+        } finally {
+          closed = true;
+          state = State.READY;
+        }
+      }
+    }
+
+    /**
+     * Obtain an output stream for writing a key into TFile. This may only be
+     * called when there is no active Key appending stream or value appending
+     * stream.
+     *
+     * @param length
+     *          The expected length of the key. If length of the key is not
+     *          known, set length = -1. Otherwise, the application must write
+     *          exactly as many bytes as specified here before calling close on
+     *          the returned output stream.
+     * @return The key appending output stream.
+     * @throws IOException
+     *
+     */
+    public DataOutputStream prepareAppendKey(int length) throws IOException {
+      if (state != State.READY) {
+        throw new IllegalStateException("Incorrect state to start a new key: "
+            + state.name());
+      }
+
+      initDataBlock();
+      DataOutputStream ret = new KeyRegister(length);
+      state = State.IN_KEY;
+      return ret;
+    }
+
+    /**
+     * Obtain an output stream for writing a value into TFile. This may only be
+     * called right after a key appending operation (the key append stream must
+     * be closed).
+     *
+     * @param length
+     *          The expected length of the value. If length of the value is not
+     *          known, set length = -1. Otherwise, the application must write
+     *          exactly as many bytes as specified here before calling close on
+     *          the returned output stream. Advertising the value size up-front
+     *          guarantees that the value is encoded in one chunk, and avoids
+     *          intermediate chunk buffering.
+     * @throws IOException
+     *
+     */
+    public DataOutputStream prepareAppendValue(int length) throws IOException {
+      if (state != State.END_KEY) {
+        throw new IllegalStateException(
+            "Incorrect state to start a new value: " + state.name());
+      }
+
+      DataOutputStream ret;
+
+      // unknown length
+      if (length < 0) {
+        if (valueBuffer == null) {
+          valueBuffer = new byte[getChunkBufferSize(conf)];
+        }
+        ret = new ValueRegister(new ChunkEncoder(blkAppender, valueBuffer));
+      } else {
+        ret =
+            new ValueRegister(new Chunk.SingleChunkEncoder(blkAppender, length));
+      }
+
+      state = State.IN_VALUE;
+      return ret;
+    }
+
+    /**
+     * Obtain an output stream for creating a meta block. This function may not
+     * be called when there is a key append stream or value append stream
+     * active. No more key-value insertion is allowed after a meta data block
+     * has been added to TFile.
+     *
+     * @param name
+     *          Name of the meta block.
+     * @param compressName
+     *          Name of the compression algorithm to be used. Must be one of the
+     *          strings returned by
+     *          {@link DTFile#getSupportedCompressionAlgorithms()}.
+     * @return A DataOutputStream that can be used to write Meta Block data.
+     *         Closing the stream would signal the ending of the block.
+     * @throws IOException
+     * @throws MetaBlockAlreadyExists
+     *           the Meta Block with the same name already exists.
+     */
+    public DataOutputStream prepareMetaBlock(String name, String compressName)
+        throws IOException, MetaBlockAlreadyExists {
+      if (state != State.READY) {
+        throw new IllegalStateException(
+            "Incorrect state to start a Meta Block: " + state.name());
+      }
+
+      finishDataBlock(true);
+      DataOutputStream outputStream =
+          writerBCF.prepareMetaBlock(name, compressName);
+      return outputStream;
+    }
+
+    /**
+     * Obtain an output stream for creating a meta block. This function may not
+     * be called when there is a key append stream or value append stream
+     * active. No more key-value insertion is allowed after a meta data block
+     * has been added to TFile. Data will be compressed using the default
+     * compressor as defined in Writer's constructor.
+     *
+     * @param name
+     *          Name of the meta block.
+     * @return A DataOutputStream that can be used to write Meta Block data.
+     *         Closing the stream would signal the ending of the block.
+     * @throws IOException
+     * @throws MetaBlockAlreadyExists
+     *           the Meta Block with the same name already exists.
+     */
+    public DataOutputStream prepareMetaBlock(String name) throws IOException,
+        MetaBlockAlreadyExists {
+      if (state != State.READY) {
+        throw new IllegalStateException(
+            "Incorrect state to start a Meta Block: " + state.name());
+      }
+
+      finishDataBlock(true);
+      return writerBCF.prepareMetaBlock(name);
+    }
+
+    /**
+     * Check if we need to start a new data block.
+     *
+     * @throws IOException
+     */
+    private void initDataBlock() throws IOException {
+      // for each new block, get a new appender
+      if (blkAppender == null) {
+        blkAppender = writerBCF.prepareDataBlock();
+      }
+    }
+
+    /**
+     * Close the current data block if necessary.
+     *
+     * @param bForceFinish
+     *          Force the closure regardless of the block size.
+     * @throws IOException
+     */
+    void finishDataBlock(boolean bForceFinish) throws IOException {
+      if (blkAppender == null) {
+        return;
+      }
+
+      // exceeded the size limit, do the compression and finish the block
+      if (bForceFinish || blkAppender.getCompressedSize() >= sizeMinBlock) {
+        // keep track of the last key of each data block, no padding
+        // for now
+        TFileIndexEntry keyLast =
+            new TFileIndexEntry(lastKeyBufferOS.getBuffer(), 0, lastKeyBufferOS
+                .size(), blkRecordCount);
+        tfileIndex.addEntry(keyLast);
+        // close the appender
+        blkAppender.close();
+        blkAppender = null;
+        blkRecordCount = 0;
+      }
+    }
+  }
+
+  /**
+   * TFile Reader. Users may only read TFiles by creating TFile.Reader.Scanner
+   * objects. A scanner may scan the whole TFile ({@link Reader#createScanner()}),
+   * a portion of the TFile based on byte offsets
+   * ({@link Reader#createScannerByByteRange(long, long)}), or a portion of the TFile
+   * whose keys fall in a certain key range (for sorted TFiles only,
+   * {@link Reader#createScannerByKey(byte[], byte[])} or
+   * {@link Reader#createScannerByKey(RawComparable, RawComparable)}).
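+   * <p>
+   * Illustrative scan sketch (assumes fsdis, fileLength and conf are provided
+   * by the surrounding application):
+   * <pre>{@code
+   * DTFile.Reader reader = new DTFile.Reader(fsdis, fileLength, conf);
+   * DTFile.Reader.Scanner scanner = reader.createScanner();
+   * try {
+   *   while (!scanner.atEnd()) {
+   *     DTFile.Reader.Scanner.Entry entry = scanner.entry();
+   *     // entry exposes the raw block buffer plus key/value offsets and lengths
+   *     scanner.advance();
+   *   }
+   * } finally {
+   *   scanner.close();
+   *   reader.close();
+   * }
+   * }</pre>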
+   */
+  @InterfaceStability.Evolving
+  public static class Reader implements Closeable {
+    // The underlying BCFile reader.
+    final DTBCFile.Reader readerBCF;
+
+    // TFile index, it is loaded lazily.
+    TFileIndex tfileIndex = null;
+    final TFileMeta tfileMeta;
+    final BytesComparator comparator;
+
+    // global begin and end locations.
+    private final Location begin;
+    private final Location end;
+
+    /**
+     * Location representing a virtual position in the TFile.
+     */
+    static final class Location implements Comparable<Location>, Cloneable {
+      private int blockIndex;
+      // distance/offset from the beginning of the block
+      private long recordIndex;
+
+      Location(int blockIndex, long recordIndex) {
+        set(blockIndex, recordIndex);
+      }
+
+      void incRecordIndex() {
+        ++recordIndex;
+      }
+
+      Location(Location other) {
+        set(other);
+      }
+
+      int getBlockIndex() {
+        return blockIndex;
+      }
+
+      long getRecordIndex() {
+        return recordIndex;
+      }
+
+      void set(int blockIndex, long recordIndex) {
+        if ((blockIndex | recordIndex) < 0) {
+          throw new IllegalArgumentException(
+              "Illegal parameter for BlockLocation.");
+        }
+        this.blockIndex = blockIndex;
+        this.recordIndex = recordIndex;
+      }
+
+      void set(Location other) {
+        set(other.blockIndex, other.recordIndex);
+      }
+
+      /**
+       * @see java.lang.Comparable#compareTo(java.lang.Object)
+       */
+      @Override
+      public int compareTo(Location other) {
+        return compareTo(other.blockIndex, other.recordIndex);
+      }
+
+      int compareTo(int bid, long rid) {
+        if (this.blockIndex == bid) {
+          long ret = this.recordIndex - rid;
+          if (ret > 0) return 1;
+          if (ret < 0) return -1;
+          return 0;
+        }
+        return this.blockIndex - bid;
+      }
+
+      /**
+       * @see java.lang.Object#clone()
+       */
+      @Override
+      protected Location clone() throws CloneNotSupportedException {
+        return (Location)super.clone();
+      }
+
+      /**
+       * @see java.lang.Object#hashCode()
+       */
+      @Override
+      public int hashCode() {
+        final int prime = 31;
+        int result = prime + blockIndex;
+        result = (int) (prime * result + recordIndex);
+        return result;
+      }
+
+      /**
+       * @see java.lang.Object#equals(java.lang.Object)
+       */
+      @Override
+      public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null) return false;
+        if (getClass() != obj.getClass()) return false;
+        Location other = (Location) obj;
+        if (blockIndex != other.blockIndex) return false;
+        if (recordIndex != other.recordIndex) return false;
+        return true;
+      }
+    }
+
+    /**
+     * Constructor
+     *
+     * @param fsdis
+     *          FS input stream of the TFile.
+     * @param fileLength
+     *          The length of TFile. This is required because we have no easy
+     *          way of knowing the actual size of the input file through the
+     *          File input stream.
+     * @param conf
+     * @throws IOException
+     */
+    public Reader(FSDataInputStream fsdis, long fileLength, Configuration conf)
+        throws IOException {
+      readerBCF = new DTBCFile.Reader(fsdis, fileLength, conf);
+
+      // first, read TFile meta
+      BlockReader brMeta = readerBCF.getMetaBlock(TFileMeta.BLOCK_NAME);
+      try {
+        tfileMeta = new TFileMeta(brMeta);
+      } finally {
+        brMeta.close();
+      }
+
+      comparator = tfileMeta.getComparator();
+      // Set begin and end locations.
+      begin = new Location(0, 0);
+      end = new Location(readerBCF.getBlockCount(), 0);
+    }
+
+    /**
+     * Close the reader. The state of the Reader object is undefined after
+     * close. Calling close() multiple times has no effect.
+     */
+    @Override
+    public void close() throws IOException {
+      readerBCF.close();
+    }
+
+    /**
+     * Get the begin location of the TFile.
+     *
+     * @return If TFile is not empty, the location of the first key-value pair.
+     *         Otherwise, it returns end().
+     */
+    Location begin() {
+      return begin;
+    }
+
+    /**
+     * Get the end location of the TFile.
+     *
+     * @return The location right after the last key-value pair in TFile.
+     */
+    Location end() {
+      return end;
+    }
+
+    /**
+     * Get the string representation of the comparator.
+     *
+     * @return If the TFile is not sorted by keys, an empty string will be
+     *         returned. Otherwise, the actual comparator string that is
+     *         provided during the TFile creation time will be returned.
+     */
+    public String getComparatorName() {
+      return tfileMeta.getComparatorString();
+    }
+
+    /**
+     * Is the TFile sorted?
+     *
+     * @return true if TFile is sorted.
+     */
+    public boolean isSorted() {
+      return tfileMeta.isSorted();
+    }
+
+    /**
+     * Get the number of key-value pair entries in TFile.
+     *
+     * @return the number of key-value pairs in TFile
+     */
+    public long getEntryCount() {
+      return tfileMeta.getRecordCount();
+    }
+
+    /**
+     * Lazily load the TFile index.
+     *
+     * @throws IOException
+     */
+    synchronized void checkTFileDataIndex() throws IOException {
+      if (tfileIndex == null) {
+        BlockReader brIndex = readerBCF.getMetaBlock(TFileIndex.BLOCK_NAME);
+        try {
+          tfileIndex =
+              new TFileIndex(readerBCF.getBlockCount(), brIndex, tfileMeta
+                  .getComparator());
+        } finally {
+          brIndex.close();
+        }
+      }
+    }
+
+    /**
+     * Get the first key in the TFile.
+     *
+     * @return The first key in the TFile.
+     * @throws IOException
+     */
+    public RawComparable getFirstKey() throws IOException {
+      checkTFileDataIndex();
+      return tfileIndex.getFirstKey();
+    }
+
+    /**
+     * Get the last key in the TFile.
+     *
+     * @return The last key in the TFile.
+     * @throws IOException
+     */
+    public RawComparable getLastKey() throws IOException {
+      checkTFileDataIndex();
+      return tfileIndex.getLastKey();
+    }
+
+    /**
+     * Get a Comparator object to compare Entries. It is useful when you want to
+     * store the entries in a collection (such as a PriorityQueue) and perform
+     * sorting or comparisons among entries based on the keys without copying out
+     * the key.
+     *
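+     * <p>
+     * Illustrative sketch (reader and scanner are assumed to exist, and the
+     * entries are assumed to come from scanners over TFiles that share the same
+     * comparator):
+     * <pre>{@code
+     * PriorityQueue<Scanner.Entry> heap =
+     *     new PriorityQueue<Scanner.Entry>(16, reader.getEntryComparator());
+     * heap.add(scanner.entry());
+     * }</pre>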
+     * @return An Entry Comparator.
+     */
+    public Comparator<Scanner.Entry> getEntryComparator() {
+      if (!isSorted()) {
+        throw new RuntimeException(
+            "Entries are not comparable for unsorted TFiles");
+      }
+
+      return new Comparator<Scanner.Entry>() {
+        /**
+         * Provide a customized comparator for Entries. This is useful if we
+         * have a collection of Entry objects. However, if the Entry objects
+         * come from different TFiles, users must ensure that those TFiles share
+         * the same RawComparator.
+         */
+        @Override
+        public int compare(Scanner.Entry o1, Scanner.Entry o2) {
+          return comparator.compare(o1.getBlockBuffer(), o1.getKeyOffset(), o1.getKeyLength(), o2
+              .getBlockBuffer(), o2.getKeyOffset(), o2.getKeyLength());
+        }
+      };
+    }
+
+    /**
+     * Get an instance of the RawComparator that is constructed based on the
+     * string comparator representation.
+     *
+     * @return a Comparator that can compare RawComparable's.
+     */
+    public Comparator<RawComparable> getComparator() {
+      return comparator;
+    }
+
+    /**
+     * Stream access to a meta block.
+     *
+     * @param name
+     *          The name of the meta block.
+     * @return The input stream.
+     * @throws IOException
+     *           on I/O error.
+     * @throws MetaBlockDoesNotExist
+     *           If the meta block with the name does not exist.
+     */
+    public DataInputStream getMetaBlock(String name) throws IOException,
+        MetaBlockDoesNotExist {
+      return readerBCF.getMetaBlock(name);
+    }
+
+    /**
+     * If greater is true, returns the beginning location of the block
+     * containing a key strictly greater than the input key. If greater is false,
+     * returns the beginning location of the block containing a key greater than
+     * or equal to the input key.
+     *
+     * @param key
+     *          the input key
+     * @param greater
+     *          boolean flag
+     * @return the beginning location of the matching block, or end() if no such block exists
+     * @throws IOException
+     */
+    Location getBlockContainsKey(RawComparable key, boolean greater)
+        throws IOException {
+      if (!isSorted()) {
+        throw new RuntimeException("Seeking in unsorted TFile");
+      }
+      checkTFileDataIndex();
+      int blkIndex =
+          (greater) ? tfileIndex.upperBound(key) : tfileIndex.lowerBound(key);
+      if (blkIndex < 0) return end;
+      return new Location(blkIndex, 0);
+    }
+
+    Location getLocationByRecordNum(long recNum) throws IOException {
+      checkTFileDataIndex();
+      return tfileIndex.getLocationByRecordNum(recNum);
+    }
+
+    long getRecordNumByLocation(Location location) throws IOException {
+      checkTFileDataIndex();
+      return tfileIndex.getRecordNumByLocation(location);
+    }
+
+    int compareKeys(byte[] a, int o1, int l1, byte[] b, int o2, int l2) {
+      if (!isSorted()) {
+        throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
+      }
+      return comparator.compare(a, o1, l1, b, o2, l2);
+    }
+
+    int compareKeys(RawComparable a, RawComparable b) {
+      if (!isSorted()) {
+        throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
+      }
+      return comparator.compare(a, b);
+    }
+
+    /**
+     * Get the location pointing to the beginning of the first key-value pair in
+     * a compressed block whose byte offset in the TFile is greater than or
+     * equal to the specified offset.
+     *
+     * @param offset
+     *          the user supplied offset.
+     * @return the location to the corresponding entry; or end() if no such
+     *         entry exists.
+     */
+    Location getLocationNear(long offset) {
+      int blockIndex = readerBCF.getBlockIndexNear(offset);
+      if (blockIndex == -1) return end;
+      return new Location(blockIndex, 0);
+    }
+
+    /**
+     * Get the RecordNum for the first key-value pair in a compressed block
+     * whose byte offset in the TFile is greater than or equal to the specified
+     * offset.
+     *
+     * @param offset
+     *          the user supplied offset.
+     * @return the RecordNum to the corresponding entry. If no such entry
+     *         exists, it returns the total entry count.
+     * @throws IOException
+     */
+    public long getRecordNumNear(long offset) throws IOException {
+      return getRecordNumByLocation(getLocationNear(offset));
+    }
+
+    /**
+     * Get a sample key that is within a block whose starting offset is greater
+     * than or equal to the specified offset.
+     *
+     * @param offset
+     *          The file offset.
+     * @return the key that fits the requirement; or null if no such key exists
+     *         (which could happen if the offset is close to the end of the
+     *         TFile).
+     * @throws IOException
+     */
+    public RawComparable getKeyNear(long offset) throws IOException {
+      int blockIndex = readerBCF.getBlockIndexNear(offset);
+      if (blockIndex == -1) return null;
+      checkTFileDataIndex();
+      return new ByteArray(tfileIndex.getEntry(blockIndex).key);
+    }
+
+    /**
+     * Get a scanner that can scan the whole TFile.
+     *
+     * @return The scanner object. A valid Scanner is always returned even if
+     *         the TFile is empty.
+     * @throws IOException
+     */
+    public Scanner createScanner() throws IOException {
+      return new Scanner(this, begin, end);
+    }
+
+    /**
+     * Get a scanner that covers a portion of TFile based on byte offsets.
+     *
+     * @param offset
+     *          The beginning byte offset in the TFile.
+     * @param length
+     *          The length of the region.
+     * @return A scanner whose actual coverage tries to match the
+     *         specified byte region but is always rounded up to compression
+     *         block boundaries. It is possible that the returned scanner
+     *         contains zero key-value pairs even if length is positive.
+     * @throws IOException
+     */
+    public Scanner createScannerByByteRange(long offset, long length) throws IOException {
+      return new Scanner(this, offset, offset + length);
+    }
+
+    /**
+     * Get a scanner that covers a portion of TFile based on keys.
+     *
+     * @param beginKey
+     *          Begin key of the scan (inclusive). If null, scan from the first
+     *          key-value entry of the TFile.
+     * @param endKey
+     *          End key of the scan (exclusive). If null, scan up to the last
+     *          key-value entry of the TFile.
+     * @return The actual coverage of the returned scanner will cover all keys
+     *         greater than or equal to the beginKey and less than the endKey.
+     * @throws IOException
+     *
+     * @deprecated Use {@link #createScannerByKey(byte[], byte[])} instead.
+     */
+    @Deprecated
+    public Scanner createScanner(byte[] beginKey, byte[] endKey)
+      throws IOException {
+      return createScannerByKey(beginKey, endKey);
+    }
+
+    /**
+     * Get a scanner that covers a portion of TFile based on keys.
+     *
+     * @param beginKey
+     *          Begin key of the scan (inclusive). If null, scan from the first
+     *          key-value entry of the TFile.
+     * @param endKey
+     *          End key of the scan (exclusive). If null, scan up to the last
+     *          key-value entry of the TFile.
+     * @return The actual coverage of the returned scanner will cover all keys
+     *         greater than or equal to the beginKey and less than the endKey.
+     * @throws IOException
+     */
+    public Scanner createScannerByKey(byte[] beginKey, byte[] endKey)
+        throws IOException {
+      return createScannerByKey((beginKey == null) ? null : new ByteArray(beginKey,
+          0, beginKey.length), (endKey == null) ? null : new ByteArray(endKey,
+          0, endKey.length));
+    }
+
+    /**
+     * Get a scanner that covers a specific key range.
+     *
+     * @param beginKey
+     *          Begin key of the scan (inclusive). If null, scan from the first
+     *          key-value entry of the TFile.
+     * @param endKey
+     *          End key of the scan (exclusive). If null, scan up to the last
+     *          key-value entry of the TFile.
+     * @return The actual coverage of the returned scanner will cover all keys
+     *         greater than or equal to the beginKey and less than the endKey.
+     * @throws IOException
+     *
+     * @deprecated Use {@link #createScannerByKey(RawComparable, RawComparable)}
+     *             instead.
+     */
+    @Deprecated
+    public Scanner createScanner(RawComparable beginKey, RawComparable endKey)
+        throws IOException {
+      return createScannerByKey(beginKey, endKey);
+    }
+
+    /**
+     * Get a scanner that covers a specific key range.
+     *
+     * @param beginKey
+     *          Begin key of the scan (inclusive). If null, scan from the first
+     *          key-value entry of the TFile.
+     * @param endKey
+     *          End key of the scan (exclusive). If null, scan up to the last
+     *          key-value entry of the TFile.
+     * @return The actual coverage of the returned scanner will cover all keys
+     *         greater than or equal to the beginKey and less than the endKey.
+     * @throws IOException
+     */
+    public Scanner createScannerByKey(RawComparable beginKey, RawComparable endKey)
+        throws IOException {
+      if ((beginKey != null) && (endKey != null)
+          && (compareKeys(beginKey, endKey) >= 0)) {
+        return new Scanner(this, beginKey, beginKey);
+      }
+      return new Scanner(this, beginKey, endKey);
+    }
+
+    /**
+     * Create a scanner that covers a range of records.
+     *
+     * @param beginRecNum
+     *          The RecordNum for the first record (inclusive).
+     * @param endRecNum
+     *          The RecordNum for the last record (exclusive). To scan the whole
+     *          file, either specify endRecNum==-1 or endRecNum==getEntryCount().
+     * @return The TFile scanner that covers the specified range of records.
+     * @throws IOException
+     */
+    public Scanner createScannerByRecordNum(long beginRecNum, long endRecNum)
+        throws IOException {
+      if (beginRecNum < 0) beginRecNum = 0;
+      if (endRecNum < 0 || endRecNum > getEntryCount()) {
+        endRecNum = getEntryCount();
+      }
+      return new Scanner(this, getLocationByRecordNum(beginRecNum),
+          getLocationByRecordNum(endRecNum));
+    }
+
+    /**
+     * The TFile Scanner. The Scanner has an implicit cursor, which, upon
+     * creation, points to the first key-value pair in the scan range. If the
+     * scan range is empty, the cursor will point to the end of the scan range.
+     * <p>
+     * Use {@link Scanner#atEnd()} to test whether the cursor is at the end
+     * location of the scanner.
+     * <p>
+     * Use {@link Scanner#advance()} to move the cursor to the next key-value
+     * pair (or end if none exists). Use seekTo methods (
+     * {@link Scanner#seekTo(byte[])} or
+     * {@link Scanner#seekTo(byte[], int, int)}) to seek to any arbitrary
+     * location in the covered range (including backward seeking). Use
+     * {@link Scanner#rewind()} to seek back to the beginning of the scanner.
+     * Use {@link Scanner#seekToEnd()} to seek to the end of the scanner.
+     * <p>
+     * Actual keys and values may be obtained through {@link Scanner.Entry}
+     * object, which is obtained through {@link Scanner#entry()}.
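+     * <p>
+     * Illustrative seek sketch for a sorted TFile (reader is an existing
+     * DTFile.Reader; the key bytes are placeholders):
+     * <pre>{@code
+     * Scanner scanner = reader.createScannerByKey("row100".getBytes(), null);
+     * if (scanner.seekTo("row150".getBytes())) {
+     *   // exact match found; scanner.entry() now points at that key
+     * }
+     * scanner.rewind(); // back to the first entry covered by this scanner
+     * }</pre>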
+     */
+    public static class Scanner implements Closeable {
+      // The underlying TFile reader.
+      final Reader reader;
+      // current block (null if reaching end)
+      private BlockReader blkReader;
+      private byte[] blockBuffer;
+
+      Location beginLocation;
+      Location endLocation;
+      Location currentLocation;
+
+      // flag to ensure value is only examined once.
+      boolean valueChecked = false;
+      // reusable buffer for keys.
+//      final byte[] keyBuffer;
+      // length of key, -1 means key is invalid.
+      int klen = -1;
+      int keyOffset = 0;
+
+      static final int MAX_VAL_TRANSFER_BUF_SIZE = 128 * 1024;
+      BytesWritable valTransferBuffer;
+
+//      DataInputBuffer keyDataInputStream;
+      ChunkDecoder valueBufferInputStream;
+      DataInputStream valueDataInputStream;
+      // vlen == -1 if unknown.
+      int vlen;
+      int valueOffset = 0;
+
+      /**
+       * Constructor
+       *
+       * @param reader
+       *          The TFile reader object.
+       * @param offBegin
+       *          Begin byte-offset of the scan.
+       * @param offEnd
+       *          End byte-offset of the scan.
+       * @throws IOException
+       *
+       *           The offsets will be rounded to the beginning of a compressed
+       *           block whose offset is greater than or equal to the specified
+       *           offset.
+       */
+      protected Scanner(Reader reader, long offBegin, long offEnd)
+          throws IOException {
+        this(reader, reader.getLocationNear(offBegin), reader
+            .getLocationNear(offEnd));
+      }
+
+      /**
+       * Constructor
+       *
+       * @param reader
+       *          The TFile reader object.
+       * @param begin
+       *          Begin location of the scan.
+       * @param end
+       *          End location of the scan.
+       * @throws IOException
+       */
+      Scanner(Reader reader, Location begin, Location end) throws IOException {
+        this.reader = reader;
+        // ensure the TFile index is loaded throughout the life of scanner.
+        reader.checkTFileDataIndex();
+        beginLocation = begin;
+        endLocation = end;
+
+        valTransferBuffer = new BytesWritable();
+        // TODO: remember the longest key in a TFile, and use it to replace
+        // MAX_KEY_SIZE.
+//        keyBuffer = new byte[MAX_KEY_SIZE];
+//        keyDataInputStream = new DataInputBuffer();
+        valueBufferInputStream = new ChunkDecoder();
+        valueDataInputStream = new DataInputStream(valueBufferInputStream);
+
+        if (beginLocation.compareTo(endLocation) >= 0) {
+          currentLocation = new Location(endLocation);
+        } else {
+          currentLocation = new Location(0, 0);
+          initBlock(beginLocation.getBlockIndex());
+          inBlockAdvance(beginLocation.getRecordIndex());
+        }
+      }
+
+      /**
+       * Constructor
+       *
+       * @param reader
+       *          The TFile reader object.
+       * @param beginKey
+       *          Begin key of the scan. If null, scan from the first <K,V>
+       *          entry of the TFile.
+       * @param endKey
+       *          End key of the scan. If null, scan up to the last <K, V> entry
+       *          of the TFile.
+       * @throws IOException
+       */
+      protected Scanner(Reader reader, RawComparable beginKey,
+          RawComparable endKey) throws IOException {
+        this(reader, (beginKey == null) ? reader.begin() : reader
+            .getBlockContainsKey(beginKey, false), reader.end());
+        if (beginKey != null) {
+          inBlockAdvance(beginKey, false);
+          beginLocation.set(currentLocation);
+        }
+        if (endKey != null) {
+          seekTo(endKey, false);
+          endLocation.set(currentLocation);
+          seekTo(beginLocation);
+        }
+      }
+
+      /**
+       * Move the cursor to the first entry whose key is greater than or equal
+       * to the input key. Synonymous to seekTo(key, 0, key.length). The entry
+       * returned by the previous entry() call will be invalid.
+       *
+       * @param key
+       *          The input key
+       * @return true if we find an equal key.
+       * @throws IOException
+       */
+      public boolean seekTo(byte[] key) throws IOException {
+        return seekTo(key, 0, key.length);
+      }
+
+      /**
+       * Move the cursor to the first entry whose key is greater than or equal
+       * to the input key. The entry returned by the previous entry() call will
+       * be invalid.
+       *
+       * @param key
+       *          The input key
+       * @param keyOffset
+       *          offset in the key buffer.
+       * @param keyLen
+       *          key buffer length.
+       * @return true if we find an equal key; false otherwise.
+       * @throws IOException
+       */
+      public boolean seekTo(byte[] key, int keyOffset, int keyLen)
+          throws IOException {
+        return seekTo(new ByteArray(key, keyOffset, keyLen), false);
+      }
+
+      private boolean seekTo(RawComparable key, boolean beyond)
+          throws IOException {
+        Location l = reader.getBlockContainsKey(key, beyond);
+        if (l.compareTo(beginLocation) < 0) {
+          l = beginLocation;
+        } else if (l.compareTo(endLocation) >= 0) {
+          seekTo(endLocation);
+          return false;
+        }
+
+        // check if what we are seeking is in the later part of the current
+        // block.
+        if (atEnd() || (l.getBlockIndex() != currentLocation.getBlockIndex())
+            || (compareCursorKeyTo(key) >= 0)) {
+          // sorry, we must seek to a different location first.
+          seekTo(l);
+        }
+
+        return inBlockAdvance(key, beyond);
+      }
+
+      /**
+       * Move the cursor to the new location. The entry returned by the previous
+       * entry() call will be invalid.
+       *
+       * @param l
+       *          new cursor location. It must fall between the begin and end
+       *          location of the scanner.
+       * @throws IOException
+       */
+      private void seekTo(Location l) throws IOException {
+        if (l.compareTo(beginLocation) < 0) {
+          throw new IllegalArgumentException(
+              "Attempt to seek before the begin location.");
+        }
+
+        if (l.compareTo(endLocation) > 0) {
+          throw new IllegalArgumentException(
+              "Attempt to seek after the end location.");
+        }
+
+        if (l.compareTo(endLocation) == 0) {
+          parkCursorAtEnd();
+          return;
+        }
+
+        if (l.getBlockIndex() != currentLocation.getBlockIndex()) {
+          // going to a totally different block
+          initBlock(l.getBlockIndex());
+        } else {
+          if (valueChecked) {
+            // may temporarily go beyond the last record in the block (in which
+            // case the next if condition will always be true).
+            inBlockAdvance(1);
+          }
+          if (l.getRecordIndex() < currentLocation.getRecordIndex()) {
+            initBlock(l.getBlockIndex());
+          }
+        }
+
+        inBlockAdvance(l.getRecordIndex() - currentLocation.getRecordIndex());
+
+        return;
+      }
+
+      /**
+       * Rewind to the first entry in the scanner. The entry returned by the
+       * previous entry() call will be invalid.
+       *
+       * @throws IOException
+       */
+      public void rewind() throws IOException {
+        seekTo(beginLocation);
+      }
+
+      /**
+       * Seek to the end of the scanner. The entry returned by the previous
+       * entry() call will be invalid.
+       *
+       * @throws IOException
+       */
+      public void seekToEnd() throws IOException {
+        parkCursorAtEnd();
+      }
+
+      /**
+       * Move the cursor to the first entry whose key is greater than or equal
+       * to the input key. Synonymous to lowerBound(key, 0, key.length). The
+       * entry returned by the previous entry() call will be invalid.
+       *
+       * @param key
+       *          The input key
+       * @throws IOException
+       */
+      public void lowerBound(byte[] key) throws IOException {
+        lowerBound(key, 0, key.length);
+      }
+
+      /**
+       * Move the cursor to the first entry whose key is greater than or equal
+       * to the input key. The entry returned by the previous entry() call will
+       * be invalid.
+       *
+       * @param key
+       *          The input key
+       * @param keyOffset
+       *          offset in the key buffer.
+       * @param keyLen
+       *          key buffer length.
+       * @throws IOException
+       */
+      public void lowerBound(byte[] key, int keyOffset, int keyLen)
+          throws IOException {
+        seekTo(new ByteArray(key, keyOffset, keyLen), false);
+      }
+
+      /**
+       * Move the cursor to the first entry whose key is strictly greater than
+       * the input key. Synonymous to upperBound(key, 0, key.length). The entry
+       * returned by the previous entry() call will be invalid.
+       *
+       * @param key
+       *          The input key
+       * @throws IOException
+       */
+      public void upperBound(byte[] key) throws IOException {
+        upperBound(key, 0, key.length);
+      }
+
+      /**
+       * Move the cursor to the first entry whose key is strictly greater than
+       * the input key. The entry returned by the previous entry() call will be
+       * invalid.
+       *
+       * @param key
+       *          The input key
+       * @param keyOffset
+       *          offset in the key buffer.
+       * @param keyLen
+       *          key buffer length.
+       * @throws IOException
+       */
+      public void upperBound(byte[] key, int keyOffset, int keyLen)
+          throws IOException {
+        seekTo(new ByteArray(key, keyOffset, keyLen), true);
+      }
+
+      /**
+       * Move the cursor to the next key-value pair. The entry returned by the
+       * previous entry() call will be invalid.
+       *
+       * @return true if the cursor successfully moves. False when cursor is
+       *         already at the end location and cannot be advanced.
+       * @throws IOException
+       */
+      public boolean advance() throws IOException {
+        if (atEnd()) {
+          return false;
+        }
+
+        int curBid = currentLocation.getBlockIndex();
+        long curRid = currentLocation.getRecordIndex();
+        long entriesInBlock = reader.getBlockEntryCount(curBid);
+        if (curRid + 1 >= entriesInBlock) {
+          if (endLocation.compareTo(curBid + 1, 0) <= 0) {
+            // last entry in TFile.
+            parkCursorAtEnd();
+          } else {
+            // last entry in Block.
+            initBlock(curBid + 1);
+          }
+        } else {
+          inBlockAdvance(1);
+        }
+        return true;
+      }
+
+      /**
+       * Load a compressed block for reading. Expects blockIndex to be valid.
+       *
+       * @throws IOException
+       */
+      private void initBlock(int blockIndex) throws IOException {
+        klen = -1;
+        if (blkReader != null) {
+          try {
+            blkReader.close();
+          } finally {
+            blkReader = null;
+          }
+        }
+        blkReader = reader.getBlockReader(blockIndex);
+        blockBuffer = blkReader.getBlockDataInputStream().getBuf();
+        currentLocation.set(blockIndex, 0);
+      }
+
+      private void parkCursorAtEnd() throws IOException {
+        klen = -1;
+        currentLocation.set(endLocation);
+        if (blkReader != null) {
+          try {
+            blkReader.close();
+          } finally {
+            blkReader = null;
+          }
+        }
+      }
+
+      /**
+       * Close the scanner. Release all resources. The behavior of using the
+       * scanner after calling close is not defined. The entry returned by the
+       * previous entry() call will be invalid.
+       */
+      @Override
+      public void close() throws IOException {
+        parkCursorAtEnd();
+      }
+
+      /**
+       * Is cursor at the end location?
+       *
+       * @return true if the cursor is at the end location.
+       */
+      public boolean atEnd() {
+        return (currentLocation.compareTo(endLocation) >= 0);
+      }
+
+      /**
+       * Check whether the key has already been successfully obtained. Also
+       * initializes the valueInputStream.
+       */
+      void checkKey() throws IOException {
+        if (klen >= 0) return;
+        if (atEnd()) {
+          throw new EOFException("No key-value to read");
+        }
+        klen = -1;
+        vlen = -1;
+        valueChecked = false;
+
+        klen = Utils.readVInt(blkReader);
+        keyOffset = blkReader.getBlockDataInputStream().getPos();
+        blkReader.getBlockDataInputStream().skip(klen);
+        valueBufferInputStream.reset(blkReader);
+        if (valueBufferInputStream.isLastChunk()) {
+          vlen = valueBufferInputStream.getRemain();
+          valueOffset = blkReader.getBlockDataInputStream().getPos();
+        }
+      }
+
+      /**
+       * Get an entry to access the key and value.
+       *
+       * @return The Entry object to access the key and value.
+       * @throws IOException
+       */
+      public Entry entry() throws IOException {
+        checkKey();
+        return new Entry();
+      }
+
+      /**
+       * Get the RecordNum corresponding to the entry pointed by the cursor.
+       * @return The RecordNum corresponding to the entry pointed by the cursor.
+       * @throws IOException
+       */
+      public long getRecordNum() throws IOException {
+        return reader.getRecordNumByLocation(currentLocation);
+      }
+
+      /**
+       * Internal API. Compare the key at the cursor to a user-specified key.
+       *
+       * @param other
+       *          user-specified key.
+       * @return negative if the key at the cursor is smaller than the user key;
+       *         0 if equal; positive if the key at the cursor is greater than
+       *         the user key.
+       * @throws IOException
+       */
+      int compareCursorKeyTo(RawComparable other) throws IOException {
+        checkKey();
+        return reader.compareKeys(blockBuffer, keyOffset, klen, other.buffer(), other
+            .offset(), other.size());
+      }
+
+      /**
+       * Entry to a &lt;Key, Value&gt; pair.
+       */
+      public class Entry implements Comparable<RawComparable> {
+        /**
+         * Get the length of the key.
+         *
+         * @return the length of the key.
+         */
+        public int getKeyLength() {
+          return klen;
+        }
+
+        public int getKeyOffset() {
+          return keyOffset;
+        }
+
+        public int getValueOffset() {
+          return valueOffset;
+        }
+
+        public byte[] getBlockBuffer() {
+          return blockBuffer;
+        }
+
+        /**
+         * Copy the key and value in one shot into BytesWritables. This is
+         * equivalent to getKey(key); getValue(value);
+         *
+         * @param key
+         *          BytesWritable to hold key.
+         * @param value
+         *          BytesWritable to hold value
+         * @throws IOException
+         */
+        public void get(BytesWritable key, BytesWritable value)
+            throws IOException {
+          getKey(key);
+          getValue(value);
+        }
+
+        /**
+         * Copy the key into BytesWritable. The input BytesWritable will be
+         * automatically resized to the actual key size.
+         *
+         * @param key
+         *          BytesWritable to hold the key.
+         * @throws IOException
+         */
+        public int getKey(BytesWritable key) throws IOException {
+          key.setSize(getKeyLength());
+          getKey(key.getBytes());
+          return key.getLength();
+        }
+
+        /**
+         * Copy the value into BytesWritable. The input BytesWritable will be
+         * automatically resized to the actual value size. The implementation
+         * directly uses the buffer inside BytesWritable for storing the value.
+         * The call does not require the value length to be known.
+         *
+         * @param value
+         *          BytesWritable to hold the value.
+         * @return the length of the value.
+         * @throws IOException
+         */
+        public long getValue(BytesWritable value) throws IOException {
+          DataInputStream dis = getValueStream();
+          int size = 0;
+          try {
+            int remain;
+            while ((remain = valueBufferInputStream.getRemain()) > 0) {
+              value.setSize(size + remain);
+              dis.readFully(value.getBytes(), size, remain);
+              size += remain;
+            }
+            return value.getLength();
+          } finally {
+            dis.close();
+          }
+        }
+
+        /**
+         * Writing the key to the output stream. This method avoids copying key
+         * buffer from Scanner into user buffer, then writing to the output
+         * stream.
+         *
+         * @param out
+         *          The output stream
+         * @return the length of the key.
+         * @throws IOException
+         */
+        public int writeKey(OutputStream out) throws IOException {
+          out.write(blockBuffer, keyOffset, klen);
+          return klen;
+        }
+
+        /**
+         * Writing the value to the output stream. This method avoids copying
+         * value data from Scanner into user buffer, then writing to the output
+         * stream. It does not require the value length to be known.
+         *
+         * @param out
+         *          The output stream
+         * @return the length of the value
+         * @throws IOException
+         */
+        public long writeValue(OutputStream out) throws IOException {
+          DataInputStream dis = getValueStream();
+          long size = 0;
+          try {
+            int chunkSize;
+            while ((chunkSize = valueBufferInputStream.getRemain()) > 0) {
+              chunkSize = Math.min(chunkSize, MAX_VAL_TRANSFER_BUF_SIZE);
+              valTransferBuffer.setSize(chunkSize);
+              dis.readFully(valTransferBuffer.getBytes(), 0, chunkSize);
+              out.write(valTransferBuffer.getBytes(), 0, chunkSize);
+              size += chunkSize;
+            }
+            return size;
+          } finally {
+            dis.close();
+          }
+        }
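+
+        // Hedged sketch of the zero-copy write path described above (assumes
+        // "keyOut" and "valueOut" are OutputStreams supplied by the caller):
+        //
+        //   Scanner.Entry e = scanner.entry();
+        //   int keyLen = e.writeKey(keyOut);      // copies klen bytes straight
+        //                                         // from the block buffer
+        //   long valLen = e.writeValue(valueOut); // chunked copy; works even
+        //                                         // when the value length is
+        //                                         // not known up front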
+
+        /**
+         * Copy the key into user supplied buffer.
+         *
+         * @param buf
+         *          The buffer supplied by user. The length of the buffer must
+         *          not be shorter than the key length.
+         * @return The length of the key.
+         *
+         * @throws IOException
+         */
+        public int getKey(byte[] buf) throws IOException {
+          return getKey(buf, 0);
+        }
+
+        /**
+         * Copy the key into user supplied buffer.
+         *
+         * @param buf
+         *          The buffer supplied by user.
+         * @param offset
+         *          The starting offset of the user buffer where we should copy
+         *          the key into. Requires that offset + key length be no
+         *          greater than the buffer length.
+         * @return The length of the key.
+         * @throws IOException
+         */
+        public int getKey(byte[] buf, int offset) throws IOException {
+          if ((offset | (buf.length - offset - klen)) < 0) {
+            throw new IndexOutOfBoundsException(
+                "Bufer not enough to store the key");
+          }
+          System.arraycopy(blockBuffer, keyOffset, buf, offset, klen);
+          return klen;
+        }
+
+        /**
+         * Streaming access to the key. Useful for deserializing the key into
+         * user objects.
+         *
+         * @return The input stream.
+         */
+//        public DataInputStream getKeyStream() {
+//          keyDataInputStream.reset(keyBuffer, klen);
+//          return keyDataInputStream;
+//        }
+
+        /**
+         * Get the length of the value. isValueLengthKnown() must return true.
+         *
+         * @return the length of the value.
+         */
+        public int getValueLength() {
+          if (vlen >= 0) {
+            return vlen;
+          }
+
+          throw new RuntimeException("Value length unknown.");
+        }
+
+        /**
+         * Copy value into user-supplied buffer. User supplied buffer must be
+         * large enough to hold the whole value. The value part of the key-value
+         * pair pointed by the current cursor is not cached and can only be
+         * examined once. Calling any of the following functions more than once
+         * without moving the cursor will result in an exception:
+         * {@link #getValue(byte[])}, {@link #getValue(byte[], int)},
+         * {@link #getValueStream}.
+         *
+         * @return the length of the value. Does not require
+         *         isValueLengthKnown() to be true.
+         * @throws IOException
+         *
+         */
+        public int getValue(byte[] buf) throws IOException {
+          return getValue(buf, 0);
+        }
+
+        /**
+         * Copy value into user-supplied buffer. User supplied buffer must be
+         * large enough to hold the whole value (starting from the offset). The
+         * value part of the key-value pair pointed by the current cursor is not
+         * cached and can only be examined once. Calling any of the following
+         * functions more than once without moving the cursor will result in an
+         * exception: {@link #getValue(byte[])}, {@link #getValue(byte[], int)},
+         * {@link #getValueStream}.
+         *
+         * @return the length of the value. Does not require
+         *         isValueLengthKnown() to be true.
+         * @throws IOException
+         */
+        public int getValue(byte[] buf, int offset) throws IOException {
+          DataInputStream dis = getValueStream();
+          try {
+            if (isValueLengthKnown()) {
+              if ((offset | (buf.length - offset - vlen)) < 0) {
+                throw new IndexOutOfBoundsException(
+                    "Buffer too small to hold value");
+              }
+              dis.readFully(buf, offset, vlen);
+              return vlen;
+            }
+
+            int nextOffset = offset;
+            while (nextOffset < buf.length) {
+              int n = dis.read(buf, nextOffset, buf.length - nextOffset);
+              if (n < 0) {
+                break;
+              }
+              nextOffset += n;
+            }
+            // Attempt to read one more byte to determine whether we reached
+            // the end or not.
+            if (dis.read() >= 0) {
+              throw new IndexOutOfBoundsException(
+                  "Buffer too small to hold value");
+            }
+            return nextOffset - offset;
+          } finally {
+            dis.close();
+          }
+        }
+
+        /**
+         * Stream access to value. The value part of the key-value pair pointed
+         * by the current cursor is not cached and can only be examined once.
+         * Calling any of the following functions more than once without moving
+         * the cursor will result in an exception: {@link #getValue(byte[])},
+         * {@link #getValue(byte[], int)}, {@link #getValueStream}.
+         *
+         * @return The input stream for reading the value.
+         * @throws IOException
+         */
+        public DataInputStream getValueStream() throws IOException {
+          if (valueChecked) {
+            throw new IllegalStateException(
+                "Attempt to examine value multiple times.");
+          }
+          valueChecked = true;
+          return valueDataInputStream;
+        }
+
+        /**
+         * Check whether it is safe to call getValueLength().
+         *
+         * @return true if the value length is known beforehand. Values smaller
+         *         than the chunk size will always have their lengths known
+         *         beforehand. Values that are written out as a whole (with the
+         *         length advertised up front) will always have their lengths
+         *         known when read.
+         */
+        public boolean isValueLengthKnown() {
+          return (vlen >= 0);
+        }
+
+        /**
+         * Compare the entry key to another key. Synonymous to compareTo(key, 0,
+         * key.length).
+         *
+         * @param buf
+         *          The key buffer.
+         * @return comparison result between the entry key with the input key.
+         */
+        public int compareTo(byte[] buf) {
+          return compareTo(buf, 0, buf.length);
+        }
+
+        /**
+         * Compare the entry key to another key. Synonymous to compareTo(new
+         * ByteArray(buf, offset, length)).
+         *
+         * @param buf
+         *          The key buffer
+         * @param offset
+         *          offset into the key buffer.
+         * @param length
+         *          the length of the key.
+         * @return comparison result between the entry key with the input key.
+         */
+        public int compareTo(byte[] buf, int offset, int length) {
+          return compareTo(new ByteArray(buf, offset, length));
+        }
+
+        /**
+         * Compare an entry with a RawComparable object. This is useful when
+         * Entries are stored in a collection, and we want to compare a user
+         * supplied key.
+         */
+        @Override
+        public int compareTo(RawComparable key) {
+          return reader.compareKeys(blockBuffer, getKeyOffset(), getKeyLength(), key.buffer(),
+              key.offset(), key.size());
+        }
+
+        /**
+         * Check whether this entry and the other point to the same key value.
+         */
+        @Override
+        public boolean equals(Object other) {
+          if (this == other) return true;
+          if (!(other instanceof Entry)) return false;
+          return ((Entry) other).compareTo(blockBuffer, getKeyOffset(), getKeyLength()) == 0;
+        }
+
+        @Override
+        public int hashCode() {
+          return WritableComparator.hashBytes(blockBuffer, getKeyOffset(), getKeyLength());
+        }
+      }
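+
+      // Hedged sketch of the one-shot value-access contract documented in
+      // Entry (assumes "scanner" is a positioned Scanner):
+      //
+      //   Scanner.Entry e = scanner.entry();
+      //   BytesWritable key = new BytesWritable();
+      //   BytesWritable value = new BytesWritable();
+      //   e.get(key, value);
+      //   // A second getValue()/getValueStream() call on the same entry
+      //   // without advancing the cursor throws IllegalStateException.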
+
+      /**
+       * Advance cursor by n positions within the block.
+       *
+       * @param n
+       *          Number of key-value pairs to skip in block.
+       * @throws IOException
+       */
+      private void inBlockAdvance(long n) throws IOException {
+        for (long i = 0; i < n; ++i) {
+          checkKey();
+          if (!valueBufferInputStream.isClosed()) {
+            valueBufferInputStream.close();
+          }
+          klen = -1;
+          currentLocation.incRecordIndex();
+        }
+      }
+
+      /**
+       * Advance cursor in block until we find a key that is greater than or
+       * equal to the input key.
+       *
+       * @param key
+       *          Key to compare.
+       * @param greater
+       *          advance until we find a key greater than the input key.
+       * @return true if we find an equal key.
+       * @throws IOException
+       */
+      private boolean inBlockAdvance(RawComparable key, boolean greater)
+          throws IOException {
+        int curBid = currentLocation.getBlockIndex();
+        long entryInBlock = reader.getBlockEntryCount(curBid);
+        if (curBid == endLocation.getBlockIndex()) {
+          entryInBlock = endLocation.getRecordIndex();
+        }
+
+        while (currentLocation.getRecordIndex() < entryInBlock) {
+          int cmp = compareCursorKeyTo(key);
+          if (cmp > 0) return false;
+          if (cmp == 0 && !greater) return true;
+          if (!valueBufferInputStream.isClosed()) {
+            valueBufferInputStream.close();
+          }
+          klen = -1;
+          currentLocation.incRecordIndex();
+        }
+
+        throw new RuntimeException("Cannot find matching key in block.");
+      }
+    }
+
+    long getBlockEntryCount(int curBid) {
+      return tfileIndex.getEntry(curBid).entries();
+    }
+
+    BlockReader getBlockReader(int blockIndex) throws IOException {
+      return readerBCF.getDataBlock(blockIndex);
+    }
+  }
+
+  /**
+   * Data structure representing "TFile.meta" meta block.
+   */
+  static final class TFileMeta {
+    final static String BLOCK_NAME = "TFile.meta";
+    final Version version;
+    private long recordCount;
+    private final String strComparator;
+    private final BytesComparator comparator;
+
+    // ctor for writes
+    public TFileMeta(String comparator) {
+      // set fileVersion to API version when we create it.
+      version = DTFile.API_VERSION;
+      recordCount = 0;
+      strComparator = (comparator == null) ? "" : comparator;
+      this.comparator = makeComparator(strComparator);
+    }
+
+    // ctor for reads
+    public TFileMeta(DataInput in) throws IOException {
+      version = new Version(in);
+      if (!version.compatibleWith(DTFile.API_VERSION)) {
+        throw new RuntimeException("Incompatible TFile fileVersion.");
+      }
+      recordCount = Utils.readVLong(in);
+      strComparator = Utils.readString(in);
+      comparator = makeComparator(strComparator);
+    }
+
+    @SuppressWarnings("unchecked")
+    static BytesComparator makeComparator(String comparator) {
+      if (comparator.length() == 0) {
+        // unsorted keys
+        return null;
+      }
+      if (comparator.equals(COMPARATOR_MEMCMP)) {
+        // default comparator
+        return new BytesComparator(new MemcmpRawComparator());
+      } else if (comparator.startsWith(COMPARATOR_JCLASS)) {
+        String compClassName =
+            comparator.substring(COMPARATOR_JCLASS.length()).trim();
+        try {
+          Class compClass = Class.forName(compClassName);
+          // use its default ctor to create an instance
+          return new BytesComparator((RawComparator<Object>) compClass
+              .newInstance());
+        } catch (Exception e) {
+          throw new IllegalArgumentException(
+              "Failed to instantiate comparator: " + comparator + "("
+                  + e.toString() + ")");
+        }
+      } else {
+        throw new IllegalArgumentException("Unsupported comparator: "
+            + comparator);
+      }
+    }
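+
+    // Illustrative comparator strings accepted by makeComparator() above (a
+    // sketch; it assumes the constants COMPARATOR_MEMCMP and COMPARATOR_JCLASS
+    // hold "memcmp" and "jclass:" as in TFile, and the jclass name below is
+    // hypothetical):
+    //
+    //   makeComparator("");         // unsorted keys -> null comparator
+    //   makeComparator("memcmp");   // raw byte-wise comparison
+    //   makeComparator("jclass:org.example.MyRawComparator");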
+
+    public void write(DataOutput out) throws IOException {
+      DTFile.API_VERSION.write(out);
+      Utils.writeVLong(out, recordCount);
+      Utils.writeString(out, strComparator);
+    }
+
+    public long getRecordCount() {
+      return recordCount;
+    }
+
+    public void incRecordCount() {
+      ++recordCount;
+    }
+
+    public boolean isSorted() {
+      return !strComparator.isEmpty();
+    }
+
+    public String getComparatorString() {
+      return strComparator;
+    }
+
+    public BytesComparator getComparator() {
+      return comparator;
+    }
+
+    public Version getVersion() {
+      return version;
+    }
+  } // END: class TFileMeta
+
+  /**
+   * Data structure representing "TFile.index" meta block.
+   */
+  static class TFileIndex {
+    final static String BLOCK_NAME = "TFile.index";
+    private ByteArray firstKey;
+    private final ArrayList<TFileIndexEntry> index;
+    private final ArrayList<Long> recordNumIndex;
+    private final BytesComparator comparator;
+    private long sum = 0;
+
+    /**
+     * For reading from file.
+     *
+     * @throws IOException
+     */
+    public TFileIndex(int entryCount, DataInput in, BytesComparator comparator)
+        throws IOException {
+      index = new ArrayList<TFileIndexEntry>(entryCount);
+      recordNumIndex = new ArrayList<Long>(entryCount);
+      int size = Utils.readVInt(in); // size for the first key entry.
+      if (size > 0) {
+        byte[] buffer = new byte[size];
+        in.readFully(buffer);
+        DataInputStream firstKeyInputStream =
+            new DataInputStream(new ByteArrayInputStream(buffer, 0, size));
+
+        int firstKeyLength = Utils.readVInt(firstKeyInputStream);
+        firstKey = new ByteArray(new byte[firstKeyLength]);
+        firstKeyInputStream.readFully(firstKey.buffer());
+
+        for (int i = 0; i < entryCount; i++) {
+          size = Utils.readVInt(in);
+          if (buffer.length < size) {
+            buffer = new byte[size];
+          }
+          in.readFully(buffer, 0, size);
+          TFileIndexEntry idx =
+              new TFileIndexEntry(new DataInputStream(new ByteArrayInputStream(
+                  buffer, 0, size)));
+          index.add(idx);
+          sum += idx.entries();
+          recordNumIndex.add(sum);
+        }
+      } else {
+        if (entryCount != 0) {
+          throw new RuntimeException("Internal error");
+        }
+      }
+      this.comparator = comparator;
+    }
+
+    /**
+     * @param key
+     *          input key.
+     * @return the ID of the first block that contains key >= input key. Or -1
+     *         if no such block exists.
+     */
+    public int lowerBound(RawComparable key) {
+      if (comparator == null) {
+        throw new RuntimeException("Cannot search in unsorted TFile");
+      }
+
+      if (firstKey == null) {
+        return -1; // not found
+      }
+
+      int ret = Utils.lowerBound(index, key, comparator);
+      if (ret == index.size()) {
+        return -1;
+      }
+      return ret;
+    }
+
+    /**
+     * @param key
+     *          input key.
+     * @return the ID of the first block that contains key > input key. Or -1
+     *         if no such block exists.
+     */
+    public int upperBound(RawComparable key) {
+      if (comparator == null) {
+        throw new RuntimeException("Cannot search in unsorted TFile");
+      }
+
+      if (firstKey == null) {
+        return -1; // not found
+      }
+
+      int ret = Utils.upperBound(index, key, comparator);
+      if (ret == index.size()) {
+        return -1;
+      }
+      return ret;
+    }
+
+    /**
+     * For writing to file.
+     */
+    public TFileIndex(BytesComparator comparator) {
+      index = new ArrayList<TFileIndexEntry>();
+      recordNumIndex = new ArrayList<Long>();
+      this.comparator = comparator;
+    }
+
+    public RawComparable getFirstKey() {
+      return firstKey;
+    }
+
+    public Reader.Location getLocationByRecordNum(long recNum) {
+      int idx = Utils.upperBound(recordNumIndex, recNum);
+      long lastRecNum = (idx == 0)? 0: recordNumIndex.get(idx-1);
+      return new Reader.Location(idx, recNum-lastRecNum);
+    }
+
+    public long getRecordNumByLocation(Reader.Location location) {
+      int blkIndex = location.getBlockIndex();
+      long lastRecNum = (blkIndex == 0) ? 0: recordNumIndex.get(blkIndex-1);
+      return lastRecNum + location.getRecordIndex();
+    }
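+
+    // A small worked example of the two mappings above, using hypothetical
+    // block sizes: with blocks holding 10, 20 and 5 records, recordNumIndex is
+    // [10, 30, 35]. getLocationByRecordNum(25) finds upper bound idx = 1,
+    // lastRecNum = 10, yielding Location(block 1, record 15); and
+    // getRecordNumByLocation(Location(1, 15)) returns 10 + 15 = 25 again.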
+
+    public void setFirstKey(byte[] key, int offset, int length) {
+      firstKey = new ByteArray(new byte[length]);
+      System.arraycopy(key, offset, firstKey.buffer(), 0, length);
+    }
+
+    public RawComparable getLastKey() {
+      if (index.size() == 0) {
+        return null;
+      }
+      return new ByteArray(index.get(index.size() - 1).buffer());
+    }
+
+    public void addEntry(TFileIndexEntry keyEntry) {
+      index.add(keyEntry);
+      sum += keyEntry.entries();
+      recordNumIndex.add(sum);
+    }
+
+    public TFileIndexEntry getEntry(int bid) {
+      return index.get(bid);
+    }
+
+    public void write(DataOutput out) throws IOException {
+      if (firstKey == null) {
+        Utils.writeVInt(out, 0);
+        return;
+      }
+
+      DataOutputBuffer dob = new DataOutputBuffer();
+      Utils.writeVInt(dob, firstKey.size());
+      dob.write(firstKey.buffer());
+      Utils.writeVInt(out, dob.size());
+      out.write(dob.getData(), 0, dob.getLength());
+
+      for (TFileIndexEntry entry : index) {
+        dob.reset();
+        entry.write(dob);
+        Utils.writeVInt(out, dob.getLength());
+        out.write(dob.getData(), 0, dob.getLength());
+      }
+    }
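+
+    // On-disk layout produced by write(), reconstructed from the code above:
+    //   VInt   length of the first-key record (0 when the TFile is empty)
+    //   bytes  first-key record = VInt(firstKeyLen) + firstKeyBytes
+    //   then, per data block:
+    //     VInt(entryLen) + TFileIndexEntry,
+    //     where TFileIndexEntry = VInt(keyLen) + keyBytes + VLong(kvEntries)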
+  }
+
+  /**
+   * TFile Data Index entry. We should try to make the memory footprint of each
+   * index entry as small as possible.
+   */
+  static final class TFileIndexEntry implements RawComparable {
+    final byte[] key;
+    // count of <key, value> entries in the block.
+    final long kvEntries;
+
+    public TFileIndexEntry(DataInput in) throws IOException {
+      int len = Utils.readVInt(in);
+      key = new byte[len];
+      in.readFully(key, 0, len);
+      kvEntries = Utils.readVLong(in);
+    }
+
+    // default entry, without any padding
+    public TFileIndexEntry(byte[] newkey, int offset, int len, long entries) {
+      key = new byte[len];
+      System.arraycopy(newkey, offset, key, 0, len);
+      this.kvEntries = entries;
+    }
+
+    @Override
+    public byte[] buffer() {
+      return key;
+    }
+
+    @Override
+    public int offset() {
+      return 0;
+    }
+
+    @Override
+    public int size() {
+      return key.length;
+    }
+
+    long entries() {
+      return kvEntries;
+    }
+
+    public void write(DataOutput out) throws IOException {
+      Utils.writeVInt(out, key.length);
+      out.write(key, 0, key.length);
+      Utils.writeVLong(out, kvEntries);
+    }
+  }
+
+  /**
+   * Dumping the TFile information.
+   *
+   * @param args
+   *          A list of TFile paths.
+   */
+  public static void main(String[] args) {
+    System.out.printf("TFile Dumper (TFile %s, BCFile %s)\n", DTFile.API_VERSION
+        .toString(), DTBCFile.API_VERSION.toString());
+    if (args.length == 0) {
+      System.out
+          .println("Usage: java ... com.datatorrent.contrib.hdht.tfile.withcache.TFile tfile-path [tfile-path ...]");
+      System.exit(0);
+    }
+    Configuration conf = new Configuration();
+
+    for (String file : args) {
+      System.out.println("===" + file + "===");
+      try {
+        TFileDumper.dumpInfo(file, System.out, conf);
+      } catch (IOException e) {
+        e.printStackTrace(System.err);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/main/java/org/apache/hadoop/io/file/tfile/ReusableByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/hadoop/io/file/tfile/ReusableByteArrayInputStream.java b/library/src/main/java/org/apache/hadoop/io/file/tfile/ReusableByteArrayInputStream.java
new file mode 100644
index 0000000..25e4f27
--- /dev/null
+++ b/library/src/main/java/org/apache/hadoop/io/file/tfile/ReusableByteArrayInputStream.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.ByteArrayInputStream;
+
+/**
+ * A reusable input stream that extends {@link ByteArrayInputStream} to avoid creating a new stream object for the same byte array.
+ * <br><br>Call the renew() method to reuse this stream from the beginning.
+ *
+ * @since 2.0.0
+ */
+public class ReusableByteArrayInputStream extends ByteArrayInputStream
+{
+  
+  private final int initialOffset;
+  
+  private final int initialLength;
+
+  public ReusableByteArrayInputStream(byte[] buf, int offset, int length)
+  {
+    super(buf, offset, length);
+    this.initialLength = Math.min(offset + length, buf.length);
+    this.initialOffset = offset;
+  }
+
+  public ReusableByteArrayInputStream(byte[] buf)
+  {
+    super(buf);
+    this.initialLength = buf.length;
+    this.initialOffset = 0;
+  }
+
+  public void renew()
+  {
+    pos = initialOffset;
+    count = initialLength;
+    mark = 0;
+  }
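+
+  // Minimal usage sketch: read the stream, then renew() to read the same
+  // bytes again without allocating a new stream (the buffer contents are
+  // assumed unchanged between passes):
+  //
+  //   ReusableByteArrayInputStream in = new ReusableByteArrayInputStream(bytes);
+  //   int first = in.read();
+  //   in.renew();             // resets pos, count and mark to the initial window
+  //   int again = in.read();  // reads the same first byte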
+
+  public int getPos()
+  {
+    return pos;
+  }
+
+  public byte[] getBuf()
+  {
+    return buf;
+  }
+
+}