Posted to commits@hbase.apache.org by te...@apache.org on 2011/12/24 22:20:41 UTC

svn commit: r1223020 [4/5] - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/io/encoding/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/mapr...

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java Sat Dec 24 21:20:39 2011
@@ -35,8 +35,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KeyComparator;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.regionserver.MemStore;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -44,7 +46,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.Compressor;
 
 /**
- * Writes version 1 HFiles. Mainly used for testing backwards-compatibilty.
+ * Writes version 1 HFiles. Mainly used for testing backwards-compatibility.
  */
 public class HFileWriterV1 extends AbstractHFileWriter {
 
@@ -91,16 +93,17 @@ public class HFileWriterV1 extends Abstr
 
     @Override
     public Writer createWriter(FileSystem fs, Path path, int blockSize,
-        Compression.Algorithm compressAlgo, final KeyComparator comparator)
+        Algorithm compressAlgo, HFileDataBlockEncoder dataBlockEncoder,
+        KeyComparator comparator)
         throws IOException {
       return new HFileWriterV1(conf, cacheConf, fs, path, blockSize,
-          compressAlgo, comparator);
+          compressAlgo, dataBlockEncoder, comparator);
     }
 
     @Override
     public Writer createWriter(FileSystem fs, Path path, int blockSize,
-        String compressAlgoName,
-        final KeyComparator comparator) throws IOException {
+        String compressAlgoName, KeyComparator comparator)
+        throws IOException {
       return new HFileWriterV1(conf, cacheConf, fs, path, blockSize,
           compressAlgoName, comparator);
     }
@@ -117,7 +120,8 @@ public class HFileWriterV1 extends Abstr
     public Writer createWriter(final FSDataOutputStream ostream,
         final int blockSize, final Compression.Algorithm compress,
         final KeyComparator c) throws IOException {
-      return new HFileWriterV1(cacheConf, ostream, blockSize, compress, c);
+      return new HFileWriterV1(cacheConf, ostream, blockSize, compress,
+          new NoOpDataBlockEncoder(), c);
     }
   }
 
@@ -127,7 +131,7 @@ public class HFileWriterV1 extends Abstr
       throws IOException {
     this(conf, cacheConf, fs, path, HFile.DEFAULT_BLOCKSIZE,
         HFile.DEFAULT_COMPRESSION_ALGORITHM,
-        null);
+        new NoOpDataBlockEncoder(), null);
   }
 
   /**
@@ -138,15 +142,18 @@ public class HFileWriterV1 extends Abstr
       Path path, int blockSize, String compressAlgoName,
       final KeyComparator comparator) throws IOException {
     this(conf, cacheConf, fs, path, blockSize,
-        compressionByName(compressAlgoName), comparator);
+        compressionByName(compressAlgoName), new NoOpDataBlockEncoder(),
+        comparator);
   }
 
   /** Constructor that takes a path, creates and closes the output stream. */
-  public HFileWriterV1(Configuration conf, CacheConfig cacheConf, FileSystem fs,
-      Path path, int blockSize, Compression.Algorithm compress,
+  public HFileWriterV1(Configuration conf, CacheConfig cacheConf,
+      FileSystem fs, Path path,
+      int blockSize, Compression.Algorithm compress,
+      HFileDataBlockEncoder blockEncoder,
       final KeyComparator comparator) throws IOException {
     super(cacheConf, createOutputStream(conf, fs, path), path,
-        blockSize, compress, comparator);
+        blockSize, compress, blockEncoder, comparator);
     SchemaMetrics.configureGlobally(conf);
   }
 
@@ -157,15 +164,17 @@ public class HFileWriterV1 extends Abstr
       throws IOException {
     this(cacheConf, outputStream, blockSize,
         Compression.getCompressionAlgorithmByName(compressAlgoName),
-        comparator);
+        new NoOpDataBlockEncoder(), comparator);
   }
 
   /** Constructor that takes a stream. */
   public HFileWriterV1(CacheConfig cacheConf,
       final FSDataOutputStream outputStream, final int blockSize,
-      final Compression.Algorithm compress, final KeyComparator comparator)
+      final Compression.Algorithm compress,
+      HFileDataBlockEncoder blockEncoder, final KeyComparator comparator)
       throws IOException {
-    super(cacheConf, outputStream, null, blockSize, compress, comparator);
+    super(cacheConf, outputStream, null, blockSize, compress,
+        blockEncoder, comparator);
   }
 
   /**
@@ -202,13 +211,17 @@ public class HFileWriterV1 extends Abstr
 
     if (cacheConf.shouldCacheDataOnWrite()) {
       baosDos.flush();
+      // We do not do data block encoding on disk here; that is only done in HFile V2.
       byte[] bytes = baos.toByteArray();
       HFileBlock cBlock = new HFileBlock(BlockType.DATA,
           (int) (outputStream.getPos() - blockBegin), bytes.length, -1,
-          ByteBuffer.wrap(bytes, 0, bytes.length), true, blockBegin);
-      passSchemaMetricsTo(cBlock);
+          ByteBuffer.wrap(bytes, 0, bytes.length), HFileBlock.FILL_HEADER,
+          blockBegin, MemStore.NO_PERSISTENT_TS);
+      HFileBlock codedBlock = blockEncoder.beforeBlockCache(cBlock,
+          false);
+      passSchemaMetricsTo(codedBlock);
       cacheConf.getBlockCache().cacheBlock(
-          HFile.getBlockCacheKey(name, blockBegin), cBlock);
+          HFile.getBlockCacheKey(name, blockBegin), codedBlock);
       baosDos.close();
     }
     blockNumber++;
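
For reference, a minimal sketch of calling the updated HFileWriterV1 constructor above with its new data-block-encoder argument. The class name, output path, column family/qualifier, and the choice of KeyValue.KEY_COMPARATOR and NONE compression are illustrative assumptions, not part of this commit; since V1 never encodes blocks on disk, NoOpDataBlockEncoder is the natural choice here.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.KeyValue;
  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
  import org.apache.hadoop.hbase.io.hfile.Compression;
  import org.apache.hadoop.hbase.io.hfile.HFile;
  import org.apache.hadoop.hbase.io.hfile.HFileWriterV1;
  import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
  import org.apache.hadoop.hbase.util.Bytes;

  public class WriterV1Example {                       // illustrative class name
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf);
      Path path = new Path("/tmp/example-v1.hfile");   // illustrative path
      HFile.Writer writer = new HFileWriterV1(conf, new CacheConfig(conf), fs, path,
          HFile.DEFAULT_BLOCKSIZE, Compression.Algorithm.NONE,
          new NoOpDataBlockEncoder(),                  // V1 never encodes blocks on disk
          KeyValue.KEY_COMPARATOR);
      // Append a single cell and close. When cache-on-write is enabled, the cached
      // copy of the data block goes through blockEncoder.beforeBlockCache() as in
      // the hunk above.
      writer.append(new KeyValue(Bytes.toBytes("row-1"), Bytes.toBytes("cf"),
          Bytes.toBytes("q"), Bytes.toBytes("value-1")));
      writer.close();
    }
  }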

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java Sat Dec 24 21:20:39 2011
@@ -49,9 +49,13 @@ public class HFileWriterV2 extends Abstr
   static final Log LOG = LogFactory.getLog(HFileWriterV2.class);
 
   /** Max memstore (mvcc) timestamp in FileInfo */
-  public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
+  public static final byte [] MAX_MEMSTORE_TS_KEY =
+      Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
+
   /** KeyValue version in FileInfo */
-  public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");
+  public static final byte [] KEY_VALUE_VERSION =
+      Bytes.toBytes("KEY_VALUE_VERSION");
+
   /** Version for KeyValue which includes memstore timestamp */
   public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
 
@@ -92,10 +96,10 @@ public class HFileWriterV2 extends Abstr
 
     @Override
     public Writer createWriter(FileSystem fs, Path path, int blockSize,
-        Compression.Algorithm compress,
+        Compression.Algorithm compress, HFileDataBlockEncoder blockEncoder,
         final KeyComparator comparator) throws IOException {
       return new HFileWriterV2(conf, cacheConf, fs, path, blockSize,
-          compress, comparator);
+          compress, blockEncoder, comparator);
     }
 
     @Override
@@ -128,7 +132,7 @@ public class HFileWriterV2 extends Abstr
       FileSystem fs, Path path)
       throws IOException {
     this(conf, cacheConf, fs, path, HFile.DEFAULT_BLOCKSIZE,
-        HFile.DEFAULT_COMPRESSION_ALGORITHM, null);
+        HFile.DEFAULT_COMPRESSION_ALGORITHM, null, null);
   }
 
   /**
@@ -139,15 +143,16 @@ public class HFileWriterV2 extends Abstr
       Path path, int blockSize, String compressAlgoName,
       final KeyComparator comparator) throws IOException {
     this(conf, cacheConf, fs, path, blockSize,
-        compressionByName(compressAlgoName), comparator);
+        compressionByName(compressAlgoName), null, comparator);
   }
 
   /** Constructor that takes a path, creates and closes the output stream. */
   public HFileWriterV2(Configuration conf, CacheConfig cacheConf, FileSystem fs,
       Path path, int blockSize, Compression.Algorithm compressAlgo,
+      HFileDataBlockEncoder blockEncoder,
       final KeyComparator comparator) throws IOException {
     super(cacheConf, createOutputStream(conf, fs, path), path,
-        blockSize, compressAlgo, comparator);
+        blockSize, compressAlgo, blockEncoder, comparator);
     SchemaMetrics.configureGlobally(conf);
     finishInit(conf);
   }
@@ -167,7 +172,8 @@ public class HFileWriterV2 extends Abstr
       final FSDataOutputStream outputStream, final int blockSize,
       final Compression.Algorithm compress, final KeyComparator comparator)
       throws IOException {
-    super(cacheConf, outputStream, null, blockSize, compress, comparator);
+    super(cacheConf, outputStream, null, blockSize, compress, null,
+        comparator);
     finishInit(conf);
   }
 
@@ -177,7 +183,8 @@ public class HFileWriterV2 extends Abstr
       throw new IllegalStateException("finishInit called twice");
 
     // HFile filesystem-level (non-caching) block writer
-    fsBlockWriter = new HFileBlock.Writer(compressAlgo);
+    fsBlockWriter = new HFileBlock.Writer(compressAlgo, blockEncoder,
+        includeMemstoreTS);
 
     // Data block index writer
     boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
@@ -225,8 +232,9 @@ public class HFileWriterV2 extends Abstr
     long startTimeNs = System.nanoTime();
 
     // Update the first data block offset for scanning.
-    if (firstDataBlockOffset == -1)
+    if (firstDataBlockOffset == -1) {
       firstDataBlockOffset = outputStream.getPos();
+    }
 
     // Update the last data block offset
     lastDataBlockOffset = outputStream.getPos();
@@ -242,10 +250,12 @@ public class HFileWriterV2 extends Abstr
     HFile.writeOps.incrementAndGet();
 
     if (cacheConf.shouldCacheDataOnWrite()) {
-      HFileBlock blockForCaching = fsBlockWriter.getBlockForCaching();
-      passSchemaMetricsTo(blockForCaching);
+      HFileBlock cBlock = fsBlockWriter.getBlockForCaching();
+      HFileBlock codedBlock = blockEncoder.beforeBlockCache(cBlock,
+          includeMemstoreTS);
+      passSchemaMetricsTo(codedBlock);
       cacheConf.getBlockCache().cacheBlock(
-          HFile.getBlockCacheKey(name, lastDataBlockOffset), blockForCaching);
+          HFile.getBlockCacheKey(name, lastDataBlockOffset), codedBlock);
     }
   }
 
@@ -256,7 +266,7 @@ public class HFileWriterV2 extends Abstr
         long offset = outputStream.getPos();
         boolean cacheThisBlock = ibw.cacheOnWrite();
         ibw.writeInlineBlock(fsBlockWriter.startWriting(
-            ibw.getInlineBlockType(), cacheThisBlock));
+            ibw.getInlineBlockType()));
         fsBlockWriter.writeHeaderAndData(outputStream);
         ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
             fsBlockWriter.getUncompressedSizeWithoutHeader());
@@ -265,9 +275,11 @@ public class HFileWriterV2 extends Abstr
         if (cacheThisBlock) {
           // Cache this block on write.
           HFileBlock cBlock = fsBlockWriter.getBlockForCaching();
-          passSchemaMetricsTo(cBlock);
+          HFileBlock codedBlock = blockEncoder.beforeBlockCache(cBlock,
+              includeMemstoreTS);
+          passSchemaMetricsTo(codedBlock);
           cacheConf.getBlockCache().cacheBlock(
-              HFile.getBlockCacheKey(name, offset), cBlock);
+              HFile.getBlockCacheKey(name, offset), codedBlock);
         }
       }
     }
@@ -280,8 +292,7 @@ public class HFileWriterV2 extends Abstr
    */
   private void newBlock() throws IOException {
     // This is where the next block begins.
-    fsBlockWriter.startWriting(BlockType.DATA,
-        cacheConf.shouldCacheDataOnWrite());
+    fsBlockWriter.startWriting(BlockType.DATA);
     firstKeyInBlock = null;
   }
 
@@ -413,8 +424,7 @@ public class HFileWriterV2 extends Abstr
         // store the beginning offset
         long offset = outputStream.getPos();
         // write the metadata content
-        DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META,
-            cacheConf.shouldCacheDataOnWrite());
+        DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
         metaData.get(i).write(dos);
 
         fsBlockWriter.writeHeaderAndData(outputStream);
@@ -440,7 +450,7 @@ public class HFileWriterV2 extends Abstr
 
     // Meta block index.
     metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
-        BlockType.ROOT_INDEX, false), "meta");
+        BlockType.ROOT_INDEX), "meta");
     fsBlockWriter.writeHeaderAndData(outputStream);
     totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
 
@@ -450,8 +460,7 @@ public class HFileWriterV2 extends Abstr
     }
 
     // File info
-    writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO,
-        false));
+    writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
     fsBlockWriter.writeHeaderAndData(outputStream);
     totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
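
For reference, a minimal sketch of obtaining a writer through HFile.getWriterFactory(), whose createWriter() variants now carry the HFileDataBlockEncoder parameter as shown in the factory hunks above. The class name, path, column family, and the use of KeyValue.KEY_COMPARATOR and NONE compression are illustrative assumptions.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.KeyValue;
  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
  import org.apache.hadoop.hbase.io.hfile.Compression;
  import org.apache.hadoop.hbase.io.hfile.HFile;
  import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
  import org.apache.hadoop.hbase.util.Bytes;

  public class WriterFactoryExample {                  // illustrative class name
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      CacheConfig cacheConf = new CacheConfig(conf);
      FileSystem fs = FileSystem.get(conf);
      Path path = new Path("/tmp/example-v2.hfile");   // illustrative path
      // The configured HFile format version decides whether a V1 or V2 writer comes
      // back; either way the encoder is threaded through to the block-caching hooks.
      HFile.Writer writer = HFile.getWriterFactory(conf, cacheConf).createWriter(
          fs, path, HFile.DEFAULT_BLOCKSIZE, Compression.Algorithm.NONE,
          new NoOpDataBlockEncoder(), KeyValue.KEY_COMPARATOR);
      writer.append(new KeyValue(Bytes.toBytes("row-1"), Bytes.toBytes("cf"),
          Bytes.toBytes("q"), Bytes.toBytes("value-1")));
      writer.close();
    }
  }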
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Sat Dec 24 21:20:39 2011
@@ -722,4 +722,10 @@ public class LruBlockCache implements Bl
   public void shutdown() {
     this.scheduleThreadPool.shutdown();
   }
+
+  /** Clears the cache. Used in tests. */
+  public void clearCache() {
+    map.clear();
+  }
+
 }

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Does not perform any kind of encoding/decoding.
+ */
+public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
+
+  @Override
+  public HFileBlock afterReadFromDisk(HFileBlock block) {
+    if (block.getBlockType() == BlockType.ENCODED_DATA) {
+      throw new IllegalStateException("Unexpected encoded block");
+    }
+    return block;
+  }
+
+  @Override
+  public HFileBlock afterReadFromDiskAndPuttingInCache(HFileBlock block,
+        boolean isCompaction, boolean includesMemstoreTS) {
+    return block;
+  }
+
+  @Override
+  public Pair<ByteBuffer, BlockType> beforeWriteToDisk(
+      ByteBuffer in, boolean includesMemstoreTS) {
+    return new Pair<ByteBuffer, BlockType>(in, BlockType.DATA);
+  }
+
+  @Override
+  public HFileBlock beforeBlockCache(HFileBlock block,
+      boolean includesMemstoreTS) {
+    return block;
+  }
+
+  @Override
+  public HFileBlock afterBlockCache(HFileBlock block, boolean isCompaction,
+      boolean includesMemstoreTS) {
+    return block;
+  }
+
+  @Override
+  public boolean useEncodedScanner(boolean isCompaction) {
+    return false;
+  }
+
+  @Override
+  public void saveMetadata(StoreFile.Writer storeFileWriter) {
+  }
+}
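
For reference, a small sketch exercising the no-op encoder on its own; the class name and the throwaway byte payload are illustrative assumptions. It demonstrates that the hooks are pass-throughs: beforeWriteToDisk() hands the buffer back untouched, tagged as a plain DATA block, and no encoded scanner is ever requested.

  import java.nio.ByteBuffer;

  import org.apache.hadoop.hbase.io.hfile.BlockType;
  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
  import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
  import org.apache.hadoop.hbase.util.Pair;

  public class NoOpEncoderDemo {                       // illustrative class name
    public static void main(String[] args) throws Exception {
      HFileDataBlockEncoder encoder = new NoOpDataBlockEncoder();
      ByteBuffer raw = ByteBuffer.wrap(new byte[] { 1, 2, 3 });   // throwaway payload
      Pair<ByteBuffer, BlockType> result = encoder.beforeWriteToDisk(raw, false);
      // The same buffer comes back, still typed as an ordinary DATA block.
      System.out.println(result.getSecond() + ", same buffer: "
          + (result.getFirst() == raw));
      // And compactions (or normal reads) never get an encoded scanner.
      System.out.println("encoded scanner: " + encoder.useEncodedScanner(true));
    }
  }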

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Sat Dec 24 21:20:39 2011
@@ -65,6 +65,8 @@ import org.apache.hadoop.hbase.io.Refere
 import org.apache.hadoop.hbase.io.Reference.Range;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
@@ -528,9 +530,13 @@ public class LoadIncrementalHFiles exten
     CacheConfig cacheConf = new CacheConfig(conf);
     HalfStoreFileReader halfReader = null;
     StoreFile.Writer halfWriter = null;
+    HFileDataBlockEncoder dataBlockEncoder = new HFileDataBlockEncoderImpl(
+        familyDescriptor.getDataBlockEncodingOnDisk(),
+        familyDescriptor.getDataBlockEncodingInCache(),
+        familyDescriptor.useEncodedDataBlockSeek());
     try {
       halfReader = new HalfStoreFileReader(fs, inFile, cacheConf,
-          reference);
+          reference, dataBlockEncoder);
       Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
 
       int blocksize = familyDescriptor.getBlocksize();
@@ -538,7 +544,8 @@ public class LoadIncrementalHFiles exten
       BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
 
       halfWriter = new StoreFile.Writer(
-          fs, outFile, blocksize, compression, conf, cacheConf,
+          fs, outFile, blocksize, compression, dataBlockEncoder,
+          conf, cacheConf,
           KeyValue.COMPARATOR, bloomFilterType, 0);
       HFileScanner scanner = halfReader.getScanner(false, false, false);
       scanner.seekTo();
@@ -638,7 +645,6 @@ public class LoadIncrementalHFiles exten
       Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
       for (Path hfile : hfiles) {
         if (hfile.getName().startsWith("_")) continue;
-        
         HFile.Reader reader = HFile.createReader(fs, hfile,
             new CacheConfig(getConf()));
         final byte[] first, last;
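
For reference, a minimal sketch of the pattern used above: building an HFileDataBlockEncoderImpl from the three per-family settings (on-disk encoding, in-cache encoding, encoded seek). The class name and the default-constructed column family are illustrative assumptions.

  import org.apache.hadoop.hbase.HColumnDescriptor;
  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;

  public class EncoderFromFamily {                     // illustrative class name
    public static void main(String[] args) {
      HColumnDescriptor family = new HColumnDescriptor("cf");     // defaults assumed
      HFileDataBlockEncoder encoder = new HFileDataBlockEncoderImpl(
          family.getDataBlockEncodingOnDisk(),
          family.getDataBlockEncodingInCache(),
          family.useEncodedDataBlockSeek());
      System.out.println("on-disk=" + family.getDataBlockEncodingOnDisk()
          + " in-cache=" + family.getDataBlockEncodingInCache()
          + " encoded-seek=" + family.useEncodedDataBlockSeek());
    }
  }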

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Sat Dec 24 21:20:39 2011
@@ -870,6 +870,10 @@ public class MemStore implements HeapSiz
       ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
       (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
 
+  // Constants for whether to serialize memstore timestamp.
+  public static final boolean NO_PERSISTENT_TS = false;
+  public static final boolean PERSISTENT_TS = true;
+
   /*
    * Calculate how the MemStore size has changed.  Includes overhead of the
    * backing Map.

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Dec 24 21:20:39 2011
@@ -48,6 +48,8 @@ import org.apache.hadoop.hbase.client.Sc
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
@@ -144,6 +146,7 @@ public class Store extends SchemaConfigu
   private final Compression.Algorithm compression;
   /** Compression algorithm for major compaction */
   private final Compression.Algorithm compactionCompression;
+  private HFileDataBlockEncoder dataBlockEncoder;
 
   // Comparing KeyValues
   final KeyValue.KVComparator comparator;
@@ -181,6 +184,12 @@ public class Store extends SchemaConfigu
     this.compactionCompression =
       (family.getCompactionCompression() != Compression.Algorithm.NONE) ?
         family.getCompactionCompression() : this.compression;
+
+    this.dataBlockEncoder =
+        new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
+            family.getDataBlockEncodingInCache(),
+            family.useEncodedDataBlockSeek());
+
     this.comparator = info.getComparator();
     // getTimeToLive returns ttl in seconds.  Convert to milliseconds.
     this.ttl = family.getTimeToLive();
@@ -270,6 +279,21 @@ public class Store extends SchemaConfigu
   public Path getHomedir() {
     return homedir;
   }
+  
+  /**
+   * @return the data block encoder
+   */
+  public HFileDataBlockEncoder getDataBlockEncoder() {
+    return dataBlockEncoder;
+  }
+
+  /**
+   * Should be used only in tests.
+   * @param blockEncoder the block delta encoder to use
+   */
+  public void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
+    this.dataBlockEncoder = blockEncoder;
+  }
 
   /*
    * Creates an unsorted list of StoreFile loaded from the given directory.
@@ -292,8 +316,9 @@ public class Store extends SchemaConfigu
         continue;
       }
       StoreFile curfile = new StoreFile(fs, p, this.conf, this.cacheConf,
-          this.family.getBloomFilterType());
+          this.family.getBloomFilterType(), this.dataBlockEncoder);
       passSchemaMetricsTo(curfile);
+
       curfile.createReader();
       long length = curfile.getReader().length();
       this.storeSize += length;
@@ -447,8 +472,9 @@ public class Store extends SchemaConfigu
     StoreFile.rename(fs, srcPath, dstPath);
 
     StoreFile sf = new StoreFile(fs, dstPath, this.conf, this.cacheConf,
-        this.family.getBloomFilterType());
+        this.family.getBloomFilterType(), this.dataBlockEncoder);
     passSchemaMetricsTo(sf);
+
     sf.createReader();
 
     LOG.info("Moved hfile " + srcPath + " into store directory " +
@@ -555,7 +581,6 @@ public class Store extends SchemaConfigu
       MonitoredTask status)
       throws IOException {
     StoreFile.Writer writer;
-    String fileName;
     // Find the smallest read point across all the Scanners.
     long smallestReadPoint = region.getSmallestReadPoint();
     long flushed = 0;
@@ -651,8 +676,9 @@ public class Store extends SchemaConfigu
 
     status.setStatus("Flushing " + this + ": reopening flushed file");
     StoreFile sf = new StoreFile(this.fs, dstPath, this.conf, this.cacheConf,
-        this.family.getBloomFilterType());
+        this.family.getBloomFilterType(), this.dataBlockEncoder);
     passSchemaMetricsTo(sf);
+
     StoreFile.Reader r = sf.createReader();
     this.storeSize += r.length();
     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
@@ -690,7 +716,7 @@ public class Store extends SchemaConfigu
     Compression.Algorithm compression)
   throws IOException {
     StoreFile.Writer w = StoreFile.createWriter(fs, region.getTmpDir(),
-        blocksize, compression, comparator, conf, cacheConf,
+        blocksize, compression, dataBlockEncoder, comparator, conf, cacheConf,
         family.getBloomFilterType(), maxKeyCount);
     // The store file writer's path does not include the CF name, so we need
     // to configure the HFile writer directly.
@@ -1416,7 +1442,7 @@ public class Store extends SchemaConfigu
     StoreFile storeFile = null;
     try {
       storeFile = new StoreFile(this.fs, path, this.conf,
-          this.cacheConf, this.family.getBloomFilterType());
+          this.cacheConf, this.family.getBloomFilterType(), null);
       passSchemaMetricsTo(storeFile);
       storeFile.createReader();
     } catch (IOException e) {
@@ -1468,7 +1494,7 @@ public class Store extends SchemaConfigu
             " to " + destPath);
       }
       result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
-          this.family.getBloomFilterType());
+          this.family.getBloomFilterType(), this.dataBlockEncoder);
       passSchemaMetricsTo(result);
       result.createReader();
     }
@@ -2056,8 +2082,8 @@ public class Store extends SchemaConfigu
   }
 
   public static final long FIXED_OVERHEAD = 
-      ClassSize.align(new SchemaConfigured().heapSize()
-          + (18 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG)
+      ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
+          + (19 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG)
           + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Sat Dec 24 21:20:39 2011
@@ -56,6 +56,8 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
@@ -130,6 +132,10 @@ public class StoreFile extends SchemaCon
   /** Key for timestamp of earliest-put in metadata*/
   public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
 
+  /** Type of encoding used for data blocks in HFile. Stored in file info. */
+  public static final byte[] DATA_BLOCK_ENCODING =
+      Bytes.toBytes("DATA_BLOCK_ENCODING");
+
   // Make default block size for StoreFiles 8k while testing.  TODO: FIX!
   // Need to make it 8k for testing.
   public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
@@ -148,7 +154,10 @@ public class StoreFile extends SchemaCon
   // Block cache configuration and reference.
   private final CacheConfig cacheConf;
 
-  // HDFS blocks distribuion information
+  // What kind of data block encoding will be used
+  private HFileDataBlockEncoder dataBlockEncoder;
+
+  // HDFS blocks distribution information
   private HDFSBlocksDistribution hdfsBlocksDistribution;
 
   // Keys for metadata stored in backing HFile.
@@ -207,6 +216,23 @@ public class StoreFile extends SchemaCon
   private long modificationTimeStamp = 0L;
 
   /**
+   * Constructor that ignores Bloom filters and does not use the inMemory
+   * option or in-cache data block encoding.
+   * @param fs The current file system to use
+   * @param p The path of the file.
+   * @param conf The current configuration.
+   * @throws IOException When opening the reader fails.
+   */
+  StoreFile(final FileSystem fs,
+            final Path p,
+            final Configuration conf,
+            final CacheConfig cacheConf)
+      throws IOException {
+    this(fs, p, conf, cacheConf, BloomType.NONE,
+        new NoOpDataBlockEncoder());
+  }
+
+  /**
    * Constructor, loads a reader and it's indices, etc. May allocate a
    * substantial amount of ram depending on the underlying files (10-20MB?).
    *
@@ -220,17 +246,20 @@ public class StoreFile extends SchemaCon
    *          as the Bloom filter type actually present in the HFile, because
    *          column family configuration might change. If this is
    *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
+   * @param dataBlockEncoder data block encoding algorithm.
    * @throws IOException When opening the reader fails.
    */
   StoreFile(final FileSystem fs,
             final Path p,
             final Configuration conf,
             final CacheConfig cacheConf,
-            final BloomType cfBloomType)
+            final BloomType cfBloomType,
+            final HFileDataBlockEncoder dataBlockEncoder)
       throws IOException {
     this.fs = fs;
     this.path = p;
     this.cacheConf = cacheConf;
+    this.dataBlockEncoder = dataBlockEncoder;
     if (isReference(p)) {
       this.reference = Reference.read(fs, p);
       this.referencePath = getReferredToFile(this.path);
@@ -493,9 +522,10 @@ public class StoreFile extends SchemaCon
     }
     if (isReference()) {
       this.reader = new HalfStoreFileReader(this.fs, this.referencePath,
-          this.cacheConf, this.reference);
+          this.cacheConf, this.reference, this.dataBlockEncoder);
     } else {
-      this.reader = new Reader(this.fs, this.path, this.cacheConf);
+      this.reader = new Reader(this.fs, this.path, this.cacheConf,
+          this.dataBlockEncoder);
     }
 
     if (isSchemaConfigured()) {
@@ -677,8 +707,8 @@ public class StoreFile extends SchemaCon
   public static Writer createWriter(final FileSystem fs, final Path dir,
       final int blocksize, Configuration conf, CacheConfig cacheConf)
   throws IOException {
-    return createWriter(fs, dir, blocksize, null, null, conf, cacheConf,
-        BloomType.NONE, 0);
+    return createWriter(fs, dir, blocksize, null, new NoOpDataBlockEncoder(),
+        null, conf, cacheConf, BloomType.NONE, 0);
   }
 
   /**
@@ -689,6 +719,7 @@ public class StoreFile extends SchemaCon
    * Creates a file with a unique name in this directory.
    * @param blocksize
    * @param algorithm Pass null to get default.
+   * @param dataBlockEncoder Pass null to disable data block encoding.
    * @param c Pass null to get default.
    * @param conf HBase system configuration. used with bloom filters
    * @param cacheConf Cache configuration and reference.
@@ -701,6 +732,7 @@ public class StoreFile extends SchemaCon
                                               final Path dir,
                                               final int blocksize,
                                               final Compression.Algorithm algorithm,
+                                              final HFileDataBlockEncoder dataBlockEncoder,
                                               final KeyValue.KVComparator c,
                                               final Configuration conf,
                                               final CacheConfig cacheConf,
@@ -718,7 +750,7 @@ public class StoreFile extends SchemaCon
 
     return new Writer(fs, path, blocksize,
         algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm,
-        conf, cacheConf, c == null ? KeyValue.COMPARATOR: c, bloomType,
+        dataBlockEncoder, conf, cacheConf, c == null ? KeyValue.COMPARATOR : c, bloomType,
         maxKeyCount);
   }
 
@@ -814,6 +846,8 @@ public class StoreFile extends SchemaCon
     private KeyValue lastDeleteFamilyKV = null;
     private long deleteFamilyCnt = 0;
 
+    protected HFileDataBlockEncoder dataBlockEncoder;
+
     TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
     /* isTimeRangeTrackerSet keeps track if the timeRange has already been set
      * When flushing a memstore, we set TimeRange and use this variable to
@@ -838,13 +872,16 @@ public class StoreFile extends SchemaCon
      * @throws IOException problem writing to FS
      */
     public Writer(FileSystem fs, Path path, int blocksize,
-        Compression.Algorithm compress, final Configuration conf,
+        Compression.Algorithm compress,
+        HFileDataBlockEncoder dataBlockEncoder, final Configuration conf,
         CacheConfig cacheConf,
         final KVComparator comparator, BloomType bloomType, long maxKeys)
         throws IOException {
+      this.dataBlockEncoder = dataBlockEncoder != null ?
+          dataBlockEncoder : new NoOpDataBlockEncoder();
       writer = HFile.getWriterFactory(conf, cacheConf).createWriter(
           fs, path, blocksize,
-          compress, comparator.getRawComparator());
+          compress, this.dataBlockEncoder, comparator.getRawComparator());
 
       this.kvComparator = comparator;
 
@@ -1081,6 +1118,10 @@ public class StoreFile extends SchemaCon
     }
 
     public void close() throws IOException {
+      // (optional) Add data block encoding used to save this file
+      // It is mostly for statistics and debugging purpose.
+      dataBlockEncoder.saveMetadata(this);
+
       boolean hasGeneralBloom = this.closeGeneralBloomFilter();
       boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();
 
@@ -1119,10 +1160,12 @@ public class StoreFile extends SchemaCon
     private byte[] lastBloomKey;
     private long deleteFamilyCnt = -1;
 
-    public Reader(FileSystem fs, Path path, CacheConfig cacheConf)
+    public Reader(FileSystem fs, Path path, CacheConfig cacheConf,
+        HFileDataBlockEncoder dataBlockEncoder)
         throws IOException {
       super(path);
-      reader = HFile.createReader(fs, path, cacheConf);
+      reader = HFile.createReader(fs, path, cacheConf,
+          dataBlockEncoder);
       bloomFilterType = BloomType.NONE;
     }
 
@@ -1262,7 +1305,7 @@ public class StoreFile extends SchemaCon
 
         default:
           return true;
-      }      
+      }
     }
 
     public boolean passesDeleteFamilyBloomFilter(byte[] row, int rowOffset,
@@ -1312,7 +1355,7 @@ public class StoreFile extends SchemaCon
         return true;
 
       byte[] key;
-      switch (bloomFilterType) { 
+      switch (bloomFilterType) {
         case ROW:
           if (col != null) {
             throw new RuntimeException("Row-only Bloom filter called with " +
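
For reference, a minimal sketch against the widened StoreFile.createWriter() above; nulls fall back to the defaults documented in its javadoc, and passing null for the encoder disables data block encoding. The class name, output directory, and cell contents are illustrative assumptions.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.KeyValue;
  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
  import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
  import org.apache.hadoop.hbase.regionserver.StoreFile;
  import org.apache.hadoop.hbase.util.Bytes;

  public class StoreFileWriterExample {                // illustrative class name
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf);
      Path dir = new Path("/tmp/storefiles");          // illustrative directory
      StoreFile.Writer w = StoreFile.createWriter(fs, dir,
          StoreFile.DEFAULT_BLOCKSIZE_SMALL,
          null,                                        // compression: default algorithm
          new NoOpDataBlockEncoder(),                  // or null to disable encoding
          null,                                        // comparator: KeyValue.COMPARATOR
          conf, new CacheConfig(conf),
          StoreFile.BloomType.NONE, 0);
      w.append(new KeyValue(Bytes.toBytes("row-1"), Bytes.toBytes("cf"),
          Bytes.toBytes("q"), Bytes.toBytes("v")));
      w.close();                                       // saveMetadata() is a no-op here
    }
  }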

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Sat Dec 24 21:20:39 2011
@@ -177,7 +177,7 @@ class StoreFileScanner implements KeyVal
         realSeekDone = true;
       }
     } catch (IOException ioe) {
-      throw new IOException("Could not seek " + this, ioe);
+      throw new IOException("Could not seek " + this + " " + key, ioe);
     }
   }
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java Sat Dec 24 21:20:39 2011
@@ -83,6 +83,15 @@ public class SchemaConfigured implements
   }
 
   /**
+   * Creates an instance corresponding to an unknown table and column family.
+   * Used in unit tests. 
+   */
+  public static SchemaConfigured createUnknown() {
+    return new SchemaConfigured(null, SchemaMetrics.UNKNOWN,
+        SchemaMetrics.UNKNOWN);
+  }
+
+  /**
    * Default constructor. Only use when column/family name are not known at
    * construction (i.e. for HFile blocks).
    */

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.encoding.
+    EncoderBufferTooSmallException;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Utility functions for working with byte buffers, such as reading/writing
+ * variable-length long numbers.
+ */
+public final class ByteBufferUtils {
+
+  // "Compressed integer" serialization helper constants.
+  private final static int VALUE_MASK = 0x7f;
+  private final static int NEXT_BIT_SHIFT = 7;
+  private final static int NEXT_BIT_MASK = 1 << 7;
+
+  private ByteBufferUtils() {
+  }
+
+  /**
+   * Similar to {@link WritableUtils#writeVLong(java.io.DataOutput, long)},
+   * but writes to a {@link ByteBuffer}.
+   */
+  public static void writeVLong(ByteBuffer out, long i) {
+    if (i >= -112 && i <= 127) {
+      out.put((byte) i);
+      return;
+    }
+
+    int len = -112;
+    if (i < 0) {
+      i ^= -1L; // take one's complement
+      len = -120;
+    }
+
+    long tmp = i;
+    while (tmp != 0) {
+      tmp = tmp >> 8;
+      len--;
+    }
+
+    out.put((byte) len);
+
+    len = (len < -120) ? -(len + 120) : -(len + 112);
+
+    for (int idx = len; idx != 0; idx--) {
+      int shiftbits = (idx - 1) * 8;
+      long mask = 0xFFL << shiftbits;
+      out.put((byte) ((i & mask) >> shiftbits));
+    }
+  }
+
+  /**
+   * Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a
+   * {@link ByteBuffer}.
+   */
+  public static long readVLong(ByteBuffer in) {
+    byte firstByte = in.get();
+    int len = WritableUtils.decodeVIntSize(firstByte);
+    if (len == 1) {
+      return firstByte;
+    }
+    long i = 0;
+    for (int idx = 0; idx < len-1; idx++) {
+      byte b = in.get();
+      i = i << 8;
+      i = i | (b & 0xFF);
+    }
+    return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
+  }
+
+
+  /**
+   * Write an integer to the stream using 7-bit encoding. In each written byte,
+   * 7 bits are used to store part of the value and
+   * 1 bit is used to indicate whether another byte follows.
+   * @param value Int to be compressed.
+   * @param out Where to put compressed data
+   * @return Number of bytes written.
+   * @throws IOException on stream error
+   */
+   public static int putCompressedInt(OutputStream out, final int value)
+      throws IOException {
+    int i = 0;
+    int tmpvalue = value;
+    do {
+      byte b = (byte) (tmpvalue & VALUE_MASK);
+      tmpvalue >>>= NEXT_BIT_SHIFT;
+      if (tmpvalue != 0) {
+        b |= (byte) NEXT_BIT_MASK;
+      }
+      out.write(b);
+      i++;
+    } while (tmpvalue != 0);
+    return i;
+  }
+
+   /**
+    * Put in output stream 32 bit integer (Big Endian byte order).
+    * @param out Where to put integer.
+    * @param value Value of integer.
+    * @throws IOException On stream error.
+    */
+   public static void putInt(OutputStream out, final int value)
+       throws IOException {
+     for (int i = Bytes.SIZEOF_INT - 1 ; i >= 0 ; --i) {
+       out.write((byte) (value >>> (i * 8)));
+     }
+   }
+
+  /**
+   * Copy byte to the output stream.
+   * @param b byte to be copied
+   * @param out Where to put compressed data
+   * @return Number of written bytes.
+   * @throws IOException on stream error
+   */
+  public static int copyToStream(OutputStream out, byte b)
+      throws IOException {
+    out.write(b);
+    return Bytes.SIZEOF_BYTE;
+  }
+
+  /**
+   * Copy the data to the output stream and update position in buffer.
+   * @param out Write bytes here.
+   * @param buffer Source buffer in certain position.
+   * @param length Length of copy.
+   * @return Number of written bytes.
+   * @throws IOException on stream error
+   */
+  public static int copyToStream(OutputStream out, ByteBuffer buffer,
+      int length) throws IOException {
+    if (buffer.hasArray()) {
+      out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+          length);
+      skip(buffer, length);
+    } else {
+      for (int i = 0 ; i < length ; ++i) {
+        out.write(buffer.get());
+      }
+    }
+    return length;
+  }
+
+  /**
+   * Copy the data to the output stream
+   * @param out Write bytes here.
+   * @param in Source buffer in certain position.
+   * @param offset Offset in the buffer, counted from position 0.
+   * @param length Length of copy.
+   * @return Number of written bytes.
+   * @throws IOException on stream error
+   */
+  public static int copyToStream(OutputStream out, ByteBuffer in,
+      int offset, int length) throws IOException {
+    if (in.hasArray()) {
+      out.write(in.array(), in.arrayOffset() + offset,
+          length);
+    } else {
+      for (int i = 0 ; i < length ; ++i) {
+        out.write(in.get(offset + i));
+      }
+    }
+    return length;
+  }
+
+  public static int putLong(OutputStream out, final long length,
+      final int fitInBytes) throws IOException {
+    long tmpLength = length;
+    for (int i = 0 ; i < fitInBytes ; ++i) {
+      out.write((byte) (tmpLength & 0xff));
+      tmpLength >>>= 8;
+    }
+    return fitInBytes;
+  }
+
+  /**
+   * Check how many bytes are required to store value.
+   * @param value Value which size will be tested.
+   * @return How many bytes are required to store value.
+   */
+  public static int longFitsIn(final long value) {
+    if (value < 0) {
+      return 8;
+    }
+
+    if (value < (1l << 4 * 8)) {
+      // no more than 4 bytes
+      if (value < (1l << 2 * 8)) {
+        if (value < (1l << 1 * 8)) {
+          return 1;
+        }
+        return 2;
+      }
+      if (value < (1l << 3 * 8)) {
+        return 3;
+      }
+      return 4;
+    }
+    // more than 4 bytes
+    if (value < (1l << 6 * 8)) {
+      if (value < (1l << 5 * 8)) {
+        return 5;
+      }
+      return 6;
+    }
+    if (value < (1l << 7 * 8)) {
+      return 7;
+    }
+    return 8;
+  }
+
+  /**
+   * Check how many bytes are required to store value.
+   * @param value Value which size will be tested.
+   * @return How many bytes are required to store value.
+   */
+  public static int intFitsIn(final int value) {
+    if (value < 0) {
+      return 4;
+    }
+
+    if (value < (1 << 2 * 8)) {
+      if (value < (1 << 1 * 8)) {
+        return 1;
+      }
+      return 2;
+    }
+    if (value <= (1 << 3 * 8)) {
+      return 3;
+    }
+    return 4;
+  }
+
+  /**
+   * Read integer from stream coded in 7 bits and increment position.
+   * @return Read integer.
+   * @throws IOException
+   */
+  public static int readCompressedInt(InputStream input)
+      throws IOException {
+    int result = 0;
+    int i = 0;
+    byte b;
+    do {
+      b = (byte) input.read();
+      result += (b & VALUE_MASK) << (NEXT_BIT_SHIFT * i);
+      i++;
+      if (i > Bytes.SIZEOF_INT + 1) {
+        throw new IllegalStateException(
+            "Corrupted compressed int (too long: " + (i + 1) + " bytes)");
+      }
+    } while (0 != (b & NEXT_BIT_MASK));
+    return result;
+  }
+
+  /**
+   * Read integer from buffer coded in 7 bits and increment position.
+   * @return Read integer.
+   */
+  public static int readCompressedInt(ByteBuffer buffer) {
+    byte b = buffer.get();
+    if ((b & NEXT_BIT_MASK) != 0) {
+      return (b & VALUE_MASK) + (readCompressedInt(buffer) << NEXT_BIT_SHIFT);
+    }
+    return b & VALUE_MASK;
+  }
+
+  /**
+   * Read long which was written to fitInBytes bytes and increment position.
+   * @param fitInBytes In how many bytes given long is stored.
+   * @return The value of parsed long.
+   * @throws IOException
+   */
+  public static long readLong(InputStream input, final int fitInBytes)
+      throws IOException {
+    long tmpLong = 0;
+    for (int i = 0 ; i < fitInBytes ; ++i) {
+      tmpLong |= (input.read() & 0xffl) << (8 * i);
+    }
+    return tmpLong;
+  }
+
+  /**
+   * Read long which was written to fitInBytes bytes and increment position.
+   * @param fitInBytes In how many bytes given long is stored.
+   * @return The value of parsed long.
+   */
+  public static long readLong(ByteBuffer buffer, final int fitInBytes) {
+    long tmpLength = 0;
+    for (int i = 0 ; i < fitInBytes ; ++i) {
+      tmpLength |= (buffer.get() & 0xffl) << (8l * i);
+    }
+    return tmpLength;
+  }
+
+  /**
+   * Asserts that we have 'length' bytes remaining in 'buffer'.
+   * @param buffer Where are we looking for remaining bytes.
+   * @param length How many bytes do we need.
+   * @throws EncoderBufferTooSmallException If there are not enough bytes.
+   */
+  public static void ensureSpace(ByteBuffer buffer, int length)
+      throws EncoderBufferTooSmallException {
+    if (buffer.position() + length > buffer.limit()) {
+      throw new EncoderBufferTooSmallException(
+          "Buffer position=" + buffer.position() +
+          ", buffer limit=" + buffer.limit() +
+          ", length to be written=" + length);
+    }
+  }
+
+  /**
+   * Copy 'length' bytes from 'source' and put it at the current position of
+   * 'buffer'. Update position in 'buffer' afterwards.
+   * @param source From where data should be read.
+   * @param buffer Write data here.
+   * @param length Read that many bytes.
+   * @throws IOException If there is problem in source.
+   */
+  public static void copyFromStream(DataInputStream source,
+      ByteBuffer buffer, int length) throws IOException {
+    if (buffer.hasArray()) {
+      source.readFully(buffer.array(), buffer.position() + buffer.arrayOffset(),
+          length);
+      skip(buffer, length);
+    } else {
+      for (int i = 0 ; i < length ; ++i) {
+        buffer.put(source.readByte());
+      }
+    }
+  }
+
+  /**
+   * Copy from one buffer to another from given offset
+   * @param source From where copy.
+   * @param destination Where to copy.
+   * @param sourceOffset Offset in the source buffer
+   * @param length How many bytes will be copied.
+   * @throws IOException
+   */
+  public static void copyFromBuffer(ByteBuffer source,
+      ByteBuffer destination, int sourceOffset, int length) {
+    if (source.hasArray() && destination.hasArray()) {
+      System.arraycopy(source.array(), sourceOffset + source.arrayOffset(),
+          destination.array(), destination.position() +
+          destination.arrayOffset(), length);
+      skip(destination, length);
+    } else {
+      for (int i = 0 ; i < length ; ++i) {
+        destination.put(source.get(sourceOffset + i));
+      }
+    }
+  }
+
+  /**
+   * Find length of common prefix of two parts in the buffer
+   * @param buffer Where parts are located.
+   * @param offsetLeft Offset of the first part.
+   * @param offsetRight Offset of the second part.
+   * @param limit Maximal length of common prefix.
+   * @return Length of prefix.
+   */
+  public static int findCommonPrefix(ByteBuffer buffer, int offsetLeft,
+      int offsetRight, int limit) {
+    int prefix = 0;
+
+    for (; prefix < limit ; ++prefix) {
+      if (buffer.get(offsetLeft + prefix) != buffer.get(offsetRight + prefix)) {
+        break;
+      }
+    }
+
+    return prefix;
+  }
+
+  /**
+   * Find length of common prefix in two arrays.
+   * @param left Array to be compared.
+   * @param leftOffset Offset in left array.
+   * @param leftLength Length of left array.
+   * @param right Array to be compared.
+   * @param rightOffset Offset in right array.
+   * @param rightLength Length of right array.
+   */
+  public static int findCommonPrefix(
+      byte[] left, int leftOffset, int leftLength,
+      byte[] right, int rightOffset, int rightLength) {
+    int length = Math.min(leftLength, rightLength);
+    int result = 0;
+
+    while (result < length &&
+        left[leftOffset + result] == right[rightOffset + result]) {
+      result++;
+    }
+
+    return result;
+  }
+
+  /**
+   * Check whether two parts in the same buffer are equal.
+   * @param buffer In which buffer there are parts
+   * @param offsetLeft Beginning of first part.
+   * @param lengthLeft Length of the first part.
+   * @param offsetRight Beginning of the second part.
+   * @param lengthRight Length of the second part.
+   * @return True if the two parts are equal, false otherwise.
+   */
+  public static boolean arePartsEqual(ByteBuffer buffer,
+      int offsetLeft, int lengthLeft,
+      int offsetRight, int lengthRight) {
+    if (lengthLeft != lengthRight) {
+      return false;
+    }
+
+    if (buffer.hasArray()) {
+      return 0 == Bytes.compareTo(
+          buffer.array(), buffer.arrayOffset() + offsetLeft, lengthLeft,
+          buffer.array(), buffer.arrayOffset() + offsetRight, lengthRight);
+    }
+
+    for (int i = 0 ; i < lengthRight ; ++i) {
+      if (buffer.get(offsetLeft + i) != buffer.get(offsetRight + i)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Increment position in buffer.
+   * @param buffer In this buffer.
+   * @param length By that many bytes.
+   */
+  public static void skip(ByteBuffer buffer, int length) {
+    buffer.position(buffer.position() + length);
+  }
+
+  /**
+   * Read int, assuming it is stored in N bytes with no special encoding.
+   * @param source From where read bytes.
+   * @param intLength How long is the integer
+   * @return The value of the integer.
+   * @throws IOException On IO error.
+   */
+  public static int readCompressedInt(InputStream source, int intLength)
+      throws IOException {
+    int result = 0;
+    for (int i = 0 ; i < intLength ; ++i) {
+      result = (result << 8) + (source.read() & 0xff);
+    }
+    return result;
+  }
+
+  /**
+   * Read int, assuming it is stored in N bytes with no special encoding.
+   * @param buffer Read bytes from this buffer.
+   * @param intLength The length of the integer in bytes.
+   * @return The value of the integer.
+   */
+  public static int readCompressedInt(ByteBuffer buffer, int intLength) {
+    int result = 0;
+    for (int i = 0 ; i < intLength ; ++i) {
+      result = (result << 8) + (buffer.get() & 0xff);
+    }
+    return result;
+  }
+
+}
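
For reference, a small round-trip sketch of the two serialization helpers added above: the 7-bit "compressed int" (7 value bits per byte plus a continuation bit) and the ByteBuffer flavour of vlong. The class name and the sample values are illustrative assumptions.

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.nio.ByteBuffer;

  import org.apache.hadoop.hbase.util.ByteBufferUtils;

  public class ByteBufferUtilsDemo {                   // illustrative class name
    public static void main(String[] args) throws Exception {
      // 300 needs two 7-bit groups, so putCompressedInt() emits two bytes.
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      int written = ByteBufferUtils.putCompressedInt(out, 300);
      int readBack = ByteBufferUtils.readCompressedInt(
          new ByteArrayInputStream(out.toByteArray()));
      System.out.println(written + " bytes, value read back = " + readBack);

      // Variable-length long written straight into a ByteBuffer and read back.
      ByteBuffer buf = ByteBuffer.allocate(9);         // worst case size for a vlong
      ByteBufferUtils.writeVLong(buf, -42L);
      buf.flip();
      System.out.println("vlong round trip = " + ByteBufferUtils.readVLong(buf));
    }
  }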

Modified: hbase/trunk/src/main/ruby/hbase/admin.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/hbase/admin.rb?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/hbase/admin.rb (original)
+++ hbase/trunk/src/main/ruby/hbase/admin.rb Sat Dec 24 21:20:39 2011
@@ -532,6 +532,9 @@ module Hbase
       family.setInMemory(JBoolean.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY)
       family.setTimeToLive(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::TTL])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::TTL)
       family.setCompressionType(org.apache.hadoop.hbase.io.hfile.Compression::Algorithm.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION)
+      family.setDataBlockEncodingOnDisk(org.apache.hadoop.hbase.io.encoding.DataBlockEncodingAlgorithms::Algorithm.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING_ON_DISK])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING_ON_DISK)
+      family.setDataBlockEncodingInCache(org.apache.hadoop.hbase.io.encoding.DataBlockEncodingAlgorithms::Algorithm.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING_IN_CACHE])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING_IN_CACHE)
+      family.setEncodedDataBlockSeek(JBoolean.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::ENCODED_DATA_BLOCK_SEEK])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::ENCODED_DATA_BLOCK_SEEK)
       family.setBlocksize(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE)
       family.setMaxVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS)
       family.setMinVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS)

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java Sat Dec 24 21:20:39 2011
@@ -221,18 +221,33 @@ public abstract class HBaseTestCase exte
       final int minVersions, final int versions, final int ttl, boolean keepDeleted) {
     HTableDescriptor htd = new HTableDescriptor(name);
     htd.addFamily(new HColumnDescriptor(fam1, minVersions, versions,
-        keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
-        HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
-        HColumnDescriptor.DEFAULT_BLOOMFILTER,
-        HConstants.REPLICATION_SCOPE_LOCAL));
+      keepDeleted,
+      HColumnDescriptor.DEFAULT_COMPRESSION,
+      HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING_ON_DISK,
+      HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING_IN_CACHE,
+      HColumnDescriptor.DEFAULT_ENCODED_DATA_BLOCK_SEEK,
+      false, false,
+      HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
+      HColumnDescriptor.DEFAULT_BLOOMFILTER,
+      HConstants.REPLICATION_SCOPE_LOCAL));
     htd.addFamily(new HColumnDescriptor(fam2, minVersions, versions,
-        keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
+        keepDeleted,
+        HColumnDescriptor.DEFAULT_COMPRESSION,
+        HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING_ON_DISK,
+        HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING_IN_CACHE,
+        HColumnDescriptor.DEFAULT_ENCODED_DATA_BLOCK_SEEK,
+        false, false,
         HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
         HColumnDescriptor.DEFAULT_BLOOMFILTER,
         HConstants.REPLICATION_SCOPE_LOCAL));
     htd.addFamily(new HColumnDescriptor(fam3, minVersions, versions,
-        keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
-        HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
+        keepDeleted,
+        HColumnDescriptor.DEFAULT_COMPRESSION,
+        HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING_ON_DISK,
+        HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING_IN_CACHE,
+        HColumnDescriptor.DEFAULT_ENCODED_DATA_BLOCK_SEEK,
+        false, false,
+        HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
         HColumnDescriptor.DEFAULT_BLOOMFILTER,
         HConstants.REPLICATION_SCOPE_LOCAL));
     return htd;

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java Sat Dec 24 21:20:39 2011
@@ -191,7 +191,8 @@ public class HFilePerformanceEvaluation 
     void setUp() throws Exception {
       writer =
         HFile.getWriterFactoryNoCache(conf).createWriter(this.fs,
-            this.mf, RFILE_BLOCKSIZE, (Compression.Algorithm) null, null);
+            this.mf, RFILE_BLOCKSIZE, (Compression.Algorithm) null, null,
+            null);
     }
 
     @Override
@@ -365,4 +366,4 @@ public class HFilePerformanceEvaluation 
   public static void main(String[] args) throws Exception {
     new HFilePerformanceEvaluation().runBenchmarks();
   }
-}
\ No newline at end of file
+}

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Sat Dec 24 21:20:39 2011
@@ -143,6 +143,9 @@ public class TestFromClientSide {
          HColumnDescriptor.DEFAULT_VERSIONS,
          true,
          HColumnDescriptor.DEFAULT_COMPRESSION,
+         HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING_ON_DISK,
+         HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING_IN_CACHE,
+         HColumnDescriptor.DEFAULT_ENCODED_DATA_BLOCK_SEEK,
          HColumnDescriptor.DEFAULT_IN_MEMORY,
          HColumnDescriptor.DEFAULT_BLOCKCACHE,
          HColumnDescriptor.DEFAULT_BLOCKSIZE,

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java Sat Dec 24 21:20:39 2011
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -98,7 +99,7 @@ public class TestHalfStoreFileReader {
       CacheConfig cacheConf)
       throws IOException {
     final HalfStoreFileReader halfreader =
-        new HalfStoreFileReader(fs, p, cacheConf, bottom);
+        new HalfStoreFileReader(fs, p, cacheConf, bottom, null);
     halfreader.loadFileInfo();
     final HFileScanner scanner = halfreader.getScanner(false, false);
 

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java Sat Dec 24 21:20:39 2011
@@ -295,7 +295,14 @@ public class TestHeapSize extends TestCa
       assertEquals(expected, actual);
     }
 
+    // SchemaConfigured
+    LOG.debug("Heap size for: " + SchemaConfigured.class.getName());
+    SchemaConfigured sc = new SchemaConfigured(null, "myTable", "myCF");
+    assertEquals(ClassSize.estimateBase(SchemaConfigured.class, true),
+        sc.heapSize());
+
     // Store Overhead
+    LOG.debug("Heap size for: " + Store.class.getName());
     cl = Store.class;
     actual = Store.FIXED_OVERHEAD;
     expected = ClassSize.estimateBase(cl, false);
@@ -319,10 +326,6 @@ public class TestHeapSize extends TestCa
     // accounted for.  But we have satisfied our two core requirements.
     // Sizing is quite accurate now, and our tests will throw errors if
     // any of these classes are modified without updating overhead sizes.
-
-    SchemaConfigured sc = new SchemaConfigured(null, "myTable", "myCF");
-    assertEquals(ClassSize.estimateBase(SchemaConfigured.class, true),
-        sc.heapSize());
   }
 
   @org.junit.Rule

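The relocated SchemaConfigured block above compares a ClassSize estimate against the object's own heapSize(). A minimal sketch of the same comparison, using only the calls that appear in the hunk (the wrapper class name is just for the sketch):

import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
import org.apache.hadoop.hbase.util.ClassSize;

public class SchemaConfiguredHeapSizeSketch {
  public static void main(String[] args) {
    SchemaConfigured sc = new SchemaConfigured(null, "myTable", "myCF");
    // The test asserts these two numbers are equal.
    System.out.println("estimate: " + ClassSize.estimateBase(SchemaConfigured.class, true));
    System.out.println("heapSize: " + sc.heapSize());
  }
}
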
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java Sat Dec 24 21:20:39 2011
@@ -41,9 +41,15 @@ import org.apache.hadoop.hbase.regionser
 
 public class CacheTestUtils {
 
-  /*Just checks if heapsize grows when something is cached, and gets smaller when the same object is evicted*/
+  private static final boolean includesMemstoreTS = true;
 
-  public static void testHeapSizeChanges(final BlockCache toBeTested, final int blockSize){
+  /**
+   * Just checks if heapsize grows when something is cached, and gets smaller
+   * when the same object is evicted
+   */
+
+  public static void testHeapSizeChanges(final BlockCache toBeTested,
+      final int blockSize) {
     HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1);
     long heapSize = ((HeapSize) toBeTested).heapSize();
     toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block);
@@ -316,7 +322,8 @@ public class CacheTestUtils {
 
       HFileBlock generated = new HFileBlock(BlockType.DATA,
           onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
-          prevBlockOffset, cachedBuffer, false, blockSize);
+          prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
+          blockSize, includesMemstoreTS);
 
       String strKey;
       /* No conflicting keys */

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java Sat Dec 24 21:20:39 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.junit.After;
@@ -42,6 +43,7 @@ import org.junit.experimental.categories
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
+
 import static org.junit.Assert.*;
 
 /**
@@ -61,10 +63,12 @@ public class TestCacheOnWrite {
   private FileSystem fs;
   private Random rand = new Random(12983177L);
   private Path storeFilePath;
-  private Compression.Algorithm compress;
-  private CacheOnWriteType cowType;
   private BlockCache blockCache;
-  private String testName;
+  private String testDescription;
+
+  private final CacheOnWriteType cowType;
+  private final Compression.Algorithm compress;
+  private final BlockEncoderTestType encoderType;
 
   private static final int DATA_BLOCK_SIZE = 2048;
   private static final int NUM_KV = 25000;
@@ -76,49 +80,90 @@ public class TestCacheOnWrite {
       KeyValue.Type.values().length - 2;
 
   private static enum CacheOnWriteType {
-    DATA_BLOCKS(BlockType.DATA, CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY),
-    BLOOM_BLOCKS(BlockType.BLOOM_CHUNK,
-        CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY),
-    INDEX_BLOCKS(BlockType.LEAF_INDEX,
-        CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY);
+    DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY,
+        BlockType.DATA, BlockType.ENCODED_DATA),
+    BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
+        BlockType.BLOOM_CHUNK),
+    INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
+        BlockType.LEAF_INDEX, BlockType.INTERMEDIATE_INDEX);
 
     private final String confKey;
-    private final BlockType inlineBlockType;
+    private final BlockType blockType1;
+    private final BlockType blockType2;
+
+    private CacheOnWriteType(String confKey, BlockType blockType) {
+      this(confKey, blockType, blockType);
+    }
 
-    private CacheOnWriteType(BlockType inlineBlockType, String confKey) {
-      this.inlineBlockType = inlineBlockType;
+    private CacheOnWriteType(String confKey, BlockType blockType1,
+        BlockType blockType2) {
+      this.blockType1 = blockType1;
+      this.blockType2 = blockType2;
       this.confKey = confKey;
     }
 
     public boolean shouldBeCached(BlockType blockType) {
-      return blockType == inlineBlockType
-          || blockType == BlockType.INTERMEDIATE_INDEX
-          && inlineBlockType == BlockType.LEAF_INDEX;
+      return blockType == blockType1 || blockType == blockType2;
     }
 
     public void modifyConf(Configuration conf) {
-      for (CacheOnWriteType cowType : CacheOnWriteType.values())
+      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
         conf.setBoolean(cowType.confKey, cowType == this);
+      }
     }
 
   }
 
+  private static final DataBlockEncodings.Algorithm ENCODING_ALGO =
+      DataBlockEncodings.Algorithm.PREFIX;
+
+  /** Provides fancy names for four combinations of two booleans */
+  private static enum BlockEncoderTestType {
+    NO_BLOCK_ENCODING(false, false),
+    BLOCK_ENCODING_IN_CACHE_ONLY(false, true),
+    BLOCK_ENCODING_ON_DISK_ONLY(true, false),
+    BLOCK_ENCODING_EVERYWHERE(true, true);
+
+    private final boolean encodeOnDisk;
+    private final boolean encodeInCache;
+
+    BlockEncoderTestType(boolean encodeOnDisk, boolean encodeInCache) {
+      this.encodeOnDisk = encodeOnDisk;
+      this.encodeInCache = encodeInCache;
+    }
+
+    public HFileDataBlockEncoder getEncoder() {
+      // We always use an encoded seeker. It should have no effect if there
+      // is no encoding in cache.
+      return new HFileDataBlockEncoderImpl(
+          encodeOnDisk ? ENCODING_ALGO : DataBlockEncodings.Algorithm.NONE,
+          encodeInCache ? ENCODING_ALGO : DataBlockEncodings.Algorithm.NONE,
+          true);
+    }
+  }
+
   public TestCacheOnWrite(CacheOnWriteType cowType,
-      Compression.Algorithm compress) {
+      Compression.Algorithm compress, BlockEncoderTestType encoderType) {
     this.cowType = cowType;
     this.compress = compress;
-    testName = "[cacheOnWrite=" + cowType + ", compress=" + compress + "]";
-    System.out.println(testName);
+    this.encoderType = encoderType;
+    testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress + 
+        ", encoderType=" + encoderType + "]";
+    System.out.println(testDescription);
   }
 
   @Parameters
   public static Collection<Object[]> getParameters() {
     List<Object[]> cowTypes = new ArrayList<Object[]>();
-    for (CacheOnWriteType cowType : CacheOnWriteType.values())
+    for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
       for (Compression.Algorithm compress :
            HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
-        cowTypes.add(new Object[] { cowType, compress });
+        for (BlockEncoderTestType encoderType :
+             BlockEncoderTestType.values()) {
+          cowTypes.add(new Object[] { cowType, compress, encoderType });
+        }
       }
+    }
     return cowTypes;
   }
 
@@ -156,10 +201,10 @@ public class TestCacheOnWrite {
 
   private void readStoreFile() throws IOException {
     HFileReaderV2 reader = (HFileReaderV2) HFile.createReader(fs,
-        storeFilePath, cacheConf);
+        storeFilePath, cacheConf, encoderType.getEncoder());
     LOG.info("HFile information: " + reader);
     HFileScanner scanner = reader.getScanner(false, false);
-    assertTrue(testName, scanner.seekTo());
+    assertTrue(testDescription, scanner.seekTo());
 
     long offset = 0;
     HFileBlock prevBlock = null;
@@ -174,10 +219,11 @@ public class TestCacheOnWrite {
       // Flags: don't cache the block, use pread, this is not a compaction.
       HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
           false);
-      BlockCacheKey blockCacheKey = HFile.getBlockCacheKey(reader.getName(), offset);
+      BlockCacheKey blockCacheKey = HFile.getBlockCacheKey(reader.getName(),
+          offset);
       boolean isCached = blockCache.getBlock(blockCacheKey, true) != null;
       boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
-      assertEquals(testName + " " + block, shouldBeCached, isCached);
+      assertEquals(testDescription + " " + block, shouldBeCached, isCached);
       prevBlock = block;
       offset += block.getOnDiskSizeWithHeader();
       BlockType bt = block.getBlockType();
@@ -187,8 +233,10 @@ public class TestCacheOnWrite {
 
     LOG.info("Block count by type: " + blockCountByType);
     String countByType = blockCountByType.toString();
-    assertEquals(
-        "{DATA=1379, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
+    BlockType cachedDataBlockType =
+        encoderType.encodeInCache ? BlockType.ENCODED_DATA : BlockType.DATA;
+    assertEquals("{" + cachedDataBlockType
+        + "=1379, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
         countByType);
 
     reader.close();
@@ -214,8 +262,9 @@ public class TestCacheOnWrite {
     Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(),
         "test_cache_on_write");
     StoreFile.Writer sfw = StoreFile.createWriter(fs, storeFileParentDir,
-        DATA_BLOCK_SIZE, compress, KeyValue.COMPARATOR, conf,
-        cacheConf, StoreFile.BloomType.ROWCOL, NUM_KV);
+        DATA_BLOCK_SIZE, compress, encoderType.getEncoder(),
+        KeyValue.COMPARATOR, conf, cacheConf, StoreFile.BloomType.ROWCOL,
+        NUM_KV);
 
     final int rowLen = 32;
     for (int i = 0; i < NUM_KV; ++i) {
@@ -236,7 +285,6 @@ public class TestCacheOnWrite {
     storeFilePath = sfw.getPath();
   }
 
-
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
     new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();

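BlockEncoderTestType above derives an HFileDataBlockEncoder from the encode-on-disk / encode-in-cache booleans. As a hedged illustration, the in-cache-only case would be constructed as below; the constructor arguments simply mirror getEncoder() in the test, nothing beyond that is implied.

import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;

public class EncoderConfigSketch {
  public static void main(String[] args) {
    // BLOCK_ENCODING_IN_CACHE_ONLY: no on-disk encoding, PREFIX in the cache,
    // and the encoded seeker enabled, matching getEncoder() above.
    HFileDataBlockEncoder cacheOnly = new HFileDataBlockEncoderImpl(
        DataBlockEncodings.Algorithm.NONE,
        DataBlockEncodings.Algorithm.PREFIX,
        true);
    System.out.println(cacheOnly);
  }
}
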
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java Sat Dec 24 21:20:39 2011
@@ -27,6 +27,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,16 +47,24 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.DoubleOutputStream;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.compress.Compressor;
 
 import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 @Category(MediumTests.class)
+@RunWith(Parameterized.class)
 public class TestHFileBlock {
   // change this value to activate more logs
   private static final boolean detailedLogging = false;
@@ -69,14 +79,29 @@ public class TestHFileBlock {
   static final Compression.Algorithm[] GZIP_ONLY  = { GZ };
 
   private static final int NUM_TEST_BLOCKS = 1000;
-
   private static final int NUM_READER_THREADS = 26;
 
+  // Used to generate KeyValues
+  private static int NUM_KEYVALUES = 50;
+  private static int FIELD_LENGTH = 10;
+  private static float CHANCE_TO_REPEAT = 0.6f;
+
   private static final HBaseTestingUtility TEST_UTIL =
     new HBaseTestingUtility();
   private FileSystem fs;
   private int uncompressedSizeV1;
 
+  private final boolean includesMemstoreTS;
+
+  public TestHFileBlock(boolean includesMemstoreTS) {
+    this.includesMemstoreTS = includesMemstoreTS;
+  }
+
+  @Parameters
+  public static Collection<Object[]> parameters() {
+    return HBaseTestingUtility.BOOLEAN_PARAMETERIZED;
+  }
+
   @Before
   public void setUp() throws IOException {
     fs = FileSystem.get(TEST_UTIL.getConfiguration());
@@ -88,6 +113,74 @@ public class TestHFileBlock {
       dos.writeInt(i / 100);
   }
 
+  private int writeTestKeyValues(OutputStream dos, int seed)
+      throws IOException {
+    List<KeyValue> keyValues = new ArrayList<KeyValue>();
+    Random randomizer = new Random(42L + seed); // just any fixed number
+
+    // generate keyValues
+    for (int i = 0 ; i < NUM_KEYVALUES ; ++i) {
+      byte[] row;
+      long timestamp;
+      byte[] family;
+      byte[] qualifier;
+      byte[] value;
+
+      // generate it or repeat, it should compress well
+      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
+        row = keyValues.get(randomizer.nextInt(keyValues.size())).getRow();
+      } else {
+        row = new byte[FIELD_LENGTH];
+        randomizer.nextBytes(row);
+      }
+      if (0 == i) {
+        family = new byte[FIELD_LENGTH];
+        randomizer.nextBytes(family);
+      } else {
+        family = keyValues.get(0).getFamily();
+      }
+      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
+        qualifier = keyValues.get(
+            randomizer.nextInt(keyValues.size())).getQualifier();
+      } else {
+        qualifier = new byte[FIELD_LENGTH];
+        randomizer.nextBytes(qualifier);
+      }
+      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
+        value = keyValues.get(randomizer.nextInt(keyValues.size())).getValue();
+      } else {
+        value = new byte[FIELD_LENGTH];
+        randomizer.nextBytes(value);
+      }
+      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
+        timestamp = keyValues.get(
+            randomizer.nextInt(keyValues.size())).getTimestamp();
+      } else {
+        timestamp = randomizer.nextLong();
+      }
+
+      keyValues.add(new KeyValue(row, family, qualifier, timestamp, value));
+    }
+
+    // sort it and write to stream
+    int totalSize = 0;
+    Collections.sort(keyValues, KeyValue.COMPARATOR);
+    DataOutputStream dataOutputStream = new DataOutputStream(dos);
+    for (KeyValue kv : keyValues) {
+      totalSize += kv.getLength();
+      dataOutputStream.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
+      if (includesMemstoreTS) {
+        long memstoreTS = randomizer.nextLong();
+        WritableUtils.writeVLong(dataOutputStream, memstoreTS);
+        totalSize += WritableUtils.getVIntSize(memstoreTS);
+      }
+    }
+
+    return totalSize;
+  }
+
+
+
   public byte[] createTestV1Block(Compression.Algorithm algo)
       throws IOException {
     Compressor compressor = algo.getCompressor();
@@ -105,8 +198,9 @@ public class TestHFileBlock {
   private byte[] createTestV2Block(Compression.Algorithm algo)
       throws IOException {
     final BlockType blockType = BlockType.DATA;
-    HFileBlock.Writer hbw = new HFileBlock.Writer(algo);
-    DataOutputStream dos = hbw.startWriting(blockType, false);
+    HFileBlock.Writer hbw = new HFileBlock.Writer(algo, null,
+        includesMemstoreTS);
+    DataOutputStream dos = hbw.startWriting(blockType);
     writeTestBlockContents(dos);
     byte[] headerAndData = hbw.getHeaderAndData();
     assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
@@ -194,10 +288,11 @@ public class TestHFileBlock {
         Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
             + algo);
         FSDataOutputStream os = fs.create(path);
-        HFileBlock.Writer hbw = new HFileBlock.Writer(algo);
+        HFileBlock.Writer hbw = new HFileBlock.Writer(algo, null,
+            includesMemstoreTS);
         long totalSize = 0;
         for (int blockId = 0; blockId < 2; ++blockId) {
-          DataOutputStream dos = hbw.startWriting(BlockType.DATA, false);
+          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
           for (int i = 0; i < 1234; ++i)
             dos.writeInt(i);
           hbw.writeHeaderAndData(os);
@@ -240,6 +335,97 @@ public class TestHFileBlock {
     }
   }
 
+  /**
+   * Test encoding/decoding data blocks.
+   * @throws IOException if there is a bug or a problem with temporary files.
+   */
+  @Test
+  public void testDataBlockEncoding() throws IOException {
+    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
+      for (boolean pread : new boolean[] { false, true }) {
+        for (DataBlockEncodings.Algorithm dataBlockEncoderAlgo :
+            DataBlockEncodings.Algorithm.values()) {
+          Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
+              + algo + "_" + dataBlockEncoderAlgo.toString());
+          FSDataOutputStream os = fs.create(path);
+          HFileDataBlockEncoder dataBlockEncoder = new HFileDataBlockEncoderImpl(
+              dataBlockEncoderAlgo,
+              DataBlockEncodings.Algorithm.NONE,
+              HFileDataBlockEncoderImpl.NO_ENCODED_SEEK);
+          HFileBlock.Writer hbw = new HFileBlock.Writer(algo, dataBlockEncoder,
+              includesMemstoreTS);
+          long totalSize = 0;
+          List<Integer> blockSizes = new ArrayList<Integer>();
+          List<ByteBuffer> blockContent = new ArrayList<ByteBuffer>();
+          for (int blockId = 0; blockId < 2; ++blockId) {
+            DataOutputStream dos = hbw.startWriting(BlockType.DATA);
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DoubleOutputStream doubleOutputStream =
+                new DoubleOutputStream(dos, baos);
+
+            blockSizes.add(writeTestKeyValues(doubleOutputStream, blockId));
+
+            ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
+            buf.rewind();
+            blockContent.add(buf);
+
+            hbw.writeHeaderAndData(os);
+            totalSize += hbw.getOnDiskSizeWithHeader();
+          }
+          os.close();
+
+          FSDataInputStream is = fs.open(path);
+          HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, algo,
+              totalSize, dataBlockEncoder);
+          hbr.setIncludesMemstoreTS(includesMemstoreTS);
+
+          HFileBlock b;
+          int pos = 0;
+          for (int blockId = 0; blockId < 2; ++blockId) {
+            b = hbr.readBlockData(pos, -1, -1, pread);
+            b.sanityCheck();
+            pos += b.getOnDiskSizeWithHeader();
+
+            assertEquals((int) blockSizes.get(blockId),
+                b.getUncompressedSizeWithoutHeader());
+            ByteBuffer actualBuffer = b.getBufferWithoutHeader();
+            ByteBuffer expectedBuffer = blockContent.get(blockId);
+            expectedBuffer.rewind();
+
+            // test if content matches, produce nice message
+            if (!actualBuffer.equals(expectedBuffer)) {
+              int prefix = 0;
+              while (prefix < expectedBuffer.limit() &&
+                  expectedBuffer.get(prefix) == actualBuffer.get(prefix)) {
+                prefix++;
+              }
+
+              int kvCount = 0;
+
+              while (actualBuffer.position() + 2 * Bytes.SIZEOF_INT <
+                  actualBuffer.limit()) {
+                int keyLength = actualBuffer.getInt();
+                int valueLength = actualBuffer.getInt();
+                kvCount++;
+                actualBuffer.position(actualBuffer.position() +
+                    keyLength + valueLength);
+              }
+
+              fail(String.format(
+                  "Content mismath compression: %s encoding: %s" +
+                  " pread: %s commonPrefix: %d kvCount: %d" +
+                  " expected char: %s actual char %s", algo.getName(),
+                  dataBlockEncoderAlgo.toString(), pread, prefix, kvCount,
+                  expectedBuffer.get(prefix),
+                  actualBuffer.get(prefix)));
+            }
+          }
+          is.close();
+        }
+      }
+    }
+  }
+
   @Test
   public void testPreviousOffset() throws IOException {
     for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
@@ -446,13 +632,17 @@ public class TestHFileBlock {
   ) throws IOException {
     boolean cacheOnWrite = expectedContents != null;
     FSDataOutputStream os = fs.create(path);
-    HFileBlock.Writer hbw = new HFileBlock.Writer(compressAlgo);
+    HFileBlock.Writer hbw = new HFileBlock.Writer(compressAlgo, null,
+        includesMemstoreTS);
     Map<BlockType, Long> prevOffsetByType = new HashMap<BlockType, Long>();
     long totalSize = 0;
     for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
       int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
+      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
+        blockTypeOrdinal = BlockType.DATA.ordinal();
+      }
       BlockType bt = BlockType.values()[blockTypeOrdinal];
-      DataOutputStream dos = hbw.startWriting(bt, cacheOnWrite);
+      DataOutputStream dos = hbw.startWriting(bt);
       for (int j = 0; j < rand.nextInt(500); ++j) {
         // This might compress well.
         dos.writeShort(i + 1);
@@ -501,7 +691,7 @@ public class TestHFileBlock {
       byte[] byteArr = new byte[HFileBlock.HEADER_SIZE + size];
       ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
       HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
-          true, -1);
+          HFileBlock.FILL_HEADER, -1, includesMemstoreTS);
       long byteBufferExpectedSize =
           ClassSize.align(ClassSize.estimateBase(buf.getClass(), true)
               + HFileBlock.HEADER_SIZE + size);

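writeTestKeyValues above appends a vlong memstore timestamp after each KeyValue and adds WritableUtils.getVIntSize(memstoreTS) to the running total. A small self-contained check of that bookkeeping (the class name and sample value are only for the sketch):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.WritableUtils;

public class MemstoreTsSizeSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    long memstoreTS = 1323725000000L; // arbitrary timestamp for illustration
    WritableUtils.writeVLong(out, memstoreTS);
    out.flush();
    // getVIntSize reports exactly how many bytes writeVLong produced.
    System.out.println(WritableUtils.getVIntSize(memstoreTS) + " == " + baos.size());
  }
}
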
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java Sat Dec 24 21:20:39 2011
@@ -20,6 +20,10 @@
 
 package org.apache.hadoop.hbase.io.hfile;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -44,7 +48,6 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexChunk;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
-
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -52,8 +55,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import static org.junit.Assert.*;
-
 @RunWith(Parameterized.class)
 @Category(MediumTests.class)
 public class TestHFileBlockIndex {
@@ -92,6 +93,8 @@ public class TestHFileBlockIndex {
   private static final int[] UNCOMPRESSED_INDEX_SIZES =
       { 19187, 21813, 23086 };
 
+  private static final boolean includesMemstoreTS = true;
+
   static {
     assert INDEX_CHUNK_SIZES.length == EXPECTED_NUM_LEVELS.length;
     assert INDEX_CHUNK_SIZES.length == UNCOMPRESSED_INDEX_SIZES.length;
@@ -210,13 +213,14 @@ public class TestHFileBlockIndex {
 
   private void writeWholeIndex() throws IOException {
     assertEquals(0, keys.size());
-    HFileBlock.Writer hbw = new HFileBlock.Writer(compr);
+    HFileBlock.Writer hbw = new HFileBlock.Writer(compr, null,
+        includesMemstoreTS);
     FSDataOutputStream outputStream = fs.create(path);
     HFileBlockIndex.BlockIndexWriter biw =
         new HFileBlockIndex.BlockIndexWriter(hbw, null, null);
 
     for (int i = 0; i < NUM_DATA_BLOCKS; ++i) {
-      hbw.startWriting(BlockType.DATA, false).write(
+      hbw.startWriting(BlockType.DATA).write(
           String.valueOf(rand.nextInt(1000)).getBytes());
       long blockOffset = outputStream.getPos();
       hbw.writeHeaderAndData(outputStream);
@@ -251,7 +255,7 @@ public class TestHFileBlockIndex {
       boolean isClosing) throws IOException {
     while (biw.shouldWriteBlock(isClosing)) {
       long offset = outputStream.getPos();
-      biw.writeInlineBlock(hbw.startWriting(biw.getInlineBlockType(), false));
+      biw.writeInlineBlock(hbw.startWriting(biw.getInlineBlockType()));
       hbw.writeHeaderAndData(outputStream);
       biw.blockWritten(offset, hbw.getOnDiskSizeWithHeader(),
           hbw.getUncompressedSizeWithoutHeader());
@@ -479,7 +483,7 @@ public class TestHFileBlockIndex {
       {
         HFile.Writer writer =
           HFile.getWriterFactory(conf, cacheConf).createWriter(fs,
-            hfilePath, SMALL_BLOCK_SIZE, compr, KeyValue.KEY_COMPARATOR);
+            hfilePath, SMALL_BLOCK_SIZE, compr, null, KeyValue.KEY_COMPARATOR);
         Random rand = new Random(19231737);
 
         for (int i = 0; i < NUM_KV; ++i) {