You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2015/11/07 01:57:07 UTC

[8/8] incubator-apex-malhar git commit: MLHR-1877 #resolve #comment moved DTFile implementation from contrib to lib

MLHR-1877 #resolve #comment  moved DTFile implementation from contrib to lib


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/02f48e1b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/02f48e1b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/02f48e1b

Branch: refs/heads/devel-3
Commit: 02f48e1b295920cb3c26b84eb802cfe18e3f9ea4
Parents: c1ebde9
Author: Chandni Singh <cs...@apache.org>
Authored: Fri Nov 6 16:40:12 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Fri Nov 6 16:41:11 2015 -0800

----------------------------------------------------------------------
 .../hadoop/io/file/tfile/CacheManager.java      |  183 --
 .../apache/hadoop/io/file/tfile/DTBCFile.java   | 1044 --------
 .../org/apache/hadoop/io/file/tfile/DTFile.java | 2399 ------------------
 .../tfile/ReusableByteArrayInputStream.java     |   66 -
 .../apache/hadoop/io/file/tfile/DTFileTest.java |  217 --
 .../apache/hadoop/io/file/tfile/TestDTFile.java |  432 ----
 .../io/file/tfile/TestDTFileByteArrays.java     |  773 ------
 .../io/file/tfile/TestTFileComparator2.java     |  108 -
 .../io/file/tfile/TestTFileComparators.java     |  123 -
 .../TestTFileJClassComparatorByteArrays.java    |   59 -
 .../tfile/TestTFileLzoCodecsByteArrays.java     |   41 -
 .../file/tfile/TestTFileLzoCodecsStreams.java   |   39 -
 .../tfile/TestTFileNoneCodecsByteArrays.java    |   32 -
 ...ileNoneCodecsJClassComparatorByteArrays.java |   40 -
 .../file/tfile/TestTFileNoneCodecsStreams.java  |   32 -
 .../hadoop/io/file/tfile/TestTFileSeek.java     |  505 ----
 .../file/tfile/TestTFileSeqFileComparison.java  |  802 ------
 .../hadoop/io/file/tfile/TestTFileSplit.java    |  194 --
 .../hadoop/io/file/tfile/TestTFileStreams.java  |  423 ---
 .../file/tfile/TestTFileUnsortedByteArrays.java |  239 --
 .../hadoop/io/file/tfile/CacheManager.java      |  185 ++
 .../apache/hadoop/io/file/tfile/DTBCFile.java   | 1044 ++++++++
 .../org/apache/hadoop/io/file/tfile/DTFile.java | 2399 ++++++++++++++++++
 .../tfile/ReusableByteArrayInputStream.java     |   66 +
 .../apache/hadoop/io/file/tfile/DTFileTest.java |  220 ++
 .../apache/hadoop/io/file/tfile/TestDTFile.java |  432 ++++
 .../io/file/tfile/TestDTFileByteArrays.java     |  773 ++++++
 .../io/file/tfile/TestTFileComparator2.java     |  108 +
 .../io/file/tfile/TestTFileComparators.java     |  123 +
 .../TestTFileJClassComparatorByteArrays.java    |   59 +
 .../tfile/TestTFileLzoCodecsByteArrays.java     |   41 +
 .../file/tfile/TestTFileLzoCodecsStreams.java   |   39 +
 .../tfile/TestTFileNoneCodecsByteArrays.java    |   32 +
 ...ileNoneCodecsJClassComparatorByteArrays.java |   40 +
 .../file/tfile/TestTFileNoneCodecsStreams.java  |   32 +
 .../hadoop/io/file/tfile/TestTFileSeek.java     |  505 ++++
 .../file/tfile/TestTFileSeqFileComparison.java  |  802 ++++++
 .../hadoop/io/file/tfile/TestTFileSplit.java    |  194 ++
 .../hadoop/io/file/tfile/TestTFileStreams.java  |  423 +++
 .../file/tfile/TestTFileUnsortedByteArrays.java |  239 ++
 pom.xml                                         |    1 +
 41 files changed, 7757 insertions(+), 7751 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java b/contrib/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java
deleted file mode 100644
index 2c82d09..0000000
--- a/contrib/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.lang.management.ManagementFactory;
-import java.util.Collection;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.io.file.tfile.DTBCFile.Reader.BlockReader;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.Weigher;
-
-/**
- * A single global managed cache
- * User can limit the cache size by num of entries, memory size (bytes) or percentage of total heap size
- * <br>
- * <br>
- * Please refer to <a href="https://code.google.com/p/guava-libraries/wiki/CachesExplained">Guava Cache</a> for details
- * <br>
- * <br>
- * It keeps {@link String} as key and {@link BlockReader} as value
- *
- * @since 2.0.0
- */
-public class CacheManager
-{
-  public static final int STRING_OVERHEAD = 64;
-
-  public static final int BLOCK_READER_OVERHEAD = 368;
-
-  public static final float DEFAULT_HEAP_MEMORY_PERCENTAGE = 0.25f;
-
-  private static Cache<String, BlockReader> singleCache;
-
-  private static boolean enableStats = false;
-
-  public static final Cache<String, BlockReader> buildCache(CacheBuilder builder) {
-    if (singleCache != null) {
-      singleCache.cleanUp();
-    }
-    if (enableStats)
-      builder.recordStats();
-    singleCache = builder.build();
-    return singleCache;
-  }
-
-  /**
-   * (Re)Create the cache by limiting the maximum entries
-   * @param concurrencyLevel
-   * @param initialCapacity
-   * @param maximunSize
-   * @return The cache.
-   */
-  public static final Cache<String, BlockReader> createCache(int concurrencyLevel,int initialCapacity, int maximunSize){
-    CacheBuilder builder = CacheBuilder.newBuilder().
-        concurrencyLevel(concurrencyLevel).
-        initialCapacity(initialCapacity).
-        maximumSize(maximunSize);
-
-    return buildCache(builder);
-  }
-
-
-  /**
-   * (Re)Create the cache by limiting the memory(in bytes)
-   * @param concurrencyLevel
-   * @param initialCapacity
-   * @param maximumMemory
-   * @return The cache.
-   */
-  public static final Cache<String, BlockReader> createCache(int concurrencyLevel,int initialCapacity, long maximumMemory){
-
-    CacheBuilder builder = CacheBuilder.newBuilder().
-        concurrencyLevel(concurrencyLevel).
-        initialCapacity(initialCapacity).
-        maximumWeight(maximumMemory).weigher(new KVWeigher());
-
-    return buildCache(builder);
-  }
-
-  /**
-   * (Re)Create the cache by limiting percentage of the total heap memory
-   * @param concurrencyLevel
-   * @param initialCapacity
-   * @param heapMemPercentage
-   * @return The cache.
-   */
-  public static final Cache<String, BlockReader> createCache(int concurrencyLevel,int initialCapacity, float heapMemPercentage){
-    CacheBuilder builder = CacheBuilder.newBuilder().
-        concurrencyLevel(concurrencyLevel).
-        initialCapacity(initialCapacity).
-        maximumWeight((long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() * heapMemPercentage)).weigher(new KVWeigher());
-    return buildCache(builder);
-  }
-
-  public static final void createDefaultCache(){
-
-    long availableMemory = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() * DEFAULT_HEAP_MEMORY_PERCENTAGE);
-    CacheBuilder<String, BlockReader> builder = CacheBuilder.newBuilder().maximumWeight(availableMemory).weigher(new KVWeigher());
-
-    singleCache = buildCache(builder);
-  }
-
-  public static final void put(String key, BlockReader blk){
-    if (singleCache == null) {
-      createDefaultCache();
-    }
-    singleCache.put(key, blk);
-  }
-
-  public static final BlockReader get(String key){
-    if (singleCache == null) {
-      return null;
-    }
-    return singleCache.getIfPresent(key);
-  }
-
-  public static final void invalidateKeys(Collection<String> keys)
-  {
-    if (singleCache != null)
-      singleCache.invalidateAll(keys);
-  }
-
-  public static final long getCacheSize() {
-    if (singleCache != null)
-      return singleCache.size();
-    return 0;
-  }
-
-  public static final class KVWeigher implements Weigher<String, BlockReader> {
-
-    @Override
-    public int weigh(String key, BlockReader value)
-    {
-      return (STRING_OVERHEAD + BLOCK_READER_OVERHEAD) +
-          key.getBytes().length +
-          value.getBlockDataInputStream().getBuf().length;
-    }
-
-  }
-
-  @VisibleForTesting
-  protected static Cache<String, BlockReader> getCache() {
-    return singleCache;
-  }
-
-  public static final void setEnableStats(boolean enable) {
-    enableStats = enable;
-  }
-
-  public static void main(String[] args)
-  {
-
-    //code to estimate the overhead of the instance of the key value objects
-    // it depends on hbase file
-//    System.out.println(ClassSize.estimateBase(BlockReader.class, true) +
-//        ClassSize.estimateBase(Algorithm.class, true) +
-//        ClassSize.estimateBase(RBlockState.class, true) +
-//        ClassSize.estimateBase(ReusableByteArrayInputStream.class, true) +
-//        ClassSize.estimateBase(BlockRegion.class, true));
-//
-//    System.out.println(
-//        ClassSize.estimateBase(String.class, true));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/main/java/org/apache/hadoop/io/file/tfile/DTBCFile.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/hadoop/io/file/tfile/DTBCFile.java b/contrib/src/main/java/org/apache/hadoop/io/file/tfile/DTBCFile.java
deleted file mode 100644
index 779b0f0..0000000
--- a/contrib/src/main/java/org/apache/hadoop/io/file/tfile/DTBCFile.java
+++ /dev/null
@@ -1,1044 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.Closeable;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.file.tfile.CompareUtils.Scalar;
-import org.apache.hadoop.io.file.tfile.Utils.Version;
-import org.apache.hadoop.io.file.tfile.CompareUtils.ScalarComparator;
-import org.apache.hadoop.io.file.tfile.CompareUtils.ScalarLong;
-import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
-
-
-/**
- * 
- * <ul>
- * <li>The file format of DTFile is same as {@link TFile} with different reader implementation. 
- * It reads data block by block and cache the binary block data into memory to speed up the random read.
- * 
- * <li>The public api of {@link Reader} is as same as it is in {@link TFile} {@link org.apache.hadoop.io.file.tfile.TFile.Reader} implementation. 
- * Besides, it provides getBlockBuffer(), getKeyOffset(), getKeyLength(), getValueOffset(), getValueLength() method
- *  to expose raw block, key, value data to user to avoid unnecessary internal/external data copy
- *  
- * <li>In the performance test, It shows no difference in sequential reads and 20x faster in random reads(If most of them hit memory)
- * </ul>
- * 
- * Block Compressed file, the underlying physical storage layer for TFile.
- * BCFile provides the basic block level compression for the data block and meta
- * blocks. It is separated from TFile as it may be used for other
- * block-compressed file implementation.
- *
- * @since 2.0.0
- */
-final class DTBCFile {
-  // the current version of BCFile impl, increment them (major or minor) made
-  // enough changes
-  static final Version API_VERSION = new Version((short) 1, (short) 0);
-  static final Log LOG = LogFactory.getLog(DTBCFile.class);
-
-  /**
-   * Prevent the instantiation of BCFile objects.
-   */
-  private DTBCFile() {
-    // nothing
-  }
-
-  /**
-   * BCFile writer, the entry point for creating a new BCFile.
-   */
-  static public class Writer implements Closeable {
-    private final FSDataOutputStream out;
-    private final Configuration conf;
-    // the single meta block containing index of compressed data blocks
-    final DataIndex dataIndex;
-    // index for meta blocks
-    final MetaIndex metaIndex;
-    boolean blkInProgress = false;
-    private boolean metaBlkSeen = false;
-    private boolean closed = false;
-    long errorCount = 0;
-    // reusable buffers.
-    private BytesWritable fsOutputBuffer;
-
-    /**
-     * Call-back interface to register a block after a block is closed.
-     */
-    private static interface BlockRegister {
-      /**
-       * Register a block that is fully closed.
-       * 
-       * @param raw
-       *          The size of block in terms of uncompressed bytes.
-       * @param offsetStart
-       *          The start offset of the block.
-       * @param offsetEnd
-       *          One byte after the end of the block. Compressed block size is
-       *          offsetEnd - offsetStart.
-       */
-      public void register(long raw, long offsetStart, long offsetEnd);
-    }
-
-    /**
-     * Intermediate class that maintain the state of a Writable Compression
-     * Block.
-     */
-    private static final class WBlockState {
-      private final Algorithm compressAlgo;
-      private Compressor compressor; // !null only if using native
-      // Hadoop compression
-      private final FSDataOutputStream fsOut;
-      private final long posStart;
-      private final SimpleBufferedOutputStream fsBufferedOutput;
-      private OutputStream out;
-
-      /**
-       * @param compressionAlgo
-       *          The compression algorithm to be used to for compression.
-       * @throws IOException
-       */
-      public WBlockState(Algorithm compressionAlgo, FSDataOutputStream fsOut,
-          BytesWritable fsOutputBuffer, Configuration conf) throws IOException {
-        this.compressAlgo = compressionAlgo;
-        this.fsOut = fsOut;
-        this.posStart = fsOut.getPos();
-
-        fsOutputBuffer.setCapacity(DTFile.getFSOutputBufferSize(conf));
-
-        this.fsBufferedOutput =
-            new SimpleBufferedOutputStream(this.fsOut, fsOutputBuffer.getBytes());
-        this.compressor = compressAlgo.getCompressor();
-
-        try {
-          this.out =
-              compressionAlgo.createCompressionStream(fsBufferedOutput,
-                  compressor, 0);
-        } catch (IOException e) {
-          compressAlgo.returnCompressor(compressor);
-          throw e;
-        }
-      }
-
-      /**
-       * Get the output stream for BlockAppender's consumption.
-       * 
-       * @return the output stream suitable for writing block data.
-       */
-      OutputStream getOutputStream() {
-        return out;
-      }
-
-      /**
-       * Get the current position in file.
-       * 
-       * @return The current byte offset in underlying file.
-       * @throws IOException
-       */
-      long getCurrentPos() throws IOException {
-        return fsOut.getPos() + fsBufferedOutput.size();
-      }
-
-      long getStartPos() {
-        return posStart;
-      }
-
-      /**
-       * Current size of compressed data.
-       * 
-       * @return
-       * @throws IOException
-       */
-      long getCompressedSize() throws IOException {
-        long ret = getCurrentPos() - posStart;
-        return ret;
-      }
-
-      /**
-       * Finishing up the current block.
-       */
-      public void finish() throws IOException {
-        try {
-          if (out != null) {
-            out.flush();
-            out = null;
-          }
-        } finally {
-          compressAlgo.returnCompressor(compressor);
-          compressor = null;
-        }
-      }
-    }
-
-    /**
-     * Access point to stuff data into a block.
-     * 
-     * TODO: Change DataOutputStream to something else that tracks the size as
-     * long instead of int. Currently, we will wrap around if the row block size
-     * is greater than 4GB.
-     */
-    public class BlockAppender extends DataOutputStream {
-      private final BlockRegister blockRegister;
-      private final WBlockState wBlkState;
-      @SuppressWarnings("hiding")
-      private boolean closed = false;
-
-      /**
-       * Constructor
-       * 
-       * @param register
-       *          the block register, which is called when the block is closed.
-       * @param wbs
-       *          The writable compression block state.
-       */
-      BlockAppender(BlockRegister register, WBlockState wbs) {
-        super(wbs.getOutputStream());
-        this.blockRegister = register;
-        this.wBlkState = wbs;
-      }
-
-      /**
-       * Get the raw size of the block.
-       * 
-       * @return the number of uncompressed bytes written through the
-       *         BlockAppender so far.
-       * @throws IOException
-       */
-      public long getRawSize() throws IOException {
-        /**
-         * Expecting the size() of a block not exceeding 4GB. Assuming the
-         * size() will wrap to negative integer if it exceeds 2GB.
-         */
-        return size() & 0x00000000ffffffffL;
-      }
-
-      /**
-       * Get the compressed size of the block in progress.
-       * 
-       * @return the number of compressed bytes written to the underlying FS
-       *         file. The size may be smaller than actual need to compress the
-       *         all data written due to internal buffering inside the
-       *         compressor.
-       * @throws IOException
-       */
-      public long getCompressedSize() throws IOException {
-        return wBlkState.getCompressedSize();
-      }
-
-      @Override
-      public void flush() {
-        // The down stream is a special kind of stream that finishes a
-        // compression block upon flush. So we disable flush() here.
-      }
-
-      /**
-       * Signaling the end of write to the block. The block register will be
-       * called for registering the finished block.
-       */
-      @Override
-      public void close() throws IOException {
-        if (closed == true) {
-          return;
-        }
-        try {
-          ++errorCount;
-          wBlkState.finish();
-          blockRegister.register(getRawSize(), wBlkState.getStartPos(),
-              wBlkState.getCurrentPos());
-          --errorCount;
-        } finally {
-          closed = true;
-          blkInProgress = false;
-        }
-      }
-    }
-
-    /**
-     * Constructor
-     * 
-     * @param fout
-     *          FS output stream.
-     * @param compressionName
-     *          Name of the compression algorithm, which will be used for all
-     *          data blocks.
-     * @throws IOException
-     * @see Compression#getSupportedAlgorithms
-     */
-    public Writer(FSDataOutputStream fout, String compressionName,
-        Configuration conf) throws IOException {
-      if (fout.getPos() != 0) {
-        throw new IOException("Output file not at zero offset.");
-      }
-
-      this.out = fout;
-      this.conf = conf;
-      dataIndex = new DataIndex(compressionName);
-      metaIndex = new MetaIndex();
-      fsOutputBuffer = new BytesWritable();
-      Magic.write(fout);
-    }
-
-    /**
-     * Close the BCFile Writer. Attempting to use the Writer after calling
-     * <code>close</code> is not allowed and may lead to undetermined results.
-     */
-    @Override
-    public void close() throws IOException {
-      if (closed == true) {
-        return;
-      }
-
-      try {
-        if (errorCount == 0) {
-          if (blkInProgress == true) {
-            throw new IllegalStateException(
-                "Close() called with active block appender.");
-          }
-
-          // add metaBCFileIndex to metaIndex as the last meta block
-          BlockAppender appender =
-              prepareMetaBlock(DataIndex.BLOCK_NAME,
-                  getDefaultCompressionAlgorithm());
-          try {
-            dataIndex.write(appender);
-          } finally {
-            appender.close();
-          }
-
-          long offsetIndexMeta = out.getPos();
-          metaIndex.write(out);
-
-          // Meta Index and the trailing section are written out directly.
-          out.writeLong(offsetIndexMeta);
-
-          API_VERSION.write(out);
-          Magic.write(out);
-          out.flush();
-        }
-      } finally {
-        closed = true;
-      }
-    }
-
-    private Algorithm getDefaultCompressionAlgorithm() {
-      return dataIndex.getDefaultCompressionAlgorithm();
-    }
-
-    private BlockAppender prepareMetaBlock(String name, Algorithm compressAlgo)
-        throws IOException, MetaBlockAlreadyExists {
-      if (blkInProgress == true) {
-        throw new IllegalStateException(
-            "Cannot create Meta Block until previous block is closed.");
-      }
-
-      if (metaIndex.getMetaByName(name) != null) {
-        throw new MetaBlockAlreadyExists("name=" + name);
-      }
-
-      MetaBlockRegister mbr = new MetaBlockRegister(name, compressAlgo);
-      WBlockState wbs =
-          new WBlockState(compressAlgo, out, fsOutputBuffer, conf);
-      BlockAppender ba = new BlockAppender(mbr, wbs);
-      blkInProgress = true;
-      metaBlkSeen = true;
-      return ba;
-    }
-
-    /**
-     * Create a Meta Block and obtain an output stream for adding data into the
-     * block. There can only be one BlockAppender stream active at any time.
-     * Regular Blocks may not be created after the first Meta Blocks. The caller
-     * must call BlockAppender.close() to conclude the block creation.
-     * 
-     * @param name
-     *          The name of the Meta Block. The name must not conflict with
-     *          existing Meta Blocks.
-     * @param compressionName
-     *          The name of the compression algorithm to be used.
-     * @return The BlockAppender stream
-     * @throws IOException
-     * @throws MetaBlockAlreadyExists
-     *           If the meta block with the name already exists.
-     */
-    public BlockAppender prepareMetaBlock(String name, String compressionName)
-        throws IOException, MetaBlockAlreadyExists {
-      return prepareMetaBlock(name, Compression
-          .getCompressionAlgorithmByName(compressionName));
-    }
-
-    /**
-     * Create a Meta Block and obtain an output stream for adding data into the
-     * block. The Meta Block will be compressed with the same compression
-     * algorithm as data blocks. There can only be one BlockAppender stream
-     * active at any time. Regular Blocks may not be created after the first
-     * Meta Blocks. The caller must call BlockAppender.close() to conclude the
-     * block creation.
-     * 
-     * @param name
-     *          The name of the Meta Block. The name must not conflict with
-     *          existing Meta Blocks.
-     * @return The BlockAppender stream
-     * @throws MetaBlockAlreadyExists
-     *           If the meta block with the name already exists.
-     * @throws IOException
-     */
-    public BlockAppender prepareMetaBlock(String name) throws IOException,
-        MetaBlockAlreadyExists {
-      return prepareMetaBlock(name, getDefaultCompressionAlgorithm());
-    }
-
-    /**
-     * Create a Data Block and obtain an output stream for adding data into the
-     * block. There can only be one BlockAppender stream active at any time.
-     * Data Blocks may not be created after the first Meta Blocks. The caller
-     * must call BlockAppender.close() to conclude the block creation.
-     * 
-     * @return The BlockAppender stream
-     * @throws IOException
-     */
-    public BlockAppender prepareDataBlock() throws IOException {
-      if (blkInProgress == true) {
-        throw new IllegalStateException(
-            "Cannot create Data Block until previous block is closed.");
-      }
-
-      if (metaBlkSeen == true) {
-        throw new IllegalStateException(
-            "Cannot create Data Block after Meta Blocks.");
-      }
-
-      DataBlockRegister dbr = new DataBlockRegister();
-
-      WBlockState wbs =
-          new WBlockState(getDefaultCompressionAlgorithm(), out,
-              fsOutputBuffer, conf);
-      BlockAppender ba = new BlockAppender(dbr, wbs);
-      blkInProgress = true;
-      return ba;
-    }
-
-    /**
-     * Callback to make sure a meta block is added to the internal list when its
-     * stream is closed.
-     */
-    private class MetaBlockRegister implements BlockRegister {
-      private final String name;
-      private final Algorithm compressAlgo;
-
-      MetaBlockRegister(String name, Algorithm compressAlgo) {
-        this.name = name;
-        this.compressAlgo = compressAlgo;
-      }
-
-      @Override
-      public void register(long raw, long begin, long end) {
-        metaIndex.addEntry(new MetaIndexEntry(name, compressAlgo,
-            new BlockRegion(begin, end - begin, raw)));
-      }
-    }
-
-    /**
-     * Callback to make sure a data block is added to the internal list when
-     * it's being closed.
-     * 
-     */
-    private class DataBlockRegister implements BlockRegister {
-      DataBlockRegister() {
-        // do nothing
-      }
-
-      @Override
-      public void register(long raw, long begin, long end) {
-        dataIndex.addBlockRegion(new BlockRegion(begin, end - begin, raw));
-      }
-    }
-  }
-
-  /**
-   * BCFile Reader, interface to read the file's data and meta blocks.
-   */
-  static public class Reader implements Closeable {
-    private final FSDataInputStream in;
-    private final Configuration conf;
-    final DataIndex dataIndex;
-    // Index for meta blocks
-    final MetaIndex metaIndex;
-    final Version version;
-    //
-    private ByteArrayOutputStream baos;
-    private ArrayList<String> cacheKeys;
-
-    public ArrayList<String> getCacheKeys()
-    {
-      return cacheKeys;
-    }
-
-    /**
-     * Intermediate class that maintain the state of a Readable Compression
-     * Block.
-     */
-    static private final class RBlockState {
-      private final Algorithm compressAlgo;
-      private final ReusableByteArrayInputStream rbain;
-      private final BlockRegion region;
-
-      public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin, BlockRegion region, Configuration conf, Reader r) throws IOException
-      {
-        this.compressAlgo = compressionAlgo;
-        Decompressor decompressor = compressionAlgo.getDecompressor();
-        this.region = region;
-        try {
-
-          InputStream in = compressAlgo.createDecompressionStream(new BoundedRangeFileInputStream(fsin, region.getOffset(), region.getCompressedSize()), decompressor, DTFile.getFSInputBufferSize(conf));
-          int l = 1;
-          r.baos.reset();
-          byte[] buf = new byte[DTFile.getFSInputBufferSize(conf)];
-          while (l >= 0) {
-            l = in.read(buf);
-            if (l > 0) {
-              r.baos.write(buf, 0, l);
-            }
-          }
-          // keep decompressed data into cache
-          byte[] blockData = r.baos.toByteArray();
-          rbain = new ReusableByteArrayInputStream(blockData);
-        } catch (IOException e) {
-          compressAlgo.returnDecompressor(decompressor);
-          throw e;
-        }
-
-      }
-
-      /**
-       * Get the input stream that holds the block's decompressed data.
-       * 
-       * @return the reusable byte-array input stream for reading block data.
-       */
-      public ReusableByteArrayInputStream getInputStream() {
-        return rbain;
-      }
-
-      public String getCompressionName() {
-        return compressAlgo.getName();
-      }
-
-      public BlockRegion getBlockRegion() {
-        return region;
-      }
-
-      public void finish() throws IOException {
-        try {
-          rbain.close();
-        } finally {
-        }
-      }
-
-      public void renew()
-      {
-        rbain.renew();
-      }
-    }
-
-    /**
-     * Access point to read a block.
-     */
-    public static class BlockReader extends DataInputStream {
-      // State of the block being read. It is kept after close() so that the
-      // statistics getters below keep working.
-      private final RBlockState rBlkState;
-
-      // The stream returned by the block state's getInputStream().
-      private final ReusableByteArrayInputStream wrappedInputStream;
-
-      // Set by close(), cleared again by renew().
-      private boolean closed = false;
-
-      BlockReader(RBlockState rbs) {
-        super(rbs.getInputStream());
-        this.rBlkState = rbs;
-        this.wrappedInputStream = rbs.getInputStream();
-      }
-
-      /**
-       * Finish reading the block by calling finish() on the block state.
-       * Calling this again on an already-closed reader is a no-op.
-       */
-      @Override
-      public void close() throws IOException {
-        if (!closed) {
-          try {
-            // rBlkState is deliberately not nulled out: callers may still
-            // query the statistics after close().
-            rBlkState.finish();
-          } finally {
-            // Marked closed even when finish() throws.
-            closed = true;
-          }
-        }
-      }
-
-      /**
-       * Name of the compression algorithm used to compress the block.
-       *
-       * @return name of the compression algorithm.
-       */
-      public String getCompressionName() {
-        return this.rBlkState.getCompressionName();
-      }
-
-      /**
-       * Uncompressed size of the block.
-       *
-       * @return uncompressed size of the block.
-       */
-      public long getRawSize() {
-        return this.rBlkState.getBlockRegion().getRawSize();
-      }
-
-      /**
-       * Compressed size of the block.
-       *
-       * @return compressed size of the block.
-       */
-      public long getCompressedSize() {
-        return this.rBlkState.getBlockRegion().getCompressedSize();
-      }
-
-      /**
-       * Starting position of the block in the file.
-       *
-       * @return the starting position of the block in the file.
-       */
-      public long getStartPos() {
-        return this.rBlkState.getBlockRegion().getOffset();
-      }
-
-      /**
-       * Re-open this reader: clear the closed flag, then renew the block state.
-       */
-      public void renew() {
-        this.closed = false;
-        this.rBlkState.renew();
-      }
-
-      /**
-       * @return the stream obtained from the block state at construction time.
-       */
-      public ReusableByteArrayInputStream getBlockDataInputStream() {
-        return this.wrappedInputStream;
-      }
-    }
-
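-    /**
-     * Constructor
-     * 
-     * @param fin
-     *          FS input stream.
-     * @param fileLength
-     *          Length of the corresponding file
-     * @param conf
-     *          Configuration; used to size the block read buffer and kept
-     *          for creating block readers.
-     * @throws IOException
-     */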
-    public Reader(FSDataInputStream fin, long fileLength, Configuration conf)
-        throws IOException {
-      this.in = fin;
-      this.conf = conf;
-      // Buffer used while reading a block: twice the FS input buffer size.
-      baos = new ByteArrayOutputStream(DTFile.getFSInputBufferSize(conf) * 2);
-      this.cacheKeys = new ArrayList<String>();
-
-      // The tail of the file holds, in order: the offset of the meta block
-      // index (one long), the version and the magic. Seek to its start.
-      long tailLength = Magic.size() + Version.size() + Long.SIZE / Byte.SIZE;
-      fin.seek(fileLength - tailLength);
-      long metaIndexOffset = fin.readLong();
-      version = new Version(fin);
-      Magic.readAndVerify(fin);
-
-      boolean compatible = version.compatibleWith(DTBCFile.API_VERSION);
-      if (!compatible) {
-        throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
-      }
-
-      // Load the meta block index.
-      fin.seek(metaIndexOffset);
-      metaIndex = new MetaIndex(fin);
-
-      // The data block index is stored as the meta block named
-      // DataIndex.BLOCK_NAME; read it and always close the block afterwards.
-      BlockReader indexBlock = getMetaBlock(DataIndex.BLOCK_NAME);
-      try {
-        dataIndex = new DataIndex(indexBlock);
-      } finally {
-        indexBlock.close();
-      }
-    }
-
-    /**
-     * Get the name of the default compression algorithm.
-     * 
-     * @return the name of the default compression algorithm.
-     */
-    public String getDefaultCompressionName() {
-      return dataIndex.getDefaultCompressionAlgorithm().getName();
-    }
-
-    /**
-     * Get version of BCFile file being read.
-     * 
-     * @return version of BCFile file being read.
-     */
-    public Version getBCFileVersion() {
-      // The version parsed from the file tail by the constructor.
-      return this.version;
-    }
-
-    /**
-     * Get version of BCFile API.
-     * 
-     * @return version of BCFile API.
-     */
-    public Version getAPIVersion() {
-      // The same constant the constructor checks file compatibility against.
-      return DTBCFile.API_VERSION;
-    }
-
-    /**
-     * Finish reading the BCFile. Invalidates the cache entries registered by
-     * this reader.
-     */
-    @Override
-    public void close() {
-      // Evict every block reader this instance put in the cache, then forget
-      // the keys so a repeated close() has nothing left to invalidate.
-      CacheManager.invalidateKeys(this.cacheKeys);
-      this.cacheKeys.clear();
-    }
-
-    /**
-     * Get the number of data blocks.
-     * 
-     * @return the number of data blocks.
-     */
-    public int getBlockCount() {
-      // One region is recorded per data block.
-      ArrayList<BlockRegion> regions = dataIndex.getBlockRegionList();
-      return regions.size();
-    }
-
-    /**
-     * Stream access to a Meta Block.
-     * 
-     * @param name
-     *          meta block name
-     * @return BlockReader input stream for reading the meta block.
-     * @throws IOException
-     * @throws MetaBlockDoesNotExist
-     *           The Meta Block with the given name does not exist.
-     */
-    public BlockReader getMetaBlock(String name) throws IOException,
-        MetaBlockDoesNotExist {
-      MetaIndexEntry entry = metaIndex.getMetaByName(name);
-      if (entry != null) {
-        // Each meta block carries its own compression algorithm.
-        BlockRegion region = entry.getRegion();
-        return createReader(entry.getCompressionAlgorithm(), region);
-      }
-      throw new MetaBlockDoesNotExist("name=" + name);
-    }
-
-    /**
-     * Stream access to a Data Block.
-     * 
-     * @param blockIndex
-     *          0-based data block index.
-     * @return BlockReader input stream for reading the data block.
-     * @throws IOException
-     */
-    public BlockReader getDataBlock(int blockIndex) throws IOException {
-      if (blockIndex < 0 || blockIndex >= getBlockCount()) {
-        throw new IndexOutOfBoundsException(String.format(
-            "blockIndex=%d, numBlocks=%d", blockIndex, getBlockCount()));
-      }
-
-      BlockRegion region = dataIndex.getBlockRegionList().get(blockIndex);
-      return createReader(dataIndex.getDefaultCompressionAlgorithm(), region);
-    }
-
-    private BlockReader createReader(Algorithm compressAlgo, BlockRegion region)
-        throws IOException {
-      // Cache key: the block's offset followed by this reader's toString().
-      String cacheKey = region.getOffset() + this.toString();
-
-      BlockReader cached = (BlockReader) CacheManager.get(cacheKey);
-      if (cached != null) {
-        // Cache hit: reset the existing reader and hand it out again.
-        cached.renew();
-        return cached;
-      }
-
-      // Cache miss: build a reader, publish it, and remember the key so that
-      // close() can invalidate it later.
-      RBlockState rbs = new RBlockState(compressAlgo, in, region, conf, this);
-      BlockReader created = new BlockReader(rbs);
-      CacheManager.put(cacheKey, created);
-      cacheKeys.add(cacheKey);
-      return created;
-    }
-
-    /**
-     * Find the smallest Block index whose starting offset is greater than or
-     * equal to the specified offset.
-     * 
-     * @param offset
-     *          User-specific offset.
-     * @return the index to the data Block if such block exists; or -1
-     *         otherwise.
-     */
-    public int getBlockIndexNear(long offset) {
-      ArrayList<BlockRegion> list = dataIndex.getBlockRegionList();
-      int idx =
-          Utils
-              .lowerBound(list, new ScalarLong(offset), new ScalarComparator());
-
-      if (idx == list.size()) {
-        return -1;
-      }
-
-      return idx;
-    }
-  }
-
-  /**
-   * Index for all Meta blocks.
-   */
-  static class MetaIndex {
-    // Entries keyed by meta block name; the tree map keeps them sorted by name.
-    final Map<String, MetaIndexEntry> index;
-
-    /** Creates an empty index (write path). */
-    public MetaIndex() {
-      index = new TreeMap<String, MetaIndexEntry>();
-    }
-
-    /**
-     * Builds the index from a file (read path): a VInt entry count followed
-     * by that many serialized entries.
-     */
-    public MetaIndex(DataInput in) throws IOException {
-      int remaining = Utils.readVInt(in);
-      index = new TreeMap<String, MetaIndexEntry>();
-
-      while (remaining-- > 0) {
-        MetaIndexEntry entry = new MetaIndexEntry(in);
-        index.put(entry.getMetaName(), entry);
-      }
-    }
-
-    /** Registers an entry under its meta name, replacing any previous one. */
-    public void addEntry(MetaIndexEntry indexEntry) {
-      String key = indexEntry.getMetaName();
-      index.put(key, indexEntry);
-    }
-
-    /** @return the entry stored under the given name, or null if there is none. */
-    public MetaIndexEntry getMetaByName(String name) {
-      return index.get(name);
-    }
-
-    /** Writes the entry count as a VInt, then every entry in map order. */
-    public void write(DataOutput out) throws IOException {
-      Utils.writeVInt(out, index.size());
-
-      for (Map.Entry<String, MetaIndexEntry> mapping : index.entrySet()) {
-        mapping.getValue().write(out);
-      }
-    }
-  }
-
-  /**
-   * An entry describes a meta block in the MetaIndex.
-   */
-  static final class MetaIndexEntry {
-    // Prefix added to the meta name in its serialized form.
-    private final static String defaultPrefix = "data:";
-
-    private final String metaName;
-    private final Algorithm compressionAlgorithm;
-    private final BlockRegion region;
-
-    /**
-     * Deserializes an entry: the prefixed meta name, the compression
-     * algorithm name, then the block region.
-     *
-     * @throws IOException
-     *           if the stored name does not start with the expected prefix,
-     *           or if reading from the input fails.
-     */
-    public MetaIndexEntry(DataInput in) throws IOException {
-      String storedName = Utils.readString(in);
-      if (!storedName.startsWith(defaultPrefix)) {
-        throw new IOException("Corrupted Meta region Index");
-      }
-      // Strip the prefix; the rest is the meta name.
-      metaName = storedName.substring(defaultPrefix.length());
-
-      String algorithmName = Utils.readString(in);
-      compressionAlgorithm =
-          Compression.getCompressionAlgorithmByName(algorithmName);
-      region = new BlockRegion(in);
-    }
-
-    /** Creates an entry from already-known values. */
-    public MetaIndexEntry(String metaName, Algorithm compressionAlgorithm,
-        BlockRegion region) {
-      this.metaName = metaName;
-      this.compressionAlgorithm = compressionAlgorithm;
-      this.region = region;
-    }
-
-    /** @return the meta block name, without the serialization prefix. */
-    public String getMetaName() {
-      return this.metaName;
-    }
-
-    /** @return the compression algorithm of this meta block. */
-    public Algorithm getCompressionAlgorithm() {
-      return this.compressionAlgorithm;
-    }
-
-    /** @return the region describing this meta block. */
-    public BlockRegion getRegion() {
-      return this.region;
-    }
-
-    /**
-     * Serializes the entry in the layout the reading constructor expects:
-     * prefixed name, algorithm name, region.
-     */
-    public void write(DataOutput out) throws IOException {
-      String storedName = defaultPrefix + metaName;
-      Utils.writeString(out, storedName);
-      Utils.writeString(out, compressionAlgorithm.getName());
-
-      region.write(out);
-    }
-  }
-
-  /**
-   * Index of all compressed data blocks.
-   */
-  static class DataIndex {
-    // Name of the meta block under which this index is looked up.
-    final static String BLOCK_NAME = "BCFile.index";
-
-    private final Algorithm defaultCompressionAlgorithm;
-
-    // One region per data block: its offset, compressed size and raw size.
-    private final ArrayList<BlockRegion> listRegions;
-
-    /**
-     * Read path: deserializes the default compression algorithm name, a VInt
-     * region count, then that many regions.
-     */
-    public DataIndex(DataInput in) throws IOException {
-      String algorithmName = Utils.readString(in);
-      defaultCompressionAlgorithm =
-          Compression.getCompressionAlgorithmByName(algorithmName);
-
-      int regionCount = Utils.readVInt(in);
-      listRegions = new ArrayList<BlockRegion>(regionCount);
-
-      for (int read = 0; read < regionCount; read++) {
-        listRegions.add(new BlockRegion(in));
-      }
-    }
-
-    /** Write path: starts with an empty region list. */
-    public DataIndex(String defaultCompressionAlgorithmName) {
-      listRegions = new ArrayList<BlockRegion>();
-      this.defaultCompressionAlgorithm =
-          Compression
-              .getCompressionAlgorithmByName(defaultCompressionAlgorithmName);
-    }
-
-    /** @return the compression algorithm recorded in this index. */
-    public Algorithm getDefaultCompressionAlgorithm() {
-      return this.defaultCompressionAlgorithm;
-    }
-
-    /** @return the internal region list itself, not a copy. */
-    public ArrayList<BlockRegion> getBlockRegionList() {
-      return this.listRegions;
-    }
-
-    /** Appends a region to the end of the list. */
-    public void addBlockRegion(BlockRegion region) {
-      this.listRegions.add(region);
-    }
-
-    /**
-     * Serializes the index in the layout the reading constructor expects:
-     * algorithm name, region count, regions.
-     */
-    public void write(DataOutput out) throws IOException {
-      Utils.writeString(out, defaultCompressionAlgorithm.getName());
-
-      Utils.writeVInt(out, listRegions.size());
-
-      for (BlockRegion blockRegion : listRegions) {
-        blockRegion.write(out);
-      }
-    }
-  }
-
-  /**
-   * Magic number uniquely identifying a BCFile in the header/footer.
-   */
-  static final class Magic {
-    // The 16-byte signature, laid out four bytes per row.
-    private final static byte[] AB_MAGIC_BCFILE = {
-        (byte) 0xd1, (byte) 0x11, (byte) 0xd3, (byte) 0x68,
-        (byte) 0x91, (byte) 0xb5, (byte) 0xd7, (byte) 0xb6,
-        (byte) 0x39, (byte) 0xdf, (byte) 0x41, (byte) 0x40,
-        (byte) 0x92, (byte) 0xba, (byte) 0xe1, (byte) 0x50 };
-
-    /**
-     * Reads size() bytes from the input and compares them with the signature.
-     *
-     * @throws IOException
-     *           if the bytes do not match, or if reading from the input fails.
-     */
-    public static void readAndVerify(DataInput in) throws IOException {
-      byte[] candidate = new byte[size()];
-      in.readFully(candidate);
-
-      boolean matches = Arrays.equals(candidate, AB_MAGIC_BCFILE);
-      if (!matches) {
-        throw new IOException("Not a valid BCFile.");
-      }
-    }
-
-    /** Writes the signature bytes to the output. */
-    public static void write(DataOutput out) throws IOException {
-      out.write(AB_MAGIC_BCFILE);
-    }
-
-    /** @return the length of the signature in bytes. */
-    public static int size() {
-      return AB_MAGIC_BCFILE.length;
-    }
-  }
-
-  /**
-   * Block region.
-   */
-  static final class BlockRegion implements Scalar {
-    private final long offset;
-    private final long compressedSize;
-    private final long rawSize;
-
-    /**
-     * Deserializes a region as three VLongs: offset, compressed size, raw
-     * size. Arguments are evaluated left to right, so the values are read in
-     * exactly that order.
-     */
-    public BlockRegion(DataInput in) throws IOException {
-      this(Utils.readVLong(in), Utils.readVLong(in), Utils.readVLong(in));
-    }
-
-    /** Creates a region from already-known values. */
-    public BlockRegion(long offset, long compressedSize, long rawSize) {
-      this.offset = offset;
-      this.compressedSize = compressedSize;
-      this.rawSize = rawSize;
-    }
-
-    /** Serializes the region in the order the reading constructor expects. */
-    public void write(DataOutput out) throws IOException {
-      Utils.writeVLong(out, this.offset);
-      Utils.writeVLong(out, this.compressedSize);
-      Utils.writeVLong(out, this.rawSize);
-    }
-
-    /** @return the offset of the block. */
-    public long getOffset() {
-      return this.offset;
-    }
-
-    /** @return the compressed size of the block. */
-    public long getCompressedSize() {
-      return this.compressedSize;
-    }
-
-    /** @return the raw (uncompressed) size of the block. */
-    public long getRawSize() {
-      return this.rawSize;
-    }
-
-    /** A region's scalar magnitude is its offset. */
-    @Override
-    public long magnitude() {
-      return this.offset;
-    }
-  }
-}