Posted to commits@hbase.apache.org by mb...@apache.org on 2012/01/26 03:59:00 UTC

svn commit: r1236031 [5/7] - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/io/encoding/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/mapr...

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.TestMiniClusterLoadSequential;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Uses the load tester, then switches encode-on-disk off and compacts.
+ */
+@Category(MediumTests.class)
+public class TestLoadAndSwitchEncodeOnDisk extends
+    TestMiniClusterLoadSequential {
+
+  /** We do not alternate the multi-put flag in this test. */
+  private static final boolean USE_MULTI_PUT = true;
+
+  /** Un-parameterize the test */
+  @Parameters
+  public static Collection<Object[]> parameters() {
+    return Arrays.asList(new Object[][]{ new Object[0] });
+  }
+
+  public TestLoadAndSwitchEncodeOnDisk() {
+    super(USE_MULTI_PUT, DataBlockEncoding.PREFIX);
+    conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true);
+  }
+
+  protected int numKeys() {
+    return 3000;
+  }
+
+  @Test(timeout=TIMEOUT_MS)
+  public void loadTest() throws Exception {
+    HBaseAdmin admin = new HBaseAdmin(conf);
+
+    compression = Compression.Algorithm.GZ; // used for table setup
+    super.loadTest();
+
+    HColumnDescriptor hcd = getColumnDesc(admin);
+    System.err.println("\nDisabling encode-on-disk. Old column descriptor: " +
+        hcd + "\n");
+    admin.disableTable(TABLE);
+    hcd.setEncodeOnDisk(false);
+    admin.modifyColumn(TABLE, hcd);
+
+    System.err.println("\nRe-enabling table\n");
+    admin.enableTable(TABLE);
+
+    System.err.println("\nNew column descriptor: " +
+        getColumnDesc(admin) + "\n");
+
+    System.err.println("\nCompacting the table\n");
+    admin.majorCompact(TABLE);
+    // Wait until compaction completes
+    Threads.sleepWithoutInterrupt(5000);
+    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
+    while (rs.compactSplitThread.getCompactionQueueSize() > 0) {
+      Threads.sleep(50);
+    }
+
+    System.err.println("\nDone with the test, shutting down the cluster\n");
+  }
+
+}

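The test above drives the encode-on-disk switch entirely through the admin API:
disable the table, flip the flag on the column descriptor, modify the column,
re-enable, then major-compact so existing store files get rewritten. A minimal
standalone sketch of that sequence, assuming a running cluster of this vintage
and a hypothetical table "mytable" with family "cf":

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class ToggleEncodeOnDiskSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);

    String table = "mytable";             // hypothetical table name
    byte[] family = Bytes.toBytes("cf");  // hypothetical family name

    // Fetch the current column descriptor and flip the encode-on-disk flag.
    HColumnDescriptor hcd =
        admin.getTableDescriptor(Bytes.toBytes(table)).getFamily(family);
    hcd.setEncodeOnDisk(false);

    // Schema changes of this era require the table to be disabled first.
    admin.disableTable(table);
    admin.modifyColumn(table, hcd);
    admin.enableTable(table);

    // Rewrite existing store files so they pick up the new on-disk format;
    // until then only newly flushed files reflect the change.
    admin.majorCompact(table);
    admin.close();
  }
}
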
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestUpgradeFromHFileV1ToEncoding.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestUpgradeFromHFileV1ToEncoding.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestUpgradeFromHFileV1ToEncoding.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestUpgradeFromHFileV1ToEncoding.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import static org.apache.hadoop.hbase.io.encoding.TestChangingEncoding.CF;
+import static org.apache.hadoop.hbase.io.encoding.TestChangingEncoding.CF_BYTES;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestUpgradeFromHFileV1ToEncoding {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestUpgradeFromHFileV1ToEncoding.class);
+
+  private static final String TABLE = "UpgradeTable";
+  private static final byte[] TABLE_BYTES = Bytes.toBytes(TABLE);
+
+  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final Configuration conf = TEST_UTIL.getConfiguration();
+
+  private static final int NUM_HFILE_V1_BATCHES = 10;
+  private static final int NUM_HFILE_V2_BATCHES = 20;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Use a small flush size to create more HFiles.
+    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
+    conf.setInt(HFile.FORMAT_VERSION_KEY, 1); // Use HFile v1 initially
+    TEST_UTIL.startMiniCluster();
+    LOG.debug("Started an HFile v1 cluster");
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testUpgrade() throws Exception {
+    int numBatches = 0;
+    HTableDescriptor htd = new HTableDescriptor(TABLE);
+    HColumnDescriptor hcd = new HColumnDescriptor(CF);
+    htd.addFamily(hcd);
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    admin.createTable(htd);
+    admin.close();
+    for (int i = 0; i < NUM_HFILE_V1_BATCHES; ++i) {
+      TestChangingEncoding.writeTestDataBatch(conf, TABLE, numBatches++);
+    }
+    TEST_UTIL.shutdownMiniHBaseCluster();
+
+    conf.setInt(HFile.FORMAT_VERSION_KEY, 2);
+    TEST_UTIL.startMiniHBaseCluster(1, 1);
+    LOG.debug("Started an HFile v2 cluster");
+    admin = new HBaseAdmin(conf);
+    htd = admin.getTableDescriptor(TABLE_BYTES);
+    hcd = htd.getFamily(CF_BYTES);
+    hcd.setDataBlockEncoding(DataBlockEncoding.PREFIX);
+    admin.disableTable(TABLE);
+    admin.modifyColumn(TABLE, hcd);
+    admin.enableTable(TABLE);
+    admin.close();
+    for (int i = 0; i < NUM_HFILE_V2_BATCHES; ++i) {
+      TestChangingEncoding.writeTestDataBatch(conf, TABLE, numBatches++);
+    }
+
+    LOG.debug("Verifying all 'batches', both HFile v1 and encoded HFile v2");
+    verifyBatches(numBatches);
+
+    LOG.debug("Doing a manual compaction");
+    admin.compact(TABLE);
+    Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+    LOG.debug("Verify all the data again");
+    verifyBatches(numBatches);
+  }
+
+  private void verifyBatches(int numBatches) throws Exception {
+    for (int i = 0; i < numBatches; ++i) {
+      TestChangingEncoding.verifyTestDataBatch(conf, TABLE, i);
+    }
+  }
+
+}

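The upgrade test above pins HFile.FORMAT_VERSION_KEY to 1, restarts the cluster
with version 2, and only then enables PREFIX encoding on the existing family.
For a table created directly on an HFile v2 cluster, the same family-level
setting can be applied at creation time; a minimal sketch with hypothetical
table and family names:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile;

public class CreateEncodedTableSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Data block encoding is an HFile v2 feature, which is why the test
    // above restarts on version 2 before switching the family to PREFIX.
    conf.setInt(HFile.FORMAT_VERSION_KEY, 2);

    HTableDescriptor htd = new HTableDescriptor("EncodedTable"); // hypothetical
    HColumnDescriptor hcd = new HColumnDescriptor("cf");         // hypothetical
    hcd.setDataBlockEncoding(DataBlockEncoding.PREFIX);
    htd.addFamily(hcd);

    HBaseAdmin admin = new HBaseAdmin(conf);
    admin.createTable(htd);
    admin.close();
  }
}
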
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java Thu Jan 26 02:58:57 2012
@@ -41,9 +41,15 @@ import org.apache.hadoop.hbase.regionser
 
 public class CacheTestUtils {
 
-  /*Just checks if heapsize grows when something is cached, and gets smaller when the same object is evicted*/
+  private static final boolean includesMemstoreTS = true;
 
-  public static void testHeapSizeChanges(final BlockCache toBeTested, final int blockSize){
+  /**
+   * Just checks if heapsize grows when something is cached, and gets smaller
+   * when the same object is evicted
+   */
+
+  public static void testHeapSizeChanges(final BlockCache toBeTested,
+      final int blockSize) {
     HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1);
     long heapSize = ((HeapSize) toBeTested).heapSize();
     toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block);
@@ -316,7 +322,8 @@ public class CacheTestUtils {
 
       HFileBlock generated = new HFileBlock(BlockType.DATA,
           onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
-          prevBlockOffset, cachedBuffer, false, blockSize);
+          prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
+          blockSize, includesMemstoreTS);
 
       String strKey;
       /* No conflicting keys */

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java Thu Jan 26 02:58:57 2012
@@ -38,10 +38,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
@@ -74,10 +74,13 @@ public class TestCacheOnWrite {
   private FileSystem fs;
   private Random rand = new Random(12983177L);
   private Path storeFilePath;
-  private Compression.Algorithm compress;
-  private CacheOnWriteType cowType;
   private BlockCache blockCache;
-  private String testName;
+  private String testDescription;
+
+  private final CacheOnWriteType cowType;
+  private final Compression.Algorithm compress;
+  private final BlockEncoderTestType encoderType;
+  private final HFileDataBlockEncoder encoder;
 
   private static final int DATA_BLOCK_SIZE = 2048;
   private static final int NUM_KV = 25000;
@@ -90,49 +93,87 @@ public class TestCacheOnWrite {
       KeyValue.Type.values().length - 2;
 
   private static enum CacheOnWriteType {
-    DATA_BLOCKS(BlockType.DATA, CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY),
-    BLOOM_BLOCKS(BlockType.BLOOM_CHUNK,
-        CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY),
-    INDEX_BLOCKS(BlockType.LEAF_INDEX,
-        CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY);
+    DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY,
+        BlockType.DATA, BlockType.ENCODED_DATA),
+    BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
+        BlockType.BLOOM_CHUNK),
+    INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
+        BlockType.LEAF_INDEX, BlockType.INTERMEDIATE_INDEX);
 
     private final String confKey;
-    private final BlockType inlineBlockType;
+    private final BlockType blockType1;
+    private final BlockType blockType2;
+
+    private CacheOnWriteType(String confKey, BlockType blockType) {
+      this(confKey, blockType, blockType);
+    }
 
-    private CacheOnWriteType(BlockType inlineBlockType, String confKey) {
-      this.inlineBlockType = inlineBlockType;
+    private CacheOnWriteType(String confKey, BlockType blockType1,
+        BlockType blockType2) {
+      this.blockType1 = blockType1;
+      this.blockType2 = blockType2;
       this.confKey = confKey;
     }
 
     public boolean shouldBeCached(BlockType blockType) {
-      return blockType == inlineBlockType
-          || blockType == BlockType.INTERMEDIATE_INDEX
-          && inlineBlockType == BlockType.LEAF_INDEX;
+      return blockType == blockType1 || blockType == blockType2;
     }
 
     public void modifyConf(Configuration conf) {
-      for (CacheOnWriteType cowType : CacheOnWriteType.values())
+      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
         conf.setBoolean(cowType.confKey, cowType == this);
+      }
     }
 
   }
 
+  private static final DataBlockEncoding ENCODING_ALGO =
+      DataBlockEncoding.PREFIX;
+
+  /** Provides fancy names for three combinations of two booleans */
+  private static enum BlockEncoderTestType {
+    NO_BLOCK_ENCODING(false, false),
+    BLOCK_ENCODING_IN_CACHE_ONLY(false, true),
+    BLOCK_ENCODING_EVERYWHERE(true, true);
+
+    private final boolean encodeOnDisk;
+    private final boolean encodeInCache;
+
+    BlockEncoderTestType(boolean encodeOnDisk, boolean encodeInCache) {
+      this.encodeOnDisk = encodeOnDisk;
+      this.encodeInCache = encodeInCache;
+    }
+
+    public HFileDataBlockEncoder getEncoder() {
+      return new HFileDataBlockEncoderImpl(
+          encodeOnDisk ? ENCODING_ALGO : DataBlockEncoding.NONE,
+          encodeInCache ? ENCODING_ALGO : DataBlockEncoding.NONE);
+    }
+  }
+
   public TestCacheOnWrite(CacheOnWriteType cowType,
-      Compression.Algorithm compress) {
+      Compression.Algorithm compress, BlockEncoderTestType encoderType) {
     this.cowType = cowType;
     this.compress = compress;
-    testName = "[cacheOnWrite=" + cowType + ", compress=" + compress + "]";
-    System.out.println(testName);
+    this.encoderType = encoderType;
+    this.encoder = encoderType.getEncoder();
+    testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress + 
+        ", encoderType=" + encoderType + "]";
+    System.out.println(testDescription);
   }
 
   @Parameters
   public static Collection<Object[]> getParameters() {
     List<Object[]> cowTypes = new ArrayList<Object[]>();
-    for (CacheOnWriteType cowType : CacheOnWriteType.values())
+    for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
       for (Compression.Algorithm compress :
            HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
-        cowTypes.add(new Object[] { cowType, compress });
+        for (BlockEncoderTestType encoderType :
+             BlockEncoderTestType.values()) {
+          cowTypes.add(new Object[] { cowType, compress, encoderType });
+        }
       }
+    }
     return cowTypes;
   }
 
@@ -153,7 +194,6 @@ public class TestCacheOnWrite {
     fs = FileSystem.get(conf);
     cacheConf = new CacheConfig(conf);
     blockCache = cacheConf.getBlockCache();
-    System.out.println("setUp()");
   }
 
   @After
@@ -169,29 +209,43 @@ public class TestCacheOnWrite {
   }
 
   private void readStoreFile() throws IOException {
-    HFileReaderV2 reader = (HFileReaderV2) HFile.createReader(fs,
-        storeFilePath, cacheConf);
+    HFileReaderV2 reader = (HFileReaderV2) HFile.createReaderWithEncoding(fs,
+        storeFilePath, cacheConf, encoder.getEncodingInCache());
     LOG.info("HFile information: " + reader);
-    HFileScanner scanner = reader.getScanner(false, false);
-    assertTrue(testName, scanner.seekTo());
+    final boolean cacheBlocks = false;
+    final boolean pread = false;
+    HFileScanner scanner = reader.getScanner(cacheBlocks, pread);
+    assertTrue(testDescription, scanner.seekTo());
 
     long offset = 0;
     HFileBlock prevBlock = null;
     EnumMap<BlockType, Integer> blockCountByType =
         new EnumMap<BlockType, Integer>(BlockType.class);
 
+    DataBlockEncoding encodingInCache =
+        encoderType.getEncoder().getEncodingInCache();
     while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
       long onDiskSize = -1;
       if (prevBlock != null) {
          onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
       }
       // Flags: don't cache the block, use pread, this is not a compaction.
+      // Also, pass null for expected block type to avoid checking it.
       HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
-          false);
-      BlockCacheKey blockCacheKey = HFile.getBlockCacheKey(reader.getName(), offset);
+          false, null);
+      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
+          offset, encodingInCache, block.getBlockType());
       boolean isCached = blockCache.getBlock(blockCacheKey, true) != null;
       boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
-      assertEquals(testName + " " + block, shouldBeCached, isCached);
+      if (shouldBeCached != isCached) {
+        throw new AssertionError(
+            "shouldBeCached: " + shouldBeCached + "\n" +
+            "isCached: " + isCached + "\n" +
+            "Test description: " + testDescription + "\n" +
+            "block: " + block + "\n" +
+            "encodingInCache: " + encodingInCache + "\n" +
+            "blockCacheKey: " + blockCacheKey);
+      }
       prevBlock = block;
       offset += block.getOnDiskSizeWithHeader();
       BlockType bt = block.getBlockType();
@@ -201,8 +255,10 @@ public class TestCacheOnWrite {
 
     LOG.info("Block count by type: " + blockCountByType);
     String countByType = blockCountByType.toString();
-    assertEquals(
-        "{DATA=1379, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
+    BlockType cachedDataBlockType =
+        encoderType.encodeInCache ? BlockType.ENCODED_DATA : BlockType.DATA;
+    assertEquals("{" + cachedDataBlockType
+        + "=1379, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
         countByType);
 
     reader.close();
@@ -228,7 +284,7 @@ public class TestCacheOnWrite {
     Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(),
         "test_cache_on_write");
     StoreFile.Writer sfw = StoreFile.createWriter(fs, storeFileParentDir,
-        DATA_BLOCK_SIZE, compress, KeyValue.COMPARATOR, conf,
+        DATA_BLOCK_SIZE, compress, encoder, KeyValue.COMPARATOR, conf,
         cacheConf, BLOOM_TYPE, NUM_KV);
 
     final int rowLen = 32;
@@ -260,8 +316,9 @@ public class TestCacheOnWrite {
     final byte[] cfBytes = Bytes.toBytes(cf);
     final int maxVersions = 3;
     HRegion region = TEST_UTIL.createTestRegion(table, cf, compress,
-        BLOOM_TYPE, maxVersions, HColumnDescriptor.DEFAULT_BLOCKCACHE,
-        HFile.DEFAULT_BLOCKSIZE);
+        BLOOM_TYPE, maxVersions, HFile.DEFAULT_BLOCKSIZE,
+        encoder.getEncodingInCache(),
+        encoder.getEncodingOnDisk() != DataBlockEncoding.NONE);
     int rowIdx = 0;
     long ts = EnvironmentEdgeManager.currentTimeMillis();
     for (int iFile = 0; iFile < 5; ++iFile) {

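The BlockEncoderTestType enum above names three of the four possible
(encodeOnDisk, encodeInCache) combinations; encoding on disk without also
encoding in cache is deliberately not exercised. As a rough illustration, each
named combination resolves to the two-argument HFileDataBlockEncoderImpl
constructor used in getEncoder():

import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;

public class EncoderCombinationsSketch {
  public static void main(String[] args) {
    // NO_BLOCK_ENCODING: raw blocks on disk and in the block cache.
    HFileDataBlockEncoder none = new HFileDataBlockEncoderImpl(
        DataBlockEncoding.NONE, DataBlockEncoding.NONE);

    // BLOCK_ENCODING_IN_CACHE_ONLY: raw on disk, PREFIX-encoded in cache.
    HFileDataBlockEncoder inCacheOnly = new HFileDataBlockEncoderImpl(
        DataBlockEncoding.NONE, DataBlockEncoding.PREFIX);

    // BLOCK_ENCODING_EVERYWHERE: PREFIX-encoded both on disk and in cache.
    HFileDataBlockEncoder everywhere = new HFileDataBlockEncoderImpl(
        DataBlockEncoding.PREFIX, DataBlockEncoding.PREFIX);

    for (HFileDataBlockEncoder e :
        new HFileDataBlockEncoder[] { none, inCacheOnly, everywhere }) {
      System.out.println("onDisk=" + e.getEncodingOnDisk()
          + ", inCache=" + e.getEncodingInCache());
    }
  }
}
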
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java Thu Jan 26 02:58:57 2012
@@ -158,7 +158,7 @@ public class TestHFile extends HBaseTest
     writeRecords(writer);
     fout.close();
     FSDataInputStream fin = fs.open(ncTFile);
-    Reader reader = HFile.createReader(ncTFile, fs.open(ncTFile),
+    Reader reader = HFile.createReaderFromStream(ncTFile, fs.open(ncTFile),
       fs.getFileStatus(ncTFile).getLen(), cacheConf);
     System.out.println(cacheConf.toString());
     // Load up the index.
@@ -236,7 +236,7 @@ public class TestHFile extends HBaseTest
     writer.close();
     fout.close();
     FSDataInputStream fin = fs.open(mFile);
-    Reader reader = HFile.createReader(mFile, fs.open(mFile),
+    Reader reader = HFile.createReaderFromStream(mFile, fs.open(mFile),
         this.fs.getFileStatus(mFile).getLen(), cacheConf);
     reader.loadFileInfo();
     // No data -- this should return false.

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java Thu Jan 26 02:58:57 2012
@@ -27,6 +27,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,16 +47,24 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.DoubleOutputStream;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.compress.Compressor;
 
 import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 @Category(MediumTests.class)
+@RunWith(Parameterized.class)
 public class TestHFileBlock {
   // change this value to activate more logs
   private static final boolean detailedLogging = false;
@@ -69,14 +79,29 @@ public class TestHFileBlock {
   static final Compression.Algorithm[] GZIP_ONLY  = { GZ };
 
   private static final int NUM_TEST_BLOCKS = 1000;
-
   private static final int NUM_READER_THREADS = 26;
 
+  // Used to generate KeyValues
+  private static int NUM_KEYVALUES = 50;
+  private static int FIELD_LENGTH = 10;
+  private static float CHANCE_TO_REPEAT = 0.6f;
+
   private static final HBaseTestingUtility TEST_UTIL =
     new HBaseTestingUtility();
   private FileSystem fs;
   private int uncompressedSizeV1;
 
+  private final boolean includesMemstoreTS;
+
+  public TestHFileBlock(boolean includesMemstoreTS) {
+    this.includesMemstoreTS = includesMemstoreTS;
+  }
+
+  @Parameters
+  public static Collection<Object[]> parameters() {
+    return HBaseTestingUtility.BOOLEAN_PARAMETERIZED;
+  }
+
   @Before
   public void setUp() throws IOException {
     fs = FileSystem.get(TEST_UTIL.getConfiguration());
@@ -88,6 +113,72 @@ public class TestHFileBlock {
       dos.writeInt(i / 100);
   }
 
+  private int writeTestKeyValues(OutputStream dos, int seed)
+      throws IOException {
+    List<KeyValue> keyValues = new ArrayList<KeyValue>();
+    Random randomizer = new Random(42L + seed); // just any fixed number
+
+    // generate keyValues
+    for (int i = 0; i < NUM_KEYVALUES; ++i) {
+      byte[] row;
+      long timestamp;
+      byte[] family;
+      byte[] qualifier;
+      byte[] value;
+
+      // generate a value or repeat an old one so the data compresses well
+      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
+        row = keyValues.get(randomizer.nextInt(keyValues.size())).getRow();
+      } else {
+        row = new byte[FIELD_LENGTH];
+        randomizer.nextBytes(row);
+      }
+      if (0 == i) {
+        family = new byte[FIELD_LENGTH];
+        randomizer.nextBytes(family);
+      } else {
+        family = keyValues.get(0).getFamily();
+      }
+      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
+        qualifier = keyValues.get(
+            randomizer.nextInt(keyValues.size())).getQualifier();
+      } else {
+        qualifier = new byte[FIELD_LENGTH];
+        randomizer.nextBytes(qualifier);
+      }
+      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
+        value = keyValues.get(randomizer.nextInt(keyValues.size())).getValue();
+      } else {
+        value = new byte[FIELD_LENGTH];
+        randomizer.nextBytes(value);
+      }
+      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
+        timestamp = keyValues.get(
+            randomizer.nextInt(keyValues.size())).getTimestamp();
+      } else {
+        timestamp = randomizer.nextLong();
+      }
+
+      keyValues.add(new KeyValue(row, family, qualifier, timestamp, value));
+    }
+
+    // sort it and write to stream
+    int totalSize = 0;
+    Collections.sort(keyValues, KeyValue.COMPARATOR);
+    DataOutputStream dataOutputStream = new DataOutputStream(dos);
+    for (KeyValue kv : keyValues) {
+      totalSize += kv.getLength();
+      dataOutputStream.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
+      if (includesMemstoreTS) {
+        long memstoreTS = randomizer.nextLong();
+        WritableUtils.writeVLong(dataOutputStream, memstoreTS);
+        totalSize += WritableUtils.getVIntSize(memstoreTS);
+      }
+    }
+
+    return totalSize;
+  }
+
   public byte[] createTestV1Block(Compression.Algorithm algo)
       throws IOException {
     Compressor compressor = algo.getCompressor();
@@ -105,8 +196,9 @@ public class TestHFileBlock {
   private byte[] createTestV2Block(Compression.Algorithm algo)
       throws IOException {
     final BlockType blockType = BlockType.DATA;
-    HFileBlock.Writer hbw = new HFileBlock.Writer(algo);
-    DataOutputStream dos = hbw.startWriting(blockType, false);
+    HFileBlock.Writer hbw = new HFileBlock.Writer(algo, null,
+        includesMemstoreTS);
+    DataOutputStream dos = hbw.startWriting(blockType);
     writeTestBlockContents(dos);
     byte[] headerAndData = hbw.getHeaderAndData();
     assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
@@ -194,10 +286,11 @@ public class TestHFileBlock {
         Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
             + algo);
         FSDataOutputStream os = fs.create(path);
-        HFileBlock.Writer hbw = new HFileBlock.Writer(algo);
+        HFileBlock.Writer hbw = new HFileBlock.Writer(algo, null,
+            includesMemstoreTS);
         long totalSize = 0;
         for (int blockId = 0; blockId < 2; ++blockId) {
-          DataOutputStream dos = hbw.startWriting(BlockType.DATA, false);
+          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
           for (int i = 0; i < 1234; ++i)
             dos.writeInt(i);
           hbw.writeHeaderAndData(os);
@@ -240,6 +333,136 @@ public class TestHFileBlock {
     }
   }
 
+  /**
+   * Test encoding/decoding data blocks.
+   * @throws IOException on a bug or a problem with temporary files.
+   */
+  @Test
+  public void testDataBlockEncoding() throws IOException {
+    final int numBlocks = 5;
+    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
+      for (boolean pread : new boolean[] { false, true }) {
+        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
+          Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
+              + algo + "_" + encoding.toString());
+          FSDataOutputStream os = fs.create(path);
+          HFileDataBlockEncoder dataBlockEncoder =
+              new HFileDataBlockEncoderImpl(encoding);
+          HFileBlock.Writer hbw = new HFileBlock.Writer(algo, dataBlockEncoder,
+              includesMemstoreTS);
+          long totalSize = 0;
+          final List<Integer> encodedSizes = new ArrayList<Integer>();
+          final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
+          for (int blockId = 0; blockId < numBlocks; ++blockId) {
+            writeEncodedBlock(encoding, hbw, encodedSizes, encodedBlocks,
+                blockId);
+
+            hbw.writeHeaderAndData(os);
+            totalSize += hbw.getOnDiskSizeWithHeader();
+          }
+          os.close();
+
+          FSDataInputStream is = fs.open(path);
+          HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, algo,
+              totalSize);
+          hbr.setDataBlockEncoder(dataBlockEncoder);
+          hbr.setIncludesMemstoreTS(includesMemstoreTS);
+
+          HFileBlock b;
+          int pos = 0;
+          for (int blockId = 0; blockId < numBlocks; ++blockId) {
+            b = hbr.readBlockData(pos, -1, -1, pread);
+            b.sanityCheck();
+            pos += b.getOnDiskSizeWithHeader();
+
+            assertEquals((int) encodedSizes.get(blockId),
+                b.getUncompressedSizeWithoutHeader());
+            ByteBuffer actualBuffer = b.getBufferWithoutHeader();
+            if (encoding != DataBlockEncoding.NONE) {
+              // We expect a two-byte big-endian encoding id.
+              assertEquals(0, actualBuffer.get(0));
+              assertEquals(encoding.getId(), actualBuffer.get(1));
+              actualBuffer.position(2);
+              actualBuffer = actualBuffer.slice();
+            }
+
+            ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
+            expectedBuffer.rewind();
+
+            // test if content matches, produce nice message
+            assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding,
+                pread);
+          }
+          is.close();
+        }
+      }
+    }
+  }
+
+  private void writeEncodedBlock(DataBlockEncoding encoding,
+      HFileBlock.Writer hbw, final List<Integer> encodedSizes,
+      final List<ByteBuffer> encodedBlocks, int blockId) throws IOException {
+    DataOutputStream dos = hbw.startWriting(BlockType.DATA);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DoubleOutputStream doubleOutputStream =
+        new DoubleOutputStream(dos, baos);
+
+    final int rawBlockSize = writeTestKeyValues(doubleOutputStream,
+        blockId);
+
+    ByteBuffer rawBuf = ByteBuffer.wrap(baos.toByteArray());
+    rawBuf.rewind();
+
+    final int encodedSize;
+    final ByteBuffer encodedBuf;
+    if (encoding == DataBlockEncoding.NONE) {
+      encodedSize = rawBlockSize;
+      encodedBuf = rawBuf;
+    } else {
+      ByteArrayOutputStream encodedOut = new ByteArrayOutputStream();
+      encoding.getEncoder().compressKeyValues(
+          new DataOutputStream(encodedOut),
+          rawBuf.duplicate(), includesMemstoreTS);
+      // We need to account for the two-byte encoding algorithm ID that
+      // comes after the 24-byte block header but before encoded KVs.
+      encodedSize = encodedOut.size() + DataBlockEncoding.ID_SIZE;
+      encodedBuf = ByteBuffer.wrap(encodedOut.toByteArray());
+    }
+    encodedSizes.add(encodedSize);
+    encodedBlocks.add(encodedBuf);
+  }
+
+  private void assertBuffersEqual(ByteBuffer expectedBuffer,
+      ByteBuffer actualBuffer, Compression.Algorithm compression,
+      DataBlockEncoding encoding, boolean pread) {
+    if (!actualBuffer.equals(expectedBuffer)) {
+      int prefix = 0;
+      int minLimit = Math.min(expectedBuffer.limit(), actualBuffer.limit());
+      while (prefix < minLimit &&
+          expectedBuffer.get(prefix) == actualBuffer.get(prefix)) {
+        prefix++;
+      }
+
+      fail(String.format(
+          "Content mismatch for compression %s, encoding %s, " +
+          "pread %s, commonPrefix %d, expected %s, got %s",
+          compression, encoding, pread, prefix,
+          nextBytesToStr(expectedBuffer, prefix),
+          nextBytesToStr(actualBuffer, prefix)));
+    }
+  }
+
+  /**
+   * Convert the next few bytes in the given buffer at the given position to
+   * a string. Used for error messages.
+   */
+  private static String nextBytesToStr(ByteBuffer buf, int pos) {
+    int maxBytes = buf.limit() - pos;
+    int numBytes = Math.min(16, maxBytes);
+    return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos,
+        numBytes) + (numBytes < maxBytes ? "..." : "");
+  }
+
   @Test
   public void testPreviousOffset() throws IOException {
     for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
@@ -446,13 +669,17 @@ public class TestHFileBlock {
   ) throws IOException {
     boolean cacheOnWrite = expectedContents != null;
     FSDataOutputStream os = fs.create(path);
-    HFileBlock.Writer hbw = new HFileBlock.Writer(compressAlgo);
+    HFileBlock.Writer hbw = new HFileBlock.Writer(compressAlgo, null,
+        includesMemstoreTS);
     Map<BlockType, Long> prevOffsetByType = new HashMap<BlockType, Long>();
     long totalSize = 0;
     for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
       int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
+      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
+        blockTypeOrdinal = BlockType.DATA.ordinal();
+      }
       BlockType bt = BlockType.values()[blockTypeOrdinal];
-      DataOutputStream dos = hbw.startWriting(bt, cacheOnWrite);
+      DataOutputStream dos = hbw.startWriting(bt);
       for (int j = 0; j < rand.nextInt(500); ++j) {
         // This might compress well.
         dos.writeShort(i + 1);
@@ -501,7 +728,7 @@ public class TestHFileBlock {
       byte[] byteArr = new byte[HFileBlock.HEADER_SIZE + size];
       ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
       HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
-          true, -1);
+          HFileBlock.FILL_HEADER, -1, includesMemstoreTS);
       long byteBufferExpectedSize =
           ClassSize.align(ClassSize.estimateBase(buf.getClass(), true)
               + HFileBlock.HEADER_SIZE + size);

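The assertions in testDataBlockEncoding() and writeEncodedBlock() above pin
down the layout of an encoded data block: a 24-byte block header, a two-byte
big-endian encoding id (DataBlockEncoding.ID_SIZE), then the encoded
key/values. A small plain-java.nio sketch of peeling those pieces apart; the
24 and 2 byte sizes are taken from the comments above rather than from the
HBase constants:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class EncodedBlockLayoutSketch {
  // Sizes assumed from the comments in the test above.
  private static final int HEADER_SIZE = 24; // HFileBlock header
  private static final int ID_SIZE = 2;      // DataBlockEncoding.ID_SIZE

  /** Reads the two-byte big-endian encoding id that follows the header. */
  static short readEncodingId(ByteBuffer blockWithHeader) {
    ByteBuffer buf = blockWithHeader.duplicate();
    buf.order(ByteOrder.BIG_ENDIAN);
    return buf.getShort(HEADER_SIZE);
  }

  /** Returns a view positioned at the first encoded KeyValue byte. */
  static ByteBuffer encodedKeyValues(ByteBuffer blockWithHeader) {
    ByteBuffer buf = blockWithHeader.duplicate();
    buf.position(HEADER_SIZE + ID_SIZE);
    return buf.slice();
  }

  public static void main(String[] args) {
    ByteBuffer block = ByteBuffer.allocate(HEADER_SIZE + ID_SIZE + 8);
    block.putShort(HEADER_SIZE, (short) 3); // some nonzero encoding id
    System.out.println("encoding id     = " + readEncodingId(block));
    System.out.println("encoded payload = "
        + encodedKeyValues(block).remaining() + " bytes");
  }
}
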
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java Thu Jan 26 02:58:57 2012
@@ -20,6 +20,10 @@
 
 package org.apache.hadoop.hbase.io.hfile;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -44,7 +48,6 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexChunk;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
-
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -52,8 +55,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import static org.junit.Assert.*;
-
 @RunWith(Parameterized.class)
 @Category(MediumTests.class)
 public class TestHFileBlockIndex {
@@ -92,6 +93,8 @@ public class TestHFileBlockIndex {
   private static final int[] UNCOMPRESSED_INDEX_SIZES =
       { 19187, 21813, 23086 };
 
+  private static final boolean includesMemstoreTS = true;
+
   static {
     assert INDEX_CHUNK_SIZES.length == EXPECTED_NUM_LEVELS.length;
     assert INDEX_CHUNK_SIZES.length == UNCOMPRESSED_INDEX_SIZES.length;
@@ -138,7 +141,8 @@ public class TestHFileBlockIndex {
 
     @Override
     public HFileBlock readBlock(long offset, long onDiskSize,
-        boolean cacheBlock, boolean pread, boolean isCompaction)
+        boolean cacheBlock, boolean pread, boolean isCompaction,
+        BlockType expectedBlockType)
         throws IOException {
       if (offset == prevOffset && onDiskSize == prevOnDiskSize &&
           pread == prevPread) {
@@ -210,13 +214,14 @@ public class TestHFileBlockIndex {
 
   private void writeWholeIndex() throws IOException {
     assertEquals(0, keys.size());
-    HFileBlock.Writer hbw = new HFileBlock.Writer(compr);
+    HFileBlock.Writer hbw = new HFileBlock.Writer(compr, null,
+        includesMemstoreTS);
     FSDataOutputStream outputStream = fs.create(path);
     HFileBlockIndex.BlockIndexWriter biw =
         new HFileBlockIndex.BlockIndexWriter(hbw, null, null);
 
     for (int i = 0; i < NUM_DATA_BLOCKS; ++i) {
-      hbw.startWriting(BlockType.DATA, false).write(
+      hbw.startWriting(BlockType.DATA).write(
           String.valueOf(rand.nextInt(1000)).getBytes());
       long blockOffset = outputStream.getPos();
       hbw.writeHeaderAndData(outputStream);
@@ -251,7 +256,7 @@ public class TestHFileBlockIndex {
       boolean isClosing) throws IOException {
     while (biw.shouldWriteBlock(isClosing)) {
       long offset = outputStream.getPos();
-      biw.writeInlineBlock(hbw.startWriting(biw.getInlineBlockType(), false));
+      biw.writeInlineBlock(hbw.startWriting(biw.getInlineBlockType()));
       hbw.writeHeaderAndData(outputStream);
       biw.blockWritten(offset, hbw.getOnDiskSizeWithHeader(),
           hbw.getUncompressedSizeWithoutHeader());
@@ -479,7 +484,7 @@ public class TestHFileBlockIndex {
       {
         HFile.Writer writer =
           HFile.getWriterFactory(conf, cacheConf).createWriter(fs,
-            hfilePath, SMALL_BLOCK_SIZE, compr, KeyValue.KEY_COMPARATOR);
+            hfilePath, SMALL_BLOCK_SIZE, compr, null, KeyValue.KEY_COMPARATOR);
         Random rand = new Random(19231737);
 
         for (int i = 0; i < NUM_KV; ++i) {

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.RedundantKVGenerator;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class TestHFileDataBlockEncoder {
+  private Configuration conf;
+  private final HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+  private HFileDataBlockEncoderImpl blockEncoder;
+  private RedundantKVGenerator generator = new RedundantKVGenerator();
+  private SchemaConfigured UNKNOWN_TABLE_AND_CF =
+      SchemaConfigured.createUnknown();
+  private boolean includesMemstoreTS;
+
+  /**
+   * Create a test for the given data block encoding configuration.
+   * @param blockEncoder What kind of encoding policy will be used.
+   */
+  public TestHFileDataBlockEncoder(HFileDataBlockEncoderImpl blockEncoder,
+      boolean includesMemstoreTS) {
+    this.blockEncoder = blockEncoder;
+    this.includesMemstoreTS = includesMemstoreTS;
+    System.err.println("On-disk encoding: " + blockEncoder.getEncodingOnDisk()
+        + ", in-cache encoding: " + blockEncoder.getEncodingInCache()
+        + ", includesMemstoreTS: " + includesMemstoreTS);
+  }
+
+  /**
+   * Preparation before JUnit test.
+   */
+  @Before
+  public void setUp() {
+    conf = TEST_UTIL.getConfiguration();
+    SchemaMetrics.configureGlobally(conf);
+  }
+
+  /**
+   * Cleanup after JUnit test.
+   */
+  @After
+  public void tearDown() throws IOException {
+    TEST_UTIL.cleanupTestDir();
+  }
+
+  /**
+   * Test putting and taking out blocks into cache with different
+   * encoding options.
+   */
+  @Test
+  public void testEncodingWithCache() {
+    HFileBlock block = getSampleHFileBlock();
+    LruBlockCache blockCache =
+        new LruBlockCache(8 * 1024 * 1024, 32 * 1024);
+    HFileBlock cacheBlock = blockEncoder.diskToCacheFormat(block, false);
+    BlockCacheKey cacheKey = new BlockCacheKey("test", 0);
+    blockCache.cacheBlock(cacheKey, cacheBlock);
+
+    HeapSize heapSize = blockCache.getBlock(cacheKey, false);
+    assertTrue(heapSize instanceof HFileBlock);
+
+    HFileBlock returnedBlock = (HFileBlock) heapSize;
+
+    if (blockEncoder.getEncodingInCache() ==
+        DataBlockEncoding.NONE) {
+      assertEquals(block.getBufferWithHeader(),
+          returnedBlock.getBufferWithHeader());
+    } else {
+      if (BlockType.ENCODED_DATA != returnedBlock.getBlockType()) {
+        System.out.println(blockEncoder);
+      }
+      assertEquals(BlockType.ENCODED_DATA, returnedBlock.getBlockType());
+    }
+  }
+
+  /**
+   * Test writing to disk.
+   */
+  @Test
+  public void testEncodingWritePath() {
+    // usually we have just the block without headers, but don't complicate that
+    HFileBlock block = getSampleHFileBlock();
+    Pair<ByteBuffer, BlockType> result =
+        blockEncoder.beforeWriteToDisk(block.getBufferWithoutHeader(),
+            includesMemstoreTS);
+
+    int size = result.getFirst().limit() - HFileBlock.HEADER_SIZE;
+    HFileBlock blockOnDisk = new HFileBlock(result.getSecond(),
+        size, size, -1, result.getFirst(), HFileBlock.FILL_HEADER, 0,
+        includesMemstoreTS);
+
+    if (blockEncoder.getEncodingOnDisk() !=
+        DataBlockEncoding.NONE) {
+      assertEquals(BlockType.ENCODED_DATA, blockOnDisk.getBlockType());
+      assertEquals(blockEncoder.getEncodingOnDisk().getId(),
+          blockOnDisk.getDataBlockEncodingId());
+    } else {
+      assertEquals(BlockType.DATA, blockOnDisk.getBlockType());
+    }
+  }
+
+  /**
+   * Test converting blocks from disk to cache format.
+   */
+  @Test
+  public void testEncodingReadPath() {
+    HFileBlock origBlock = getSampleHFileBlock();
+    blockEncoder.diskToCacheFormat(origBlock, false);
+  }
+
+  private HFileBlock getSampleHFileBlock() {
+    ByteBuffer keyValues = RedundantKVGenerator.convertKvToByteBuffer(
+        generator.generateTestKeyValues(60), includesMemstoreTS);
+    int size = keyValues.limit();
+    ByteBuffer buf = ByteBuffer.allocate(size + HFileBlock.HEADER_SIZE);
+    buf.position(HFileBlock.HEADER_SIZE);
+    keyValues.rewind();
+    buf.put(keyValues);
+    HFileBlock b = new HFileBlock(BlockType.DATA, size, size, -1, buf,
+        HFileBlock.FILL_HEADER, 0, includesMemstoreTS);
+    UNKNOWN_TABLE_AND_CF.passSchemaMetricsTo(b);
+    return b;
+  }
+
+  /**
+   * @return All possible data block encoding configurations
+   */
+  @Parameters
+  public static Collection<Object[]> getAllConfigurations() {
+    List<Object[]> configurations =
+        new ArrayList<Object[]>();
+
+    for (DataBlockEncoding diskAlgo : DataBlockEncoding.values()) {
+      for (DataBlockEncoding cacheAlgo : DataBlockEncoding.values()) {
+        if (diskAlgo != cacheAlgo && diskAlgo != DataBlockEncoding.NONE) {
+          // We allow (1) the same encoding on disk and in cache, and
+          // (2) some encoding in cache but no encoding on disk (for testing).
+          continue;
+        }
+        for (boolean includesMemstoreTS : new boolean[] {false, true}) {
+          configurations.add(new Object[] {
+              new HFileDataBlockEncoderImpl(diskAlgo, cacheAlgo),
+              new Boolean(includesMemstoreTS)});
+        }
+      }
+    }
+
+    return configurations;
+  }
+}

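The filter in getAllConfigurations() above admits exactly two shapes of
configuration: the same encoding on disk and in cache, or no on-disk encoding
paired with any in-cache encoding. A small sketch that enumerates the admitted
(disk, cache) pairs using the same rule:

import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;

public class AllowedEncodingPairsSketch {
  public static void main(String[] args) {
    for (DataBlockEncoding diskAlgo : DataBlockEncoding.values()) {
      for (DataBlockEncoding cacheAlgo : DataBlockEncoding.values()) {
        // Same rule as getAllConfigurations(): a real on-disk encoding is
        // only allowed when the in-cache encoding matches it.
        boolean allowed =
            diskAlgo == cacheAlgo || diskAlgo == DataBlockEncoding.NONE;
        if (allowed) {
          System.out.println("onDisk=" + diskAlgo + ", inCache=" + cacheAlgo);
        }
      }
    }
  }
}
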
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFilePerformance.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFilePerformance.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFilePerformance.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFilePerformance.java Thu Jan 26 02:58:57 2012
@@ -166,7 +166,7 @@ public class TestHFilePerformance extend
              minBlockSize, codecName, null);
 
         // Writing value in one shot.
-        for (long l=0 ; l<rows ; l++ ) {
+        for (long l=0; l<rows; l++ ) {
           generator.getKey(key);
           generator.getValue(value);
           writer.append(key, value);
@@ -195,7 +195,7 @@ public class TestHFilePerformance extend
 
         BytesWritable keyBsw;
         BytesWritable valBsw;
-        for (long l=0 ; l<rows ; l++ ) {
+        for (long l=0; l<rows; l++ ) {
 
            generator.getKey(key);
            keyBsw = new BytesWritable(key);
@@ -241,7 +241,7 @@ public class TestHFilePerformance extend
     FSDataInputStream fin = fs.open(path);
 
     if ("HFile".equals(fileType)){
-        HFile.Reader reader = HFile.createReader(path, fs.open(path),
+        HFile.Reader reader = HFile.createReaderFromStream(path, fs.open(path),
           fs.getFileStatus(path).getLen(), new CacheConfig(conf));
         reader.loadFileInfo();
         switch (method) {
@@ -252,7 +252,7 @@ public class TestHFilePerformance extend
             {
               HFileScanner scanner = reader.getScanner(false, false);
               scanner.seekTo();
-              for (long l=0 ; l<rows ; l++ ) {
+              for (long l=0; l<rows; l++ ) {
                 key = scanner.getKey();
                 val = scanner.getValue();
                 totalBytesRead += key.limit() + val.limit();
@@ -275,7 +275,7 @@ public class TestHFilePerformance extend
         BytesWritable keyBsw = new BytesWritable();
         BytesWritable valBsw = new BytesWritable();
 
-        for (long l=0 ; l<rows ; l++ ) {
+        for (long l=0; l<rows; l++ ) {
           reader.next(keyBsw, valBsw);
           totalBytesRead += keyBsw.getSize() + valBsw.getSize();
         }

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java Thu Jan 26 02:58:57 2012
@@ -167,7 +167,7 @@ public class TestHFileSeek extends TestC
     int miss = 0;
     long totalBytes = 0;
     FSDataInputStream fsdis = fs.open(path);
-    Reader reader = HFile.createReader(path, fsdis,
+    Reader reader = HFile.createReaderFromStream(path, fsdis,
         fs.getFileStatus(path).getLen(), new CacheConfig(conf));
     reader.loadFileInfo();
     KeySampler kSampler =

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java Thu Jan 26 02:58:57 2012
@@ -76,7 +76,7 @@ public class TestHFileWriterV2 {
 
     final Compression.Algorithm COMPRESS_ALGO = Compression.Algorithm.GZ;
     HFileWriterV2 writer = new HFileWriterV2(conf, new CacheConfig(conf), fs,
-        hfilePath, 4096, COMPRESS_ALGO, KeyValue.KEY_COMPARATOR);
+        hfilePath, 4096, COMPRESS_ALGO, null, KeyValue.KEY_COMPARATOR);
 
     long totalKeyLength = 0;
     long totalValueLength = 0;
@@ -125,10 +125,12 @@ public class TestHFileWriterV2 {
         new HFileBlock.FSReaderV2(fsdis, COMPRESS_ALGO, fileSize);
     // Comparator class name is stored in the trailer in version 2.
     RawComparator<byte []> comparator = trailer.createComparator();
-    HFileBlockIndex.BlockIndexReader dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
-        trailer.getNumDataIndexLevels());
-    HFileBlockIndex.BlockIndexReader metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
-        Bytes.BYTES_RAWCOMPARATOR, 1);
+    HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
+        new HFileBlockIndex.BlockIndexReader(comparator,
+            trailer.getNumDataIndexLevels());
+    HFileBlockIndex.BlockIndexReader metaBlockIndexReader =
+        new HFileBlockIndex.BlockIndexReader(
+            Bytes.BYTES_RAWCOMPARATOR, 1);
 
     HFileBlock.BlockIterator blockIter = blockReader.blockRange(
         trailer.getLoadOnOpenDataOffset(),
@@ -146,8 +148,10 @@ public class TestHFileWriterV2 {
     // File info
     FileInfo fileInfo = new FileInfo();
     fileInfo.readFields(blockIter.nextBlockAsStream(BlockType.FILE_INFO));
-    byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
-    boolean includeMemstoreTS = (keyValueFormatVersion != null && Bytes.toInt(keyValueFormatVersion) > 0);
+    byte [] keyValueFormatVersion = fileInfo.get(
+        HFileWriterV2.KEY_VALUE_VERSION);
+    boolean includeMemstoreTS = keyValueFormatVersion != null &&
+        Bytes.toInt(keyValueFormatVersion) > 0;
 
     // Counters for the number of key/value pairs and the number of blocks
     int entriesRead = 0;

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java Thu Jan 26 02:58:57 2012
@@ -133,6 +133,8 @@ public class TestImportExport {
         5, /* versions */
         true /* keep deleted cells */,
         HColumnDescriptor.DEFAULT_COMPRESSION,
+        HColumnDescriptor.DEFAULT_ENCODE_ON_DISK,
+        HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING,
         HColumnDescriptor.DEFAULT_IN_MEMORY,
         HColumnDescriptor.DEFAULT_BLOCKCACHE,
         HColumnDescriptor.DEFAULT_BLOCKSIZE,
@@ -179,6 +181,8 @@ public class TestImportExport {
         5, /* versions */
         true /* keep deleted cells */,
         HColumnDescriptor.DEFAULT_COMPRESSION,
+        HColumnDescriptor.DEFAULT_ENCODE_ON_DISK,
+        HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING,
         HColumnDescriptor.DEFAULT_IN_MEMORY,
         HColumnDescriptor.DEFAULT_BLOCKCACHE,
         HColumnDescriptor.DEFAULT_BLOCKSIZE,

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java Thu Jan 26 02:58:57 2012
@@ -184,8 +184,8 @@ public class CreateRandomStoreFile {
     }
 
     StoreFile.Writer sfw = StoreFile.createWriter(fs, outputDir, blockSize,
-        compr, KeyValue.COMPARATOR, conf, new CacheConfig(conf), bloomType, 
-        numKV);
+        compr, null, KeyValue.COMPARATOR, conf, new CacheConfig(conf),
+        bloomType, numKV);
 
     rand = new Random();
     LOG.info("Writing " + numKV + " key/value pairs");

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.encoding.EncodedDataBlock;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * Tests various algorithms for key compression on an existing HFile. Useful
+ * for testing, debugging and benchmarking.
+ */
+public class DataBlockEncodingTool {
+  private static final Log LOG = LogFactory.getLog(
+      DataBlockEncodingTool.class);
+
+  private static final boolean includesMemstoreTS = true;
+
+  /**
+   * How many times the benchmark should run.
+   * More runs give better statistics.
+   * Must be larger than BENCHMARK_N_OMIT.
+   */
+  public static int BENCHMARK_N_TIMES = 12;
+
+  /**
+   * How many of the first runs should be omitted from the benchmark.
+   * Usually at least one, to exclude setup cost.
+   * Must be 0 or larger.
+   */
+  public static int BENCHMARK_N_OMIT = 2;
+
+  private List<EncodedDataBlock> codecs = new ArrayList<EncodedDataBlock>();
+  private int totalPrefixLength = 0;
+  private int totalKeyLength = 0;
+  private int totalValueLength = 0;
+  private int totalKeyRedundancyLength = 0;
+
+  final private String compressionAlgorithmName;
+  final private Algorithm compressionAlgorithm;
+  final private Compressor compressor;
+  final private Decompressor decompressor;
+
+  /**
+   * @param compressionAlgorithmName What kind of algorithm should be used
+   *                                 as baseline for comparison (e.g. lzo, gz).
+   */
+  public DataBlockEncodingTool(String compressionAlgorithmName) {
+    this.compressionAlgorithmName = compressionAlgorithmName;
+    this.compressionAlgorithm = Compression.getCompressionAlgorithmByName(
+        compressionAlgorithmName);
+    this.compressor = this.compressionAlgorithm.getCompressor();
+    this.decompressor = this.compressionAlgorithm.getDecompressor();
+  }
+
+  /**
+   * Collect statistics for the given HFile across all data block encoders.
+   * @param scanner Scanner over the file to be analyzed.
+   * @param kvLimit Maximum number of KeyValues to process.
+   * @throws IOException thrown if scanner is invalid
+   */
+  public void checkStatistics(final KeyValueScanner scanner, final int kvLimit)
+      throws IOException {
+    scanner.seek(KeyValue.LOWESTKEY);
+
+    KeyValue currentKv;
+
+    byte[] previousKey = null;
+    byte[] currentKey;
+
+    List<DataBlockEncoder> dataBlockEncoders =
+        DataBlockEncoding.getAllEncoders();
+
+    for (DataBlockEncoder d : dataBlockEncoders) {
+      codecs.add(new EncodedDataBlock(d, includesMemstoreTS));
+    }
+
+    int j = 0;
+    while ((currentKv = scanner.next()) != null && j < kvLimit) {
+      // Iterates through key/value pairs
+      j++;
+      currentKey = currentKv.getKey();
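+      // count how many leading bytes this key shares with the previous key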
+      if (previousKey != null) {
+        for (int i = 0; i < previousKey.length && i < currentKey.length &&
+            previousKey[i] == currentKey[i]; ++i) {
+          totalKeyRedundancyLength++;
+        }
+      }
+
+      for (EncodedDataBlock codec : codecs) {
+        codec.addKv(currentKv);
+      }
+
+      previousKey = currentKey;
+
+      totalPrefixLength += currentKv.getLength() - currentKv.getKeyLength() -
+          currentKv.getValueLength();
+      totalKeyLength += currentKv.getKeyLength();
+      totalValueLength += currentKv.getValueLength();
+    }
+  }
+
+  /**
+   * Verify that all data block encoders encode and decode correctly.
+   *
+   * @param scanner Scanner over the file to verify against.
+   * @param kvLimit Maximum number of KeyValues to process.
+   * @return true if all data block encoders compressed/decompressed correctly.
+   * @throws IOException thrown if scanner is invalid
+   */
+  public boolean verifyCodecs(final KeyValueScanner scanner, final int kvLimit)
+      throws IOException {
+    KeyValue currentKv;
+
+    scanner.seek(KeyValue.LOWESTKEY);
+    List<Iterator<KeyValue>> codecIterators =
+        new ArrayList<Iterator<KeyValue>>();
+    for(EncodedDataBlock codec : codecs) {
+      codecIterators.add(codec.getIterator());
+    }
+
+    int j = 0;
+    while ((currentKv = scanner.next()) != null && j < kvLimit) {
+      // Iterates through key/value pairs
+      ++j;
+      for (Iterator<KeyValue> it : codecIterators) {
+        KeyValue codecKv = it.next();
+        if (codecKv == null || 0 != Bytes.compareTo(
+            codecKv.getBuffer(), codecKv.getOffset(), codecKv.getLength(),
+            currentKv.getBuffer(), currentKv.getOffset(),
+            currentKv.getLength())) {
+          if (codecKv == null) {
+            LOG.error("There is a bug in codec " + it +
+                " it returned null KeyValue,");
+          } else {
+            int prefix = 0;
+            int limitLength = 2 * Bytes.SIZEOF_INT +
+                Math.min(codecKv.getLength(), currentKv.getLength());
+            while (prefix < limitLength &&
+                codecKv.getBuffer()[prefix + codecKv.getOffset()] ==
+                currentKv.getBuffer()[prefix + currentKv.getOffset()]) {
+              prefix++;
+            }
+
+            LOG.error("There is bug in codec " + it.toString() +
+                "\n on element " + j +
+                "\n codecKv.getKeyLength() " + codecKv.getKeyLength() +
+                "\n codecKv.getValueLength() " + codecKv.getValueLength() +
+                "\n codecKv.getLength() " + codecKv.getLength() +
+                "\n currentKv.getKeyLength() " + currentKv.getKeyLength() +
+                "\n currentKv.getValueLength() " + currentKv.getValueLength() +
+                "\n codecKv.getLength() " + currentKv.getLength() +
+                "\n currentKV rowLength " + currentKv.getRowLength() +
+                " familyName " + currentKv.getFamilyLength() +
+                " qualifier " + currentKv.getQualifierLength() +
+                "\n prefix " + prefix +
+                "\n codecKv   '" + Bytes.toStringBinary(codecKv.getBuffer(),
+                    codecKv.getOffset(), prefix) + "' diff '" +
+                    Bytes.toStringBinary(codecKv.getBuffer(),
+                        codecKv.getOffset() + prefix, codecKv.getLength() -
+                        prefix) + "'" +
+                "\n currentKv '" + Bytes.toStringBinary(
+                   currentKv.getBuffer(),
+                   currentKv.getOffset(), prefix) + "' diff '" +
+                   Bytes.toStringBinary(currentKv.getBuffer(),
+                       currentKv.getOffset() + prefix, currentKv.getLength() -
+                       prefix) + "'"
+                );
+          }
+          return false;
+        }
+      }
+    }
+
+    LOG.info("Verification was successful!");
+
+    return true;
+  }
+
+  /**
+   * Benchmark codec's speed.
+   */
+  public void benchmarkCodecs() {
+    int prevTotalSize = -1;
+    for (EncodedDataBlock codec : codecs) {
+      prevTotalSize = benchmarkEncoder(prevTotalSize, codec);
+    }
+
+    byte[] buffer = codecs.get(0).getRawKeyValues();
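+    // use the raw (unencoded) KV bytes as input for the baseline compressor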
+
+    benchmarkDefaultCompression(prevTotalSize, buffer);
+  }
+
+  /**
+   * Benchmark compression/decompression throughput.
+   * @param previousTotalSize Total size used for verification. Use -1 if
+   *          unknown.
+   * @param codec Tested encoder.
+   * @return Size of uncompressed data.
+   */
+  private int benchmarkEncoder(int previousTotalSize, EncodedDataBlock codec) {
+    int prevTotalSize = previousTotalSize;
+    int totalSize = 0;
+
+    // decompression time
+    List<Long> durations = new ArrayList<Long>();
+    for (int itTime = 0; itTime < BENCHMARK_N_TIMES; ++itTime) {
+      totalSize = 0;
+
+      Iterator<KeyValue> it;
+
+      it = codec.getIterator();
+
+      // count only the algorithm time, without memory allocations
+      // (except the first time)
+      final long startTime = System.nanoTime();
+      while (it.hasNext()) {
+        totalSize += it.next().getLength();
+      }
+      final long finishTime = System.nanoTime();
+      if (itTime >= BENCHMARK_N_OMIT) {
+        durations.add(finishTime - startTime);
+      }
+
+      if (prevTotalSize != -1 && prevTotalSize != totalSize) {
+        throw new IllegalStateException(String.format(
+            "Algorithm '%s' decoded data to different size", codec.toString()));
+      }
+      prevTotalSize = totalSize;
+    }
+
+    // compression time
+    List<Long> compressDurations = new ArrayList<Long>();
+    for (int itTime = 0; itTime < BENCHMARK_N_TIMES; ++itTime) {
+      final long startTime = System.nanoTime();
+      codec.doCompressData();
+      final long finishTime = System.nanoTime();
+      if (itTime >= BENCHMARK_N_OMIT) {
+        compressDurations.add(finishTime - startTime);
+      }
+    }
+
+    System.out.println(codec.toString() + ":");
+    printBenchmarkResult(totalSize, compressDurations, false);
+    printBenchmarkResult(totalSize, durations, true);
+
+    return prevTotalSize;
+  }
+
+  private void benchmarkDefaultCompression(int totalSize, byte[] rawBuffer) {
+    benchmarkAlgorithm(compressionAlgorithm, compressor, decompressor,
+        compressionAlgorithmName.toUpperCase(), rawBuffer, 0, totalSize);
+  }
+
+  /**
+   * Benchmark compression and decompression performance of a given
+   * algorithm and print the results.
+   * @param algorithm Compression algorithm.
+   * @param compressorCodec Compressor to be tested.
+   * @param decompressorCodec Decompressor of the same algorithm.
+   * @param name Name of algorithm.
+   * @param buffer Buffer to be compressed.
+   * @param offset Position of the beginning of the data.
+   * @param length Length of data in buffer.
+   */
+  public static void benchmarkAlgorithm(
+      Compression.Algorithm algorithm,
+      Compressor compressorCodec,
+      Decompressor decompressorCodec,
+      String name,
+      byte[] buffer, int offset, int length) {
+    System.out.println(name + ":");
+
+    // compress it
+    List<Long> compressDurations = new ArrayList<Long>();
+    ByteArrayOutputStream compressedStream = new ByteArrayOutputStream();
+    OutputStream compressingStream;
+    try {
+      for (int itTime = 0; itTime < BENCHMARK_N_TIMES; ++itTime) {
+        final long startTime = System.nanoTime();
+        compressingStream = algorithm.createCompressionStream(
+            compressedStream, compressorCodec, 0);
+        compressingStream.write(buffer, offset, length);
+        compressingStream.flush();
+        compressedStream.toByteArray();
+
+        final long finishTime = System.nanoTime();
+
+        // add time record
+        if (itTime >= BENCHMARK_N_OMIT) {
+          compressDurations.add(finishTime - startTime);
+        }
+
+        if (itTime + 1 < BENCHMARK_N_TIMES) { // not the last one
+          compressedStream.reset();
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(String.format(
+          "Benchmark, or encoding algorithm '%s' cause some stream problems",
+          name), e);
+    }
+    printBenchmarkResult(length, compressDurations, false);
+
+
+    byte[] compBuffer = compressedStream.toByteArray();
+
+    // uncompress it several times and measure performance
+    List<Long> durations = new ArrayList<Long>();
+    for (int itTime = 0; itTime < BENCHMARK_N_TIMES; ++itTime) {
+      final long startTime = System.nanoTime();
+      byte[] newBuf = new byte[length + 1];
+
+      try {
+
+        ByteArrayInputStream downStream = new ByteArrayInputStream(compBuffer,
+            0, compBuffer.length);
+        InputStream decompressedStream = algorithm.createDecompressionStream(
+            downStream, decompressorCodec, 0);
+
+        int destOffset = 0;
+        int nextChunk;
+        while ((nextChunk = decompressedStream.available()) > 0) {
+          destOffset += decompressedStream.read(newBuf, destOffset, nextChunk);
+        }
+        decompressedStream.close();
+
+        // walk the decompressed buffer KeyValue by KeyValue (KV parsing is
+        // included in the timed section)
+        KeyValue kv;
+        for (int pos = 0; pos < length; pos += kv.getLength()) {
+          kv = new KeyValue(newBuf, pos);
+        }
+
+      } catch (IOException e) {
+        throw new RuntimeException(String.format(
+            "Decoding path in '%s' algorithm cause exception ", name), e);
+      }
+
+      final long finishTime = System.nanoTime();
+
+      // check correctness
+      if (0 != Bytes.compareTo(buffer, 0, length, newBuf, 0, length)) {
+        int prefix = 0;
+        for(; prefix < buffer.length && prefix < newBuf.length; ++prefix) {
+          if (buffer[prefix] != newBuf[prefix]) {
+            break;
+          }
+        }
+        throw new RuntimeException(String.format(
+            "Algorithm '%s' is corrupting the data", name));
+      }
+
+      // add time record
+      if (itTime >= BENCHMARK_N_OMIT) {
+        durations.add(finishTime - startTime);
+      }
+    }
+    printBenchmarkResult(length, durations, true);
+  }
+
+  private static void printBenchmarkResult(int totalSize,
+      List<Long> durationsInNanoSec, boolean isDecompression) {
+    long meanTime = 0;
+    for (long time : durationsInNanoSec) {
+      meanTime += time;
+    }
+    meanTime /= durationsInNanoSec.size();
+
+    long standardDev = 0;
+    for (long time : durationsInNanoSec) {
+      standardDev += (time - meanTime) * (time - meanTime);
+    }
+    standardDev = (long) Math.sqrt(standardDev / durationsInNanoSec.size());
+
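+    // convert mean nanoseconds per pass into MB/s; subtracting one standard
+    // deviation from the mean gives the optimistic bound reported as "+/-"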
+    final double nanosPerSec = 1000.0 * 1000.0 * 1000.0;
+    double mbPerSec = (totalSize * nanosPerSec) /
+        (1024.0 * 1024.0 * meanTime);
+    double mbPerSecDev = (totalSize * nanosPerSec) /
+        (1024.0 * 1024.0 * (meanTime - standardDev));
+
+    System.out.println(String.format(
+        "  %s performance:%s %6.2f MB/s (+/- %.2f MB/s)",
+        isDecompression ? "Decompression" : "Compression",
+        isDecompression ? "" : "  ",
+        mbPerSec, mbPerSecDev - mbPerSec));
+  }
+
+  /**
+   * Display statistics of different compression algorithms.
+   */
+  public void displayStatistics() {
+    int totalLength = totalPrefixLength + totalKeyLength + totalValueLength;
+    compressor.reset();
+
+    for(EncodedDataBlock codec : codecs) {
+      System.out.println(codec.toString());
+      int saved = totalKeyLength + totalPrefixLength + totalValueLength
+          - codec.getSize();
+      System.out.println(
+          String.format("  Saved bytes:            %8d", saved));
+      double keyRatio = (saved * 100.0) / (totalPrefixLength + totalKeyLength);
+      double allRatio = (saved * 100.0) / totalLength;
+      System.out.println(
+          String.format("  Key compression ratio:        %.2f %%", keyRatio));
+      System.out.println(
+          String.format("  All compression ratio:        %.2f %%", allRatio));
+      int compressedSize = codec.checkCompressedSize(compressor);
+      System.out.println(
+          String.format("  %s compressed size:    %8d",
+              compressionAlgorithmName.toUpperCase(), compressedSize));
+      double compressRatio =
+          100.0 * (1.0 - compressedSize / (0.0 + totalLength));
+      System.out.println(
+          String.format("  %s compression ratio:        %.2f %%",
+              compressionAlgorithmName.toUpperCase(), compressRatio));
+    }
+
+    System.out.println(
+        String.format("Total KV prefix length:   %8d", totalPrefixLength));
+    System.out.println(
+        String.format("Total key length:         %8d", totalKeyLength));
+    System.out.println(
+        String.format("Total key redundancy:     %8d",
+            totalKeyRedundancyLength));
+    System.out.println(
+        String.format("Total value length:       %8d", totalValueLength));
+  }
+
+  /**
+   * Test a data block encoder on the given HFile. Output results to console.
+   * @param kvLimit The maximum number of KeyValues to analyze.
+   * @param hfilePath an HFile path on the file system.
+   * @param compressionName Compression algorithm used for comparison.
+   * @param doBenchmark Run performance benchmarks.
+   * @param doVerify Verify correctness.
+   * @throws IOException When hfilePath is invalid or cannot be read.
+   */
+  public static void testCodecs(int kvLimit, String hfilePath,
+      String compressionName, boolean doBenchmark, boolean doVerify)
+          throws IOException {
+    // create environment
+    Path path = new Path(hfilePath);
+    Configuration conf = HBaseConfiguration.create();
+    CacheConfig cacheConf = new CacheConfig(conf);
+    FileSystem fs = FileSystem.get(conf);
+    StoreFile hsf = new StoreFile(fs, path, conf, cacheConf,
+        StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
+
+    StoreFile.Reader reader = hsf.createReader();
+    reader.loadFileInfo();
+    KeyValueScanner scanner = reader.getStoreFileScanner(true, true);
+
+    // run the utilities
+    DataBlockEncodingTool comp = new DataBlockEncodingTool(compressionName);
+    comp.checkStatistics(scanner, kvLimit);
+    if (doVerify) {
+      comp.verifyCodecs(scanner, kvLimit);
+    }
+    if (doBenchmark) {
+      comp.benchmarkCodecs();
+    }
+    comp.displayStatistics();
+
+    // cleanup
+    scanner.close();
+    reader.close(cacheConf.shouldEvictOnClose());
+  }
+
+  private static void printUsage(Options options) {
+    System.err.println("Usage:");
+    System.err.println(String.format("./hbase %s <options>",
+        DataBlockEncodingTool.class.getName()));
+    System.err.println("Options:");
+    for (Object it : options.getOptions()) {
+      Option opt = (Option) it;
+      if (opt.hasArg()) {
+        System.err.println(String.format("-%s %s: %s", opt.getOpt(),
+            opt.getArgName(), opt.getDescription()));
+      } else {
+        System.err.println(String.format("-%s: %s", opt.getOpt(),
+            opt.getDescription()));
+      }
+    }
+  }
+
+  /**
+   * A command line interface to benchmarks.
+   * @param args Should have length at least 1 and hold the path to an HFile.
+   * @throws IOException If the specified file cannot be read.
+   */
+  public static void main(final String[] args) throws IOException {
+    // set up user arguments
+    Options options = new Options();
+    options.addOption("f", true, "HFile to analyse (REQUIRED)");
+    options.getOption("f").setArgName("FILENAME");
+    options.addOption("n", true,
+        "Limit number of KeyValue which will be analysed");
+    options.getOption("n").setArgName("NUMBER");
+    options.addOption("b", false, "Measure read throughput");
+    options.addOption("c", false, "Omit corectness tests.");
+    options.addOption("a", true,
+        "What kind of compression algorithm use for comparison.");
+
+    // parse arguments
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = null;
+    try {
+      cmd = parser.parse(options, args);
+    } catch (ParseException e) {
+      System.err.println("Could not parse arguments!");
+      System.exit(-1);
+      return; // avoid warning
+    }
+
+    int kvLimit = Integer.MAX_VALUE;
+    if (cmd.hasOption("n")) {
+      kvLimit = Integer.parseInt(cmd.getOptionValue("n"));
+    }
+
+    // basic argument sanity checks
+    if (!cmd.hasOption("f")) {
+      System.err.println("ERROR: Filename is required!");
+      printUsage(options);
+      System.exit(-1);
+    }
+
+    if (!(new File(cmd.getOptionValue("f"))).exists()) {
+      System.err.println(String.format("ERROR: file '%s' doesn't exist!",
+          cmd.getOptionValue("f")));
+      printUsage(options);
+      System.exit(-1);
+    }
+
+    String pathName = cmd.getOptionValue("f");
+    String compressionName = "gz";
+    if (cmd.hasOption("a")) {
+      compressionName = cmd.getOptionValue("a");
+    }
+    boolean doBenchmark = cmd.hasOption("b");
+    boolean doVerify = !cmd.hasOption("c");
+
+    testCodecs(kvLimit, pathName, compressionName, doBenchmark, doVerify);
+  }
+
+}
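
For reference, a minimal sketch of driving the new tool from Java rather than
through the "./hbase" launcher shown in printUsage(). The store-file path and
the wrapper class name are hypothetical placeholders; the call simply mirrors
the testCodecs() signature added above:

    import org.apache.hadoop.hbase.regionserver.DataBlockEncodingTool;

    public class EncodingToolExample {
      public static void main(String[] args) throws Exception {
        // Analyze at most 100000 KeyValues from the given store file, verify
        // that every encoder round-trips correctly, and benchmark read
        // throughput against GZ as the baseline compressor.
        DataBlockEncodingTool.testCodecs(
            100000,                       // kvLimit
            "/hbase/TestTable/cf/12345",  // hypothetical HFile path
            "gz",                         // baseline compression algorithm
            true,                         // doBenchmark
            true);                        // doVerify
      }
    }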

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
+import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+
+/**
+ * Test seek performance for encoded data blocks. Read an HFile and do several
+ * random seeks.
+ */
+public class EncodedSeekPerformanceTest {
+  private static final double NANOSEC_IN_SEC = 1000.0 * 1000.0 * 1000.0;
+  private static final double BYTES_IN_MEGABYTES = 1024.0 * 1024.0;
+  /** Default number of seeks which will be used in benchmark. */
+  public static int DEFAULT_NUMBER_OF_SEEKS = 10000;
+
+  private final HBaseTestingUtility testingUtility = new HBaseTestingUtility();
+  private Configuration configuration = testingUtility.getConfiguration();
+  private CacheConfig cacheConf = new CacheConfig(configuration);
+  private Random randomizer;
+  private int numberOfSeeks;
+
+  /** Use this benchmark with default options */
+  public EncodedSeekPerformanceTest() {
+    configuration.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.5f);
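+    // hfile.block.cache.size: use half of the heap for the block cache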
+    randomizer = new Random(42L);
+    numberOfSeeks = DEFAULT_NUMBER_OF_SEEKS;
+  }
+
+  private List<KeyValue> prepareListOfTestSeeks(Path path) throws IOException {
+    List<KeyValue> allKeyValues = new ArrayList<KeyValue>();
+
+    // read all of the key values
+    StoreFile storeFile = new StoreFile(testingUtility.getTestFileSystem(),
+        path, configuration, cacheConf, BloomType.NONE,
+        NoOpDataBlockEncoder.INSTANCE);
+
+    StoreFile.Reader reader = storeFile.createReader();
+    StoreFileScanner scanner = reader.getStoreFileScanner(true, false);
+    KeyValue current;
+
+    scanner.seek(KeyValue.LOWESTKEY);
+    while (null != (current = scanner.next())) {
+      allKeyValues.add(current);
+    }
+
+    storeFile.closeReader(cacheConf.shouldEvictOnClose());
+
+    // pick seek targets at random
+    List<KeyValue> seeks = new ArrayList<KeyValue>();
+    for (int i = 0; i < numberOfSeeks; ++i) {
+      KeyValue keyValue = allKeyValues.get(
+          randomizer.nextInt(allKeyValues.size()));
+      seeks.add(keyValue);
+    }
+
+    clearBlockCache();
+
+    return seeks;
+  }
+
+  private void runTest(Path path, HFileDataBlockEncoder blockEncoder,
+      List<KeyValue> seeks) throws IOException {
+    // read all of the key values
+    StoreFile storeFile = new StoreFile(testingUtility.getTestFileSystem(),
+        path, configuration, cacheConf, BloomType.NONE, blockEncoder);
+
+    long totalSize = 0;
+
+    StoreFile.Reader reader = storeFile.createReader();
+    StoreFileScanner scanner = reader.getStoreFileScanner(true, false);
+
+    long startReadingTime = System.nanoTime();
+    KeyValue current;
+    scanner.seek(KeyValue.LOWESTKEY);
+    while (null != (current = scanner.next())) { // just iterate it!
+      if (current.getLength() < 0) {
+        throw new IOException("Negative KV size: " + current);
+      }
+      totalSize += current.getLength();
+    }
+    long finishReadingTime = System.nanoTime();
+
+    // do seeks
+    long startSeeksTime = System.nanoTime();
+    for (KeyValue keyValue : seeks) {
+      scanner.seek(keyValue);
+      KeyValue toVerify = scanner.next();
+      if (!keyValue.equals(toVerify)) {
+        System.out.println(String.format("KeyValue doesn't match:\n" +
+            "Orig key: %s\n" +
+            "Ret key:  %s", keyValue.getKeyString(), toVerify.getKeyString()));
+        break;
+      }
+    }
+    long finishSeeksTime = System.nanoTime();
+    if (finishSeeksTime < startSeeksTime) {
+      throw new AssertionError("Finish time " + finishSeeksTime +
+          " is earlier than start time " + startSeeksTime);
+    }
+
+    // write some stats
+    double readInMbPerSec = (totalSize * NANOSEC_IN_SEC) /
+        (BYTES_IN_MEGABYTES * (finishReadingTime - startReadingTime));
+    double seeksPerSec = (seeks.size() * NANOSEC_IN_SEC) /
+        (finishSeeksTime - startSeeksTime);
+
+    storeFile.closeReader(cacheConf.shouldEvictOnClose());
+    clearBlockCache();
+
+    System.out.println(blockEncoder);
+    System.out.printf("  Read speed:       %8.2f (MB/s)\n", readInMbPerSec);
+    System.out.printf("  Seeks per second: %8.2f (#/s)\n", seeksPerSec);
+    System.out.printf("  Total KV size:    %d\n", totalSize);
+  }
+
+  /**
+   * @param path Path to the HFile which will be used.
+   * @param encoders List of encoders which will be used for tests.
+   * @throws IOException if an error occurs while reading from disk
+   */
+  public void runTests(Path path, List<HFileDataBlockEncoder> encoders)
+      throws IOException {
+    List<KeyValue> seeks = prepareListOfTestSeeks(path);
+
+    for (HFileDataBlockEncoder blockEncoder : encoders) {
+      runTest(path, blockEncoder, seeks);
+    }
+  }
+
+  /**
+   * Command line interface:
+   * @param args Takes one argument: the path to the HFile.
+   * @throws IOException if an error occurs while reading from disk
+   */
+  public static void main(final String[] args) throws IOException {
+    if (args.length < 1) {
+      printUsage();
+      System.exit(-1);
+    }
+
+    Path path = new Path(args[0]);
+    List<HFileDataBlockEncoder> encoders =
+        new ArrayList<HFileDataBlockEncoder>();
+
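+    // baseline (no encoding) followed by one encoder per available algorithm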
+    encoders.add(new HFileDataBlockEncoderImpl(DataBlockEncoding.NONE));
+    for (DataBlockEncoding encodingAlgo : DataBlockEncoding.values()) {
+      encoders.add(new HFileDataBlockEncoderImpl(DataBlockEncoding.NONE,
+          encodingAlgo));
+    }
+
+    EncodedSeekPerformanceTest utility = new EncodedSeekPerformanceTest();
+    utility.runTests(path, encoders);
+
+    System.exit(0);
+  }
+
+  private static void printUsage() {
+    System.out.println("Usage: one argument, name of the HFile");
+  }
+
+  private void clearBlockCache() {
+    ((LruBlockCache) cacheConf.getBlockCache()).clearCache();
+  }
+}
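
Similarly, a minimal sketch of calling the seek benchmark directly instead of
through its main() method; the store-file path is a hypothetical placeholder
and the encoder list mirrors the constructors used in main() above:

    import java.util.Arrays;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
    import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
    import org.apache.hadoop.hbase.regionserver.EncodedSeekPerformanceTest;

    public class SeekBenchmarkExample {
      public static void main(String[] args) throws Exception {
        EncodedSeekPerformanceTest benchmark = new EncodedSeekPerformanceTest();
        // Compare the unencoded baseline against PREFIX encoding, using the
        // same HFileDataBlockEncoderImpl constructors as the tool's main().
        benchmark.runTests(
            new Path("/hbase/TestTable/cf/12345"),  // hypothetical HFile path
            Arrays.<HFileDataBlockEncoder>asList(
                new HFileDataBlockEncoderImpl(DataBlockEncoding.NONE),
                new HFileDataBlockEncoderImpl(DataBlockEncoding.NONE,
                    DataBlockEncoding.PREFIX)));
      }
    }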