You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/01/26 03:59:00 UTC

svn commit: r1236031 [4/7] - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/io/encoding/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/mapr...

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Thu Jan 26 02:58:57 2012
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.KeyValue.
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.Compression;
@@ -56,6 +57,8 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
@@ -130,6 +133,10 @@ public class StoreFile extends SchemaCon
   /** Key for timestamp of earliest-put in metadata*/
   public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
 
+  /** Type of encoding used for data blocks in HFile. Stored in file info. */
+  public static final byte[] DATA_BLOCK_ENCODING =
+      Bytes.toBytes("DATA_BLOCK_ENCODING");
+
   // Make default block size for StoreFiles 8k while testing.  TODO: FIX!
   // Need to make it 8k for testing.
   public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
@@ -148,7 +155,10 @@ public class StoreFile extends SchemaCon
   // Block cache configuration and reference.
   private final CacheConfig cacheConf;
 
-  // HDFS blocks distribuion information
+  // What kind of data block encoding will be used
+  private final HFileDataBlockEncoder dataBlockEncoder;
+
+  // HDFS blocks distribution information
   private HDFSBlocksDistribution hdfsBlocksDistribution;
 
   // Keys for metadata stored in backing HFile.
@@ -220,17 +230,22 @@ public class StoreFile extends SchemaCon
    *          as the Bloom filter type actually present in the HFile, because
    *          column family configuration might change. If this is
    *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
+   * @param dataBlockEncoder data block encoding algorithm.
    * @throws IOException When opening the reader fails.
    */
   StoreFile(final FileSystem fs,
             final Path p,
             final Configuration conf,
             final CacheConfig cacheConf,
-            final BloomType cfBloomType)
+            final BloomType cfBloomType,
+            final HFileDataBlockEncoder dataBlockEncoder)
       throws IOException {
     this.fs = fs;
     this.path = p;
     this.cacheConf = cacheConf;
+    this.dataBlockEncoder =
+        dataBlockEncoder == null ? NoOpDataBlockEncoder.INSTANCE
+            : dataBlockEncoder;
     if (isReference(p)) {
       this.reference = Reference.read(fs, p);
       this.referencePath = getReferredToFile(this.path);
@@ -493,9 +508,11 @@ public class StoreFile extends SchemaCon
     }
     if (isReference()) {
       this.reader = new HalfStoreFileReader(this.fs, this.referencePath,
-          this.cacheConf, this.reference);
+          this.cacheConf, this.reference,
+          dataBlockEncoder.getEncodingInCache());
     } else {
-      this.reader = new Reader(this.fs, this.path, this.cacheConf);
+      this.reader = new Reader(this.fs, this.path, this.cacheConf,
+          dataBlockEncoder.getEncodingInCache());
     }
 
     if (isSchemaConfigured()) {
@@ -514,7 +531,7 @@ public class StoreFile extends SchemaCon
       // By convention, if halfhfile, top half has a sequence number > bottom
       // half. Thats why we add one in below. Its done for case the two halves
       // are ever merged back together --rare.  Without it, on open of store,
-      // since store files are distingushed by sequence id, the one half would
+      // since store files are distinguished by sequence id, the one half would
       // subsume the other.
       this.sequenceid = Bytes.toLong(b);
       if (isReference()) {
@@ -598,11 +615,11 @@ public class StoreFile extends SchemaCon
   }
 
   /**
-   * @param evictOnClose 
+   * @param evictOnClose whether to evict blocks belonging to this file
    * @throws IOException
    */
   public synchronized void closeReader(boolean evictOnClose)
-  throws IOException {
+      throws IOException {
     if (this.reader != null) {
       this.reader.close(evictOnClose);
       this.reader = null;
@@ -677,8 +694,8 @@ public class StoreFile extends SchemaCon
   public static Writer createWriter(final FileSystem fs, final Path dir,
       final int blocksize, Configuration conf, CacheConfig cacheConf)
   throws IOException {
-    return createWriter(fs, dir, blocksize, null, null, conf, cacheConf,
-        BloomType.NONE, 0);
+    return createWriter(fs, dir, blocksize, null, NoOpDataBlockEncoder.INSTANCE,
+        null, conf, cacheConf, BloomType.NONE, 0);
   }
 
   /**
@@ -688,8 +705,9 @@ public class StoreFile extends SchemaCon
    * @param dir Path to family directory.  Makes the directory if doesn't exist.
    * Creates a file with a unique name in this directory.
    * @param blocksize
-   * @param algorithm Pass null to get default.
-   * @param c Pass null to get default.
+   * @param compressAlgo Compression algorithm. Pass null to get default.
+   * @param dataBlockEncoder Pass null to disable data block encoding.
+   * @param comparator Key-value comparator. Pass null to get default.
    * @param conf HBase system configuration. used with bloom filters
    * @param cacheConf Cache configuration and reference.
    * @param bloomType column family setting for bloom filters
@@ -698,14 +716,11 @@ public class StoreFile extends SchemaCon
    * @throws IOException
    */
   public static StoreFile.Writer createWriter(final FileSystem fs,
-                                              final Path dir,
-                                              final int blocksize,
-                                              final Compression.Algorithm algorithm,
-                                              final KeyValue.KVComparator c,
-                                              final Configuration conf,
-                                              final CacheConfig cacheConf,
-                                              BloomType bloomType,
-                                              long maxKeyCount)
+      final Path dir, final int blocksize,
+      Compression.Algorithm compressAlgo,
+      final HFileDataBlockEncoder dataBlockEncoder,
+      KeyValue.KVComparator comparator, final Configuration conf,
+      final CacheConfig cacheConf, BloomType bloomType, long maxKeyCount)
       throws IOException {
 
     if (!fs.exists(dir)) {
@@ -716,10 +731,14 @@ public class StoreFile extends SchemaCon
       bloomType = BloomType.NONE;
     }
 
-    return new Writer(fs, path, blocksize,
-        algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm,
-        conf, cacheConf, c == null ? KeyValue.COMPARATOR: c, bloomType,
-        maxKeyCount);
+    if (compressAlgo == null) {
+      compressAlgo = HFile.DEFAULT_COMPRESSION_ALGORITHM;
+    }
+    if (comparator == null) {
+      comparator = KeyValue.COMPARATOR;
+    }
+    return new Writer(fs, path, blocksize, compressAlgo, dataBlockEncoder,
+        conf, cacheConf, comparator, bloomType, maxKeyCount);
   }
 
   /**
@@ -814,6 +833,8 @@ public class StoreFile extends SchemaCon
     private KeyValue lastDeleteFamilyKV = null;
     private long deleteFamilyCnt = 0;
 
+    protected HFileDataBlockEncoder dataBlockEncoder;
+
     TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
     /* isTimeRangeTrackerSet keeps track if the timeRange has already been set
      * When flushing a memstore, we set TimeRange and use this variable to
@@ -838,13 +859,16 @@ public class StoreFile extends SchemaCon
      * @throws IOException problem writing to FS
      */
     public Writer(FileSystem fs, Path path, int blocksize,
-        Compression.Algorithm compress, final Configuration conf,
+        Compression.Algorithm compress,
+        HFileDataBlockEncoder dataBlockEncoder, final Configuration conf,
         CacheConfig cacheConf,
         final KVComparator comparator, BloomType bloomType, long maxKeys)
         throws IOException {
+      this.dataBlockEncoder = dataBlockEncoder != null ?
+          dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
       writer = HFile.getWriterFactory(conf, cacheConf).createWriter(
           fs, path, blocksize,
-          compress, comparator.getRawComparator());
+          compress, this.dataBlockEncoder, comparator.getRawComparator());
 
       this.kvComparator = comparator;
 
@@ -940,7 +964,8 @@ public class StoreFile extends SchemaCon
             newKey = false;
             break;
           default:
-            throw new IOException("Invalid Bloom filter type: " + bloomType);
+            throw new IOException("Invalid Bloom filter type: " + bloomType +
+                " (ROW or ROWCOL expected)");
           }
         }
         if (newKey) {
@@ -1081,6 +1106,9 @@ public class StoreFile extends SchemaCon
     }
 
     public void close() throws IOException {
+      // Save data block encoder metadata in the file info.
+      dataBlockEncoder.saveMetadata(this);
+
       boolean hasGeneralBloom = this.closeGeneralBloomFilter();
       boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();
 
@@ -1119,10 +1147,11 @@ public class StoreFile extends SchemaCon
     private byte[] lastBloomKey;
     private long deleteFamilyCnt = -1;
 
-    public Reader(FileSystem fs, Path path, CacheConfig cacheConf)
-        throws IOException {
+    public Reader(FileSystem fs, Path path, CacheConfig cacheConf,
+        DataBlockEncoding preferredEncodingInCache) throws IOException {
       super(path);
-      reader = HFile.createReader(fs, path, cacheConf);
+      reader = HFile.createReaderWithEncoding(fs, path, cacheConf,
+          preferredEncodingInCache);
       bloomFilterType = BloomType.NONE;
     }
 
@@ -1262,7 +1291,7 @@ public class StoreFile extends SchemaCon
 
         default:
           return true;
-      }      
+      }
     }
 
     public boolean passesDeleteFamilyBloomFilter(byte[] row, int rowOffset,
@@ -1312,7 +1341,7 @@ public class StoreFile extends SchemaCon
         return true;
 
       byte[] key;
-      switch (bloomFilterType) { 
+      switch (bloomFilterType) {
         case ROW:
           if (col != null) {
             throw new RuntimeException("Row-only Bloom filter called with " +

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Thu Jan 26 02:58:57 2012
@@ -154,8 +154,8 @@ class StoreFileScanner implements KeyVal
       } finally {
         realSeekDone = true;
       }
-    } catch(IOException ioe) {
-      throw new IOException("Could not seek " + this, ioe);
+    } catch (IOException ioe) {
+      throw new IOException("Could not seek " + this + " to key " + key, ioe);
     }
   }
 
@@ -175,7 +175,8 @@ class StoreFileScanner implements KeyVal
         realSeekDone = true;
       }
     } catch (IOException ioe) {
-      throw new IOException("Could not seek " + this, ioe);
+      throw new IOException("Could not reseek " + this + " to key " + key,
+          ioe);
     }
   }
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Thu Jan 26 02:58:57 2012
@@ -138,8 +138,8 @@ class StoreScanner extends NonLazyKeyVal
    * @param store who we scan
    * @param scan the spec
    * @param scanners ancillary scanners
-   * @param smallestReadPoint the readPoint that we should use for tracking versions
-   * @param retainDeletesInOutput should we retain deletes after compaction?
+   * @param smallestReadPoint the readPoint that we should use for tracking
+   *          versions
    */
   StoreScanner(Store store, Scan scan,
       List<? extends KeyValueScanner> scanners, ScanType scanType,

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java Thu Jan 26 02:58:57 2012
@@ -83,6 +83,15 @@ public class SchemaConfigured implements
   }
 
   /**
+   * Creates an instance corresponding to an unknown table and column family.
+   * Used in unit tests. 
+   */
+  public static SchemaConfigured createUnknown() {
+    return new SchemaConfigured(null, SchemaMetrics.UNKNOWN,
+        SchemaMetrics.UNKNOWN);
+  }
+
+  /**
    * Default constructor. Only use when column/family name are not known at
    * construction (i.e. for HFile blocks).
    */

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.encoding.
+    EncoderBufferTooSmallException;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Utility functions for working with byte buffers, such as reading/writing
+ * variable-length long numbers.
+ */
+public final class ByteBufferUtils {
+
+  // "Compressed integer" serialization helper constants.
+  private final static int VALUE_MASK = 0x7f;
+  private final static int NEXT_BIT_SHIFT = 7;
+  private final static int NEXT_BIT_MASK = 1 << 7;
+
+  private ByteBufferUtils() {
+  }
+
+  /**
+   * Similar to {@link WritableUtils#writeVLong(java.io.DataOutput, long)},
+   * but writes to a {@link ByteBuffer}.
+   */
+  public static void writeVLong(ByteBuffer out, long i) {
+    if (i >= -112 && i <= 127) {
+      out.put((byte) i);
+      return;
+    }
+
+    int len = -112;
+    if (i < 0) {
+      i ^= -1L; // take one's complement
+      len = -120;
+    }
+
+    long tmp = i;
+    while (tmp != 0) {
+      tmp = tmp >> 8;
+      len--;
+    }
+
+    out.put((byte) len);
+
+    len = (len < -120) ? -(len + 120) : -(len + 112);
+
+    for (int idx = len; idx != 0; idx--) {
+      int shiftbits = (idx - 1) * 8;
+      long mask = 0xFFL << shiftbits;
+      out.put((byte) ((i & mask) >> shiftbits));
+    }
+  }
+
+  /**
+   * Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a
+   * {@link ByteBuffer}.
+   */
+  public static long readVLong(ByteBuffer in) {
+    byte firstByte = in.get();
+    int len = WritableUtils.decodeVIntSize(firstByte);
+    if (len == 1) {
+      return firstByte;
+    }
+    long i = 0;
+    for (int idx = 0; idx < len-1; idx++) {
+      byte b = in.get();
+      i = i << 8;
+      i = i | (b & 0xFF);
+    }
+    return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
+  }
+
+
+  /**
+   * Write integer to stream using 7 bit encoding. For each written byte:
+   * 7 bits are used to store value
+   * 1 bit is used to indicate whether another byte follows.
+   * @param value Int to be compressed.
+   * @param out Where to put compressed data
+   * @return Number of bytes written.
+   * @throws IOException on stream error
+   */
+   public static int putCompressedInt(OutputStream out, final int value)
+      throws IOException {
+    int i = 0;
+    int tmpvalue = value;
+    do {
+      byte b = (byte) (tmpvalue & VALUE_MASK);
+      tmpvalue >>>= NEXT_BIT_SHIFT;
+      if (tmpvalue != 0) {
+        b |= (byte) NEXT_BIT_MASK;
+      }
+      out.write(b);
+      i++;
+    } while (tmpvalue != 0);
+    return i;
+  }
+
+   /**
+    * Put in output stream 32 bit integer (Big Endian byte order).
+    * @param out Where to put integer.
+    * @param value Value of integer.
+    * @throws IOException On stream error.
+    */
+   public static void putInt(OutputStream out, final int value)
+       throws IOException {
+     for (int i = Bytes.SIZEOF_INT - 1; i >= 0; --i) {
+       out.write((byte) (value >>> (i * 8)));
+     }
+   }
+
+  /**
+   * Copy the data to the output stream and update position in buffer.
+   * @param out the stream to write bytes to
+   * @param in the buffer to read bytes from
+   * @param length the number of bytes to copy
+   */
+  public static void moveBufferToStream(OutputStream out, ByteBuffer in,
+      int length) throws IOException {
+    copyBufferToStream(out, in, in.position(), length);
+    skip(in, length);
+  }
+
+  /**
+   * Copy data from a buffer to an output stream. Does not update the position
+   * in the buffer.
+   * @param out the stream to write bytes to
+   * @param in the buffer to read bytes from
+   * @param offset the offset in the buffer (from the buffer's array offset)
+   *      to start copying bytes from
+   * @param length the number of bytes to copy
+   */
+  public static void copyBufferToStream(OutputStream out, ByteBuffer in,
+      int offset, int length) throws IOException {
+    if (in.hasArray()) {
+      out.write(in.array(), in.arrayOffset() + offset,
+          length);
+    } else {
+      for (int i = 0; i < length; ++i) {
+        out.write(in.get(offset + i));
+      }
+    }
+  }
+
+  public static int putLong(OutputStream out, final long value,
+      final int fitInBytes) throws IOException {
+    long tmpValue = value;
+    for (int i = 0; i < fitInBytes; ++i) {
+      out.write((byte) (tmpValue & 0xff));
+      tmpValue >>>= 8;
+    }
+    return fitInBytes;
+  }
+
+  /**
+   * Check how many bytes are required to store value.
+   * @param value Value which size will be tested.
+   * @return How many bytes are required to store value.
+   */
+  public static int longFitsIn(final long value) {
+    if (value < 0) {
+      return 8;
+    }
+
+    if (value < (1l << 4 * 8)) {
+      // no more than 4 bytes
+      if (value < (1l << 2 * 8)) {
+        if (value < (1l << 1 * 8)) {
+          return 1;
+        }
+        return 2;
+      }
+      if (value < (1l << 3 * 8)) {
+        return 3;
+      }
+      return 4;
+    }
+    // more than 4 bytes
+    if (value < (1l << 6 * 8)) {
+      if (value < (1l << 5 * 8)) {
+        return 5;
+      }
+      return 6;
+    }
+    if (value < (1l << 7 * 8)) {
+      return 7;
+    }
+    return 8;
+  }
+
+  /**
+   * Check how many bytes are required to store value.
+   * @param value Value whose size will be tested.
+   * @return Bytes required to store value (1 to 4; 4 if value is negative).
+   */
+  public static int intFitsIn(final int value) {
+    if (value < 0) {
+      return 4;
+    }
+
+    if (value < (1 << 2 * 8)) {
+      if (value < (1 << 1 * 8)) {
+        return 1;
+      }
+      return 2;
+    }
+    if (value < (1 << 3 * 8)) { // strict: 1 << 24 itself needs 4 bytes
+      return 3;
+    }
+    return 4;
+  }
+
+  /**
+   * Read integer from stream coded in 7 bits and increment position.
+   * @return the integer that has been read
+   * @throws IOException
+   */
+  public static int readCompressedInt(InputStream input)
+      throws IOException {
+    int result = 0;
+    int i = 0;
+    byte b;
+    do {
+      b = (byte) input.read();
+      result += (b & VALUE_MASK) << (NEXT_BIT_SHIFT * i);
+      i++;
+      if (i > Bytes.SIZEOF_INT + 1) {
+        throw new IllegalStateException(
+            "Corrupted compressed int (too long: " + (i + 1) + " bytes)");
+      }
+    } while (0 != (b & NEXT_BIT_MASK));
+    return result;
+  }
+
+  /**
+   * Read integer from buffer coded in 7 bits and increment position.
+   * @return Read integer.
+   */
+  public static int readCompressedInt(ByteBuffer buffer) {
+    byte b = buffer.get();
+    if ((b & NEXT_BIT_MASK) != 0) {
+      return (b & VALUE_MASK) + (readCompressedInt(buffer) << NEXT_BIT_SHIFT);
+    }
+    return b & VALUE_MASK;
+  }
+
+  /**
+   * Read long which was written to fitInBytes bytes and increment position.
+   * @param fitInBytes In how many bytes given long is stored.
+   * @return The value of parsed long.
+   * @throws IOException
+   */
+  public static long readLong(InputStream in, final int fitInBytes)
+      throws IOException {
+    long tmpLong = 0;
+    for (int i = 0; i < fitInBytes; ++i) {
+      tmpLong |= (in.read() & 0xffl) << (8 * i);
+    }
+    return tmpLong;
+  }
+
+  /**
+   * Read long which was written to fitInBytes bytes and increment position.
+   * @param fitInBytes In how many bytes given long is stored.
+   * @return The value of parsed long.
+   */
+  public static long readLong(ByteBuffer in, final int fitInBytes) {
+    long tmpLength = 0;
+    for (int i = 0; i < fitInBytes; ++i) {
+      tmpLength |= (in.get() & 0xffl) << (8l * i);
+    }
+    return tmpLength;
+  }
+
+  /**
+   * Asserts that there is at least the given amount of unfilled space
+   * remaining in the given buffer.
+   * @param out typically, the buffer we are writing to
+   * @param length the required space in the buffer
+   * @throws EncoderBufferTooSmallException If there are not enough bytes.
+   */
+  public static void ensureSpace(ByteBuffer out, int length)
+      throws EncoderBufferTooSmallException {
+    if (out.position() + length > out.limit()) {
+      throw new EncoderBufferTooSmallException(
+          "Buffer position=" + out.position() +
+          ", buffer limit=" + out.limit() +
+          ", length to be written=" + length);
+    }
+  }
+
+  /**
+   * Copy the given number of bytes from the given stream and put it at the
+   * current position of the given buffer, updating the position in the buffer.
+   * @param out the buffer to write data to
+   * @param in the stream to read data from
+   * @param length the number of bytes to read/write
+   */
+  public static void copyFromStreamToBuffer(ByteBuffer out,
+      DataInputStream in, int length) throws IOException {
+    if (out.hasArray()) {
+      in.readFully(out.array(), out.position() + out.arrayOffset(),
+          length);
+      skip(out, length);
+    } else {
+      for (int i = 0; i < length; ++i) {
+        out.put(in.readByte());
+      }
+    }
+  }
+
+  /**
+   * Copy from one buffer to another from given offset
+   * @param out destination buffer
+   * @param in source buffer
+   * @param sourceOffset offset in the source buffer
+   * @param length how many bytes to copy
+   */
+  public static void copyFromBufferToBuffer(ByteBuffer out,
+      ByteBuffer in, int sourceOffset, int length) {
+    if (in.hasArray() && out.hasArray()) {
+      System.arraycopy(in.array(), sourceOffset + in.arrayOffset(),
+          out.array(), out.position() +
+          out.arrayOffset(), length);
+      skip(out, length);
+    } else {
+      for (int i = 0; i < length; ++i) {
+        out.put(in.get(sourceOffset + i));
+      }
+    }
+  }
+
+  /**
+   * Find length of common prefix of two parts in the buffer
+   * @param buffer Where parts are located.
+   * @param offsetLeft Offset of the first part.
+   * @param offsetRight Offset of the second part.
+   * @param limit Maximal length of common prefix.
+   * @return Length of prefix.
+   */
+  public static int findCommonPrefix(ByteBuffer buffer, int offsetLeft,
+      int offsetRight, int limit) {
+    int prefix = 0;
+
+    for (; prefix < limit; ++prefix) {
+      if (buffer.get(offsetLeft + prefix) != buffer.get(offsetRight + prefix)) {
+        break;
+      }
+    }
+
+    return prefix;
+  }
+
+  /**
+   * Find length of common prefix in two arrays.
+   * @param left Array to be compared.
+   * @param leftOffset Offset in left array.
+   * @param leftLength Length of left array.
+   * @param right Array to be compared.
+   * @param rightOffset Offset in right array.
+   * @param rightLength Length of right array.
+   */
+  public static int findCommonPrefix(
+      byte[] left, int leftOffset, int leftLength,
+      byte[] right, int rightOffset, int rightLength) {
+    int length = Math.min(leftLength, rightLength);
+    int result = 0;
+
+    while (result < length &&
+        left[leftOffset + result] == right[rightOffset + result]) {
+      result++;
+    }
+
+    return result;
+  }
+
+  /**
+   * Check whether two parts in the same buffer are equal.
+   * @param buffer In which buffer there are parts
+   * @param offsetLeft Beginning of first part.
+   * @param lengthLeft Length of the first part.
+   * @param offsetRight Beginning of the second part.
+   * @param lengthRight Length of the second part.
+   * @return true if both parts have the same length and contents.
+   */
+  public static boolean arePartsEqual(ByteBuffer buffer,
+      int offsetLeft, int lengthLeft,
+      int offsetRight, int lengthRight) {
+    if (lengthLeft != lengthRight) {
+      return false;
+    }
+
+    if (buffer.hasArray()) {
+      return 0 == Bytes.compareTo(
+          buffer.array(), buffer.arrayOffset() + offsetLeft, lengthLeft,
+          buffer.array(), buffer.arrayOffset() + offsetRight, lengthRight);
+    }
+
+    for (int i = 0; i < lengthRight; ++i) {
+      if (buffer.get(offsetLeft + i) != buffer.get(offsetRight + i)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Increment position in buffer.
+   * @param buffer In this buffer.
+   * @param length By that many bytes.
+   */
+  public static void skip(ByteBuffer buffer, int length) {
+    buffer.position(buffer.position() + length);
+  }
+
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java Thu Jan 26 02:58:57 2012
@@ -24,6 +24,7 @@ import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
@@ -95,7 +96,8 @@ public class CompoundBloomFilter extends
       try {
         // We cache the block and use a positional read.
         bloomBlock = reader.readBlock(index.getRootBlockOffset(block),
-            index.getRootBlockDataSize(block), true, true, false);
+            index.getRootBlockDataSize(block), true, true, false,
+            BlockType.BLOOM_CHUNK);
       } catch (IOException ex) {
         // The Bloom filter is broken, turn it off.
         throw new IllegalArgumentException(

Modified: hbase/trunk/src/main/ruby/hbase/admin.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/hbase/admin.rb?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/hbase/admin.rb (original)
+++ hbase/trunk/src/main/ruby/hbase/admin.rb Thu Jan 26 02:58:57 2012
@@ -532,6 +532,8 @@ module Hbase
       family.setInMemory(JBoolean.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY)
       family.setTimeToLive(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::TTL])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::TTL)
       family.setCompressionType(org.apache.hadoop.hbase.io.hfile.Compression::Algorithm.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION)
+      family.setDataBlockEncoding(org.apache.hadoop.hbase.io.encoding.DataBlockEncoding.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING)
+      family.setEncodeOnDisk(JBoolean.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::ENCODE_ON_DISK])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::ENCODE_ON_DISK)
       family.setBlocksize(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE)
       family.setMaxVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS)
       family.setMinVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS)

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java Thu Jan 26 02:58:57 2012
@@ -221,18 +221,30 @@ public abstract class HBaseTestCase exte
       final int minVersions, final int versions, final int ttl, boolean keepDeleted) {
     HTableDescriptor htd = new HTableDescriptor(name);
     htd.addFamily(new HColumnDescriptor(fam1, minVersions, versions,
-        keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
-        HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
-        HColumnDescriptor.DEFAULT_BLOOMFILTER,
-        HConstants.REPLICATION_SCOPE_LOCAL));
+      keepDeleted,
+      HColumnDescriptor.DEFAULT_COMPRESSION, 
+      HColumnDescriptor.DEFAULT_ENCODE_ON_DISK,
+      HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING,
+      false, false,
+      HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
+      HColumnDescriptor.DEFAULT_BLOOMFILTER,
+      HConstants.REPLICATION_SCOPE_LOCAL));
     htd.addFamily(new HColumnDescriptor(fam2, minVersions, versions,
-        keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
+        keepDeleted,
+        HColumnDescriptor.DEFAULT_COMPRESSION,
+        HColumnDescriptor.DEFAULT_ENCODE_ON_DISK,
+        HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING,
+        false, false,
         HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
         HColumnDescriptor.DEFAULT_BLOOMFILTER,
         HConstants.REPLICATION_SCOPE_LOCAL));
     htd.addFamily(new HColumnDescriptor(fam3, minVersions, versions,
-        keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
-        HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
+        keepDeleted,
+        HColumnDescriptor.DEFAULT_COMPRESSION,
+        HColumnDescriptor.DEFAULT_ENCODE_ON_DISK,
+        HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING,
+        false, false,
+        HColumnDescriptor.DEFAULT_BLOCKSIZE,  ttl,
         HColumnDescriptor.DEFAULT_BLOOMFILTER,
         HConstants.REPLICATION_SCOPE_LOCAL));
     return htd;

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Thu Jan 26 02:58:57 2012
@@ -56,7 +56,9 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -1879,9 +1881,13 @@ public class HBaseTestingUtility {
    * @return the number of regions the table was split into
    */
   public static int createPreSplitLoadTestTable(Configuration conf,
-      byte[] tableName, byte[] columnFamily) throws IOException {
+      byte[] tableName, byte[] columnFamily, Algorithm compression,
+      DataBlockEncoding dataBlockEncoding) throws IOException {
     HTableDescriptor desc = new HTableDescriptor(tableName);
-    desc.addFamily(new HColumnDescriptor(columnFamily));
+    HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
+    hcd.setDataBlockEncoding(dataBlockEncoding);
+    hcd.setCompressionType(compression);
+    desc.addFamily(hcd);
 
     int totalNumberOfRegions = 0;
     try {
@@ -1924,15 +1930,18 @@ public class HBaseTestingUtility {
 
   public HRegion createTestRegion(String tableName, String cfName,
       Compression.Algorithm comprAlgo, BloomType bloomType, int maxVersions,
-      boolean blockCacheEnabled, int blockSize) throws IOException {
+      int blockSize, DataBlockEncoding encoding, boolean encodeOnDisk)
+      throws IOException {
     HColumnDescriptor hcd =
       new HColumnDescriptor(Bytes.toBytes(cfName), maxVersions,
           comprAlgo.getName(),
           HColumnDescriptor.DEFAULT_IN_MEMORY,
-          blockCacheEnabled,
+          HColumnDescriptor.DEFAULT_BLOCKCACHE,
           HColumnDescriptor.DEFAULT_TTL,
           bloomType.toString());
     hcd.setBlocksize(HFile.DEFAULT_BLOCKSIZE);
+    hcd.setDataBlockEncoding(encoding);
+    hcd.setEncodeOnDisk(encodeOnDisk);
     HTableDescriptor htd = new HTableDescriptor(tableName);
     htd.addFamily(hcd);
     HRegionInfo info =

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java Thu Jan 26 02:58:57 2012
@@ -191,7 +191,8 @@ public class HFilePerformanceEvaluation 
     void setUp() throws Exception {
       writer =
         HFile.getWriterFactoryNoCache(conf).createWriter(this.fs,
-            this.mf, RFILE_BLOCKSIZE, (Compression.Algorithm) null, null);
+            this.mf, RFILE_BLOCKSIZE, (Compression.Algorithm) null, null,
+            null);
     }
 
     @Override
@@ -365,4 +366,4 @@ public class HFilePerformanceEvaluation 
   public static void main(String[] args) throws Exception {
     new HFilePerformanceEvaluation().runBenchmarks();
   }
-}
\ No newline at end of file
+}

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java Thu Jan 26 02:58:57 2012
@@ -76,7 +76,7 @@ public class TestAcidGuarantees {
   public TestAcidGuarantees() {
     // Set small flush size for minicluster so we exercise reseeking scanners
     Configuration conf = HBaseConfiguration.create();
-    conf.set("hbase.hregion.memstore.flush.size", String.valueOf(128*1024));
+    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128*1024));
     util = new HBaseTestingUtility(conf);
   }
   

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java Thu Jan 26 02:58:57 2012
@@ -373,6 +373,25 @@ public class TestKeyValue extends TestCa
     }
   }
 
+  /**
+   * Checks that {@link KeyValue#createKeyValueFromKey(byte[], int, int)} can
+   * build a KeyValue from a key that sits at a non-zero offset inside a
+   * larger array, and that the resulting buffer holds exactly
+   * {@code KeyValue.ROW_OFFSET + keyLen} bytes.
+   */
+  public void testCreateKeyValueFromKey() {
+    KeyValue kv = new KeyValue(Bytes.toBytes("myRow"), Bytes.toBytes("myCF"),
+        Bytes.toBytes("myQualifier"), 12345L, Bytes.toBytes("myValue"));
+    int initialPadding = 10;
+    int endingPadding = 20;
+    int keyLen = kv.getKeyLength();
+    // Surround the key with padding so the offset/length arguments matter.
+    byte[] tmpArr = new byte[initialPadding + endingPadding + keyLen];
+    System.arraycopy(kv.getBuffer(), kv.getKeyOffset(), tmpArr,
+        initialPadding, keyLen);
+    KeyValue kvFromKey = KeyValue.createKeyValueFromKey(tmpArr, initialPadding,
+        keyLen);
+    assertEquals(keyLen, kvFromKey.getKeyLength());
+    assertEquals(KeyValue.ROW_OFFSET + keyLen, kvFromKey.getBuffer().length);
+    // The two string forms should differ only in the trailing "=<digits>"
+    // field (presumably the value length), which is normalized to 0 here
+    // because kvFromKey was built from the key alone.
+    String expected = kv.toString().replaceAll("=[0-9]+$", "=0");
+    // JUnit convention: expected value first, actual value second.
+    assertEquals(expected, kvFromKey.toString());
+  }
+
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
     new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java Thu Jan 26 02:58:57 2012
@@ -1407,7 +1407,7 @@ public class TestAdmin {
         "hbase.hregion.memstore.optionalflushcount", 2);
 
     // We flush the cache after every 8192 bytes
-    TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.flush.size",
+    TEST_UTIL.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
         8192);
 
     // Increase the amount of time between client retries

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Thu Jan 26 02:58:57 2012
@@ -157,6 +157,8 @@ public class TestFromClientSide {
          HColumnDescriptor.DEFAULT_VERSIONS,
          true,
          HColumnDescriptor.DEFAULT_COMPRESSION,
+         HColumnDescriptor.DEFAULT_ENCODE_ON_DISK,
+         HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING,
          HColumnDescriptor.DEFAULT_IN_MEMORY,
          HColumnDescriptor.DEFAULT_BLOCKCACHE,
          HColumnDescriptor.DEFAULT_BLOCKSIZE,

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java Thu Jan 26 02:58:57 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -97,8 +98,8 @@ public class TestHalfStoreFileReader {
   private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference bottom,
       CacheConfig cacheConf)
       throws IOException {
-    final HalfStoreFileReader halfreader =
-        new HalfStoreFileReader(fs, p, cacheConf, bottom);
+    final HalfStoreFileReader halfreader = new HalfStoreFileReader(fs, p,
+        cacheConf, bottom, DataBlockEncoding.NONE);
     halfreader.loadFileInfo();
     final HFileScanner scanner = halfreader.getScanner(false, false);
 
@@ -135,7 +136,7 @@ public class TestHalfStoreFileReader {
 
   List<KeyValue> genSomeKeys() {
     List<KeyValue> ret = new ArrayList<KeyValue>(SIZE);
-    for (int i = 0 ; i < SIZE; i++) {
+    for (int i = 0; i < SIZE; i++) {
       KeyValue kv =
           new KeyValue(
               _b(String.format("row_%04d", i)),

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java Thu Jan 26 02:58:57 2012
@@ -40,6 +40,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.CachedBlock;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -295,6 +296,12 @@ public class TestHeapSize extends TestCa
       assertEquals(expected, actual);
     }
 
+    // SchemaConfigured
+    LOG.debug("Heap size for: " + SchemaConfigured.class.getName());
+    SchemaConfigured sc = new SchemaConfigured(null, "myTable", "myCF");
+    assertEquals(ClassSize.estimateBase(SchemaConfigured.class, true),
+        sc.heapSize());
+
     // Store Overhead
     cl = Store.class;
     actual = Store.FIXED_OVERHEAD;
@@ -313,16 +320,23 @@ public class TestHeapSize extends TestCa
       assertEquals(expected, actual);
     }
 
+    // Block cache key overhead
+    cl = BlockCacheKey.class;
+    // Passing zero length file name, because estimateBase does not handle
+    // deep overhead.
+    actual = new BlockCacheKey("", 0).heapSize();
+    expected  = ClassSize.estimateBase(cl, false);
+    if (expected != actual) {
+      ClassSize.estimateBase(cl, true);
+      assertEquals(expected, actual);
+    }
+
     // Currently NOT testing Deep Overheads of many of these classes.
     // Deep overheads cover a vast majority of stuff, but will not be 100%
     // accurate because it's unclear when we're referencing stuff that's already
     // accounted for.  But we have satisfied our two core requirements.
     // Sizing is quite accurate now, and our tests will throw errors if
     // any of these classes are modified without updating overhead sizes.
-
-    SchemaConfigured sc = new SchemaConfigured(null, "myTable", "myCF");
-    assertEquals(ClassSize.estimateBase(SchemaConfigured.class, true),
-        sc.heapSize());
   }
 
   @org.junit.Rule

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/RedundantKVGenerator.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/RedundantKVGenerator.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/RedundantKVGenerator.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/RedundantKVGenerator.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Generate list of key values which are very useful to test data block encoding
+ * and compression.
+ */
+public class RedundantKVGenerator {
+  // row settings
+  static int DEFAULT_NUMBER_OF_ROW_PREFIXES = 10;
+  static int DEFAULT_AVERAGE_PREFIX_LENGTH = 6;
+  static int DEFAULT_PREFIX_LENGTH_VARIANCE = 3;
+  static int DEFAULT_AVERAGE_SUFFIX_LENGTH = 3;
+  static int DEFAULT_SUFFIX_LENGTH_VARIANCE = 3;
+  static int DEFAULT_NUMBER_OF_ROW = 500;
+
+  // qualifier
+  static float DEFAULT_CHANCE_FOR_SAME_QUALIFIER = 0.5f;
+  static float DEFAULT_CHANCE_FOR_SIMILIAR_QUALIFIER = 0.4f;
+  static int DEFAULT_AVERAGE_QUALIFIER_LENGTH = 9;
+  static int DEFAULT_QUALIFIER_LENGTH_VARIANCE = 3;
+
+  static int DEFAULT_COLUMN_FAMILY_LENGTH = 9;
+  static int DEFAULT_VALUE_LENGTH = 8;
+  static float DEFAULT_CHANCE_FOR_ZERO_VALUE = 0.5f;
+
+  static int DEFAULT_BASE_TIMESTAMP_DIVIDE = 1000000;
+  static int DEFAULT_TIMESTAMP_DIFF_SIZE = 100000000;
+
+  /**
+   * Default constructor, assumes all parameters from class constants.
+   */
+  public RedundantKVGenerator() {
+    this(new Random(42L),
+        DEFAULT_NUMBER_OF_ROW_PREFIXES,
+        DEFAULT_AVERAGE_PREFIX_LENGTH,
+        DEFAULT_PREFIX_LENGTH_VARIANCE,
+        DEFAULT_AVERAGE_SUFFIX_LENGTH,
+        DEFAULT_SUFFIX_LENGTH_VARIANCE,
+        DEFAULT_NUMBER_OF_ROW,
+
+        DEFAULT_CHANCE_FOR_SAME_QUALIFIER,
+        DEFAULT_CHANCE_FOR_SIMILIAR_QUALIFIER,
+        DEFAULT_AVERAGE_QUALIFIER_LENGTH,
+        DEFAULT_QUALIFIER_LENGTH_VARIANCE,
+
+        DEFAULT_COLUMN_FAMILY_LENGTH,
+        DEFAULT_VALUE_LENGTH,
+        DEFAULT_CHANCE_FOR_ZERO_VALUE,
+
+        DEFAULT_BASE_TIMESTAMP_DIVIDE,
+        DEFAULT_TIMESTAMP_DIFF_SIZE
+    );
+  }
+
+
+  /**
+   * Various configuration options for generating key values
+   * @param randomizer pick things by random
+   */
+  public RedundantKVGenerator(Random randomizer,
+      int numberOfRowPrefixes,
+      int averagePrefixLength,
+      int prefixLengthVariance,
+      int averageSuffixLength,
+      int suffixLengthVariance,
+      int numberOfRows,
+
+      float chanceForSameQualifier,
+      float chanceForSimiliarQualifier,
+      int averageQualifierLength,
+      int qualifierLengthVariance,
+
+      int columnFamilyLength,
+      int valueLength,
+      float chanceForZeroValue,
+
+      int baseTimestampDivide,
+      int timestampDiffSize
+      ) {
+    this.randomizer = randomizer;
+
+    this.numberOfRowPrefixes = numberOfRowPrefixes;
+    this.averagePrefixLength = averagePrefixLength;
+    this.prefixLengthVariance = prefixLengthVariance;
+    this.averageSuffixLength = averageSuffixLength;
+    this.suffixLengthVariance = suffixLengthVariance;
+    this.numberOfRows = numberOfRows;
+
+    this.chanceForSameQualifier = chanceForSameQualifier;
+    this.chanceForSimiliarQualifier = chanceForSimiliarQualifier;
+    this.averageQualifierLength = averageQualifierLength;
+    this.qualifierLengthVariance = qualifierLengthVariance;
+
+    this.columnFamilyLength = columnFamilyLength;
+    this.valueLength = valueLength;
+    this.chanceForZeroValue = chanceForZeroValue;
+
+    this.baseTimestampDivide = baseTimestampDivide;
+    this.timestampDiffSize = timestampDiffSize;
+  }
+
+  /** Used to generate dataset */
+  private Random randomizer;
+
+  // row settings
+  private int numberOfRowPrefixes;
+  private int averagePrefixLength = 6;
+  private int prefixLengthVariance = 3;
+  private int averageSuffixLength = 3;
+  private int suffixLengthVariance = 3;
+  private int numberOfRows = 500;
+
+  // qualifier
+  private float chanceForSameQualifier = 0.5f;
+  private float chanceForSimiliarQualifier = 0.4f;
+  private int averageQualifierLength = 9;
+  private int qualifierLengthVariance = 3;
+
+  private int columnFamilyLength = 9;
+  private int valueLength = 8;
+  private float chanceForZeroValue = 0.5f;
+
+  private int baseTimestampDivide = 1000000;
+  private int timestampDiffSize = 100000000;
+
+  private List<byte[]> generateRows() {
+    // generate prefixes
+    List<byte[]> prefixes = new ArrayList<byte[]>();
+    prefixes.add(new byte[0]);
+    for (int i = 1; i < numberOfRowPrefixes; ++i) {
+      int prefixLength = averagePrefixLength;
+      prefixLength += randomizer.nextInt(2 * prefixLengthVariance + 1) -
+          prefixLengthVariance;
+      byte[] newPrefix = new byte[prefixLength];
+      randomizer.nextBytes(newPrefix);
+      prefixes.add(newPrefix);
+    }
+
+    // generate rest of the row
+    List<byte[]> rows = new ArrayList<byte[]>();
+    for (int i = 0; i < numberOfRows; ++i) {
+      int suffixLength = averageSuffixLength;
+      suffixLength += randomizer.nextInt(2 * suffixLengthVariance + 1) -
+          suffixLengthVariance;
+      int randomPrefix = randomizer.nextInt(prefixes.size());
+      byte[] row = new byte[prefixes.get(randomPrefix).length +
+                            suffixLength];
+      rows.add(row);
+    }
+
+    return rows;
+  }
+
+  /**
+   * Generate test data useful to test encoders.
+   * @param howMany How many Key values should be generated.
+   * @return sorted list of key values
+   */
+  public List<KeyValue> generateTestKeyValues(int howMany) {
+    List<KeyValue> result = new ArrayList<KeyValue>();
+
+    List<byte[]> rows = generateRows();
+    Map<Integer, List<byte[]>> rowsToQualifier =
+        new HashMap<Integer, List<byte[]>>();
+
+    byte[] family = new byte[columnFamilyLength];
+    randomizer.nextBytes(family);
+
+    long baseTimestamp = Math.abs(randomizer.nextLong()) /
+        baseTimestampDivide;
+
+    byte[] value = new byte[valueLength];
+
+    for (int i = 0; i < howMany; ++i) {
+      long timestamp = baseTimestamp + randomizer.nextInt(
+          timestampDiffSize);
+      Integer rowId = randomizer.nextInt(rows.size());
+      byte[] row = rows.get(rowId);
+
+      // generate qualifier, sometimes it is same, sometimes similar,
+      // occasionally completely different
+      byte[] qualifier;
+      float qualifierChance = randomizer.nextFloat();
+      if (!rowsToQualifier.containsKey(rowId) ||
+          qualifierChance > chanceForSameQualifier +
+          chanceForSimiliarQualifier) {
+        int qualifierLength = averageQualifierLength;
+        qualifierLength +=
+            randomizer.nextInt(2 * qualifierLengthVariance + 1) -
+            qualifierLengthVariance;
+        qualifier = new byte[qualifierLength];
+        randomizer.nextBytes(qualifier);
+
+        // add it to map
+        if (!rowsToQualifier.containsKey(rowId)) {
+          rowsToQualifier.put(rowId, new ArrayList<byte[]>());
+        }
+        rowsToQualifier.get(rowId).add(qualifier);
+      } else if (qualifierChance > chanceForSameQualifier) {
+        // similar qualifier
+        List<byte[]> previousQualifiers = rowsToQualifier.get(rowId);
+        byte[] originalQualifier = previousQualifiers.get(
+            randomizer.nextInt(previousQualifiers.size()));
+
+        qualifier = new byte[originalQualifier.length];
+        int commonPrefix = randomizer.nextInt(qualifier.length);
+        System.arraycopy(originalQualifier, 0, qualifier, 0, commonPrefix);
+        for (int j = commonPrefix; j < qualifier.length; ++j) {
+          qualifier[j] = (byte) (randomizer.nextInt() & 0xff);
+        }
+
+        rowsToQualifier.get(rowId).add(qualifier);
+      } else {
+        // same qualifier
+        List<byte[]> previousQualifiers = rowsToQualifier.get(rowId);
+        qualifier = previousQualifiers.get(
+            randomizer.nextInt(previousQualifiers.size()));
+      }
+
+      if (randomizer.nextFloat() < chanceForZeroValue) {
+        for (int j = 0; j < value.length; ++j) {
+          value[j] = (byte) 0;
+        }
+      } else {
+        randomizer.nextBytes(value);
+      }
+
+      result.add(new KeyValue(row, family, qualifier, timestamp, value));
+    }
+
+    Collections.sort(result, KeyValue.COMPARATOR);
+
+    return result;
+  }
+
+  /**
+   * Convert list of KeyValues to byte buffer.
+   * @param keyValues list of KeyValues to be converted.
+   * @return buffer with content from key values
+   */
+  public static ByteBuffer convertKvToByteBuffer(List<KeyValue> keyValues,
+      boolean includesMemstoreTS) {
+    int totalSize = 0;
+    for (KeyValue kv : keyValues) {
+      totalSize += kv.getLength();
+      if (includesMemstoreTS) {
+        totalSize += WritableUtils.getVIntSize(kv.getMemstoreTS());
+      }
+    }
+
+    ByteBuffer result = ByteBuffer.allocate(totalSize);
+    for (KeyValue kv : keyValues) {
+      result.put(kv.getBuffer(), kv.getOffset(), kv.getLength());
+      if (includesMemstoreTS) {
+        ByteBufferUtils.writeVLong(result, kv.getMemstoreTS());
+      }
+    }
+
+    return result;
+  }
+
+}

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestBufferedDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestBufferedDataBlockEncoder.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestBufferedDataBlockEncoder.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestBufferedDataBlockEncoder.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestBufferedDataBlockEncoder {
+
+  /**
+   * Grows the seeker's key buffer one byte at a time, writing a known byte
+   * pattern, and verifies after every step that all previously written bytes
+   * are still in place.
+   */
+  @Test
+  public void testEnsureSpaceForKey() {
+    final BufferedDataBlockEncoder.SeekerState seeker =
+        new BufferedDataBlockEncoder.SeekerState();
+    for (int len = 1; len <= 65536; ++len) {
+      seeker.keyLength = len;
+      seeker.ensureSpaceForKey();
+      // Fill in the newest position with the pattern byte for that index.
+      final int lastPos = seeker.keyLength - 1;
+      seeker.keyBuffer[lastPos] = (byte) ((len - 1) % 0xff);
+      // Every earlier position must still carry its own pattern byte.
+      for (int pos = 0; pos < len - 1; ++pos) {
+        final byte wanted = (byte) (pos % 0xff);
+        assertEquals(wanted, seeker.keyBuffer[pos]);
+      }
+    }
+  }
+
+}

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestChangingEncoding.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestChangingEncoding.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestChangingEncoding.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestChangingEncoding.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests changing data block encoding settings of a column family.
+ */
+@Category(LargeTests.class)
+public class TestChangingEncoding {
+
+  private static final Log LOG = LogFactory.getLog(TestChangingEncoding.class);
+
+  /** Name of the single column family created in every test table. */
+  static final String CF = "EncodingTestCF";
+  /** {@link #CF} as bytes, used when building puts and reading results. */
+  static final byte[] CF_BYTES = Bytes.toBytes(CF);
+
+  /** Number of rows written and verified per batch. */
+  private static final int NUM_ROWS_PER_BATCH = 100;
+  /** Number of columns written to every row. */
+  private static final int NUM_COLS_PER_ROW = 20;
+
+  private static final HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+  private static final Configuration conf = TEST_UTIL.getConfiguration();
+
+  /** Timeout in milliseconds applied to the timed test cases. */
+  private static final int TIMEOUT_MS = 120000;
+
+  /** Admin client; created before and closed after every test case. */
+  private HBaseAdmin admin;
+  /** Descriptor of the column family whose encoding settings are modified. */
+  private HColumnDescriptor hcd;
+
+  /** Name of the table used by the currently running test case. */
+  private String tableName;
+  /** Encodings applied, in order, by the tests that iterate over encodings. */
+  private static final List<DataBlockEncoding> ENCODINGS_TO_ITERATE =
+      createEncodingsToIterate();
+
+  /**
+   * Builds the encoding sequence used by the iteration tests: every
+   * {@link DataBlockEncoding} value in declaration order, followed by one
+   * additional {@link DataBlockEncoding#NONE} entry at the end.
+   *
+   * @return an unmodifiable list of encodings
+   */
+  private static final List<DataBlockEncoding> createEncodingsToIterate() {
+    List<DataBlockEncoding> sequence = new ArrayList<DataBlockEncoding>();
+    Collections.addAll(sequence, DataBlockEncoding.values());
+    sequence.add(DataBlockEncoding.NONE);
+    return Collections.unmodifiableList(sequence);
+  }
+
+  /**
+   * Number of test data batches written to the current table so far. It is
+   * also used as the zero-based id of the next batch to be written.
+   */
+  private int numBatchesWritten;
+
+  private void prepareTest(String testId) throws IOException {
+    tableName = "test_table_" + testId;
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    hcd = new HColumnDescriptor(CF);
+    htd.addFamily(hcd);
+    admin.createTable(htd);
+    numBatchesWritten = 0;
+  }
+
+  /**
+   * Lowers the memstore flush size and starts the mini cluster shared by all
+   * test cases of this class.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Use a small flush size (1 MB) to create more HFiles.
+    final int flushSizeBytes = 1024 * 1024;
+    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes);
+    TEST_UTIL.startMiniCluster();
+  }
+
+  /** Shuts down the mini cluster after the last test case has run. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /** Creates a new admin client for the test case about to run. */
+  @Before
+  public void setUp() throws Exception {
+    admin = new HBaseAdmin(conf);
+  }
+
+  /** Closes the admin client created for the test case that just ran. */
+  @After
+  public void tearDown() throws IOException {
+    admin.close();
+  }
+
+  private static byte[] getRowKey(int batchId, int i) {
+    return Bytes.toBytes("batch" + batchId + "_row" + i);
+  }
+
+  private static byte[] getQualifier(int j) {
+    return Bytes.toBytes("col" + j);
+  }
+
+  private static byte[] getValue(int batchId, int i, int j) {
+    return Bytes.toBytes("value_for_" + Bytes.toString(getRowKey(batchId, i))
+        + "_col" + j);
+  }
+
+  /**
+   * Writes one batch of test data: {@link #NUM_ROWS_PER_BATCH} rows with
+   * {@link #NUM_COLS_PER_ROW} columns each, all in family {@link #CF_BYTES}.
+   *
+   * @param conf configuration used to open the table
+   * @param tableName name of the table to write to
+   * @param batchId batch id, embedded in every row key and value
+   * @throws Exception if the table cannot be opened or a write fails
+   */
+  static void writeTestDataBatch(Configuration conf, String tableName,
+      int batchId) throws Exception {
+    LOG.debug("Writing test data batch " + batchId);
+    HTable table = new HTable(conf, tableName);
+    try {
+      for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
+        Put put = new Put(getRowKey(batchId, i));
+        for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
+          put.add(CF_BYTES, getQualifier(j),
+              getValue(batchId, i, j));
+        }
+        // Send the row once, after all of its columns have been added,
+        // instead of re-sending the growing Put after every column.
+        table.put(put);
+      }
+    } finally {
+      // Release the table even if a write fails.
+      table.close();
+    }
+  }
+
+  /**
+   * Reads back every row of the given batch and asserts that each column
+   * holds the value produced by {@code getValue} for that cell.
+   *
+   * @param conf configuration used to open the table
+   * @param tableName name of the table to read from
+   * @param batchId id of the batch to verify
+   * @throws Exception if the table cannot be opened or a read fails
+   */
+  static void verifyTestDataBatch(Configuration conf, String tableName,
+      int batchId) throws Exception {
+    LOG.debug("Verifying test data batch " + batchId);
+    HTable table = new HTable(conf, tableName);
+    try {
+      for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
+        Get get = new Get(getRowKey(batchId, i));
+        Result result = table.get(get);
+        for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
+          KeyValue kv = result.getColumnLatest(CF_BYTES, getQualifier(j));
+          // Report a missing cell clearly instead of failing with a bare
+          // NullPointerException on kv.getValue().
+          org.junit.Assert.assertNotNull("Missing cell for batch " + batchId
+              + ", row " + i + ", column " + j, kv);
+          assertEquals(Bytes.toStringBinary(getValue(batchId, i, j)),
+              Bytes.toStringBinary(kv.getValue()));
+        }
+      }
+    } finally {
+      // Release the table even if a read or an assertion fails.
+      table.close();
+    }
+  }
+
+  private void writeSomeNewData() throws Exception {
+    writeTestDataBatch(conf, tableName, numBatchesWritten);
+    ++numBatchesWritten;
+  }
+
+  /** Verifies every batch written to the current table so far. */
+  private void verifyAllData() throws Exception {
+    int batchId = 0;
+    while (batchId < numBatchesWritten) {
+      verifyTestDataBatch(conf, tableName, batchId);
+      ++batchId;
+    }
+  }
+
+  private void setEncodingConf(DataBlockEncoding encoding,
+      boolean encodeOnDisk) throws IOException {
+    LOG.debug("Setting CF encoding to " + encoding + " (ordinal="
+        + encoding.ordinal() + "), encodeOnDisk=" + encodeOnDisk);
+    admin.disableTable(tableName);
+    hcd.setDataBlockEncoding(encoding);
+    hcd.setEncodeOnDisk(encodeOnDisk);
+    admin.modifyColumn(tableName, hcd);
+    admin.enableTable(tableName);
+  }
+
+  /**
+   * For each encode-on-disk setting, walks through all encodings; after each
+   * change writes a new batch and verifies all batches written so far.
+   */
+  @Test(timeout=TIMEOUT_MS)
+  public void testChangingEncoding() throws Exception {
+    prepareTest("ChangingEncoding");
+    final boolean[] onDiskFlags = { false, true };
+    for (int f = 0; f < onDiskFlags.length; ++f) {
+      final boolean encodeOnDisk = onDiskFlags[f];
+      for (int e = 0; e < ENCODINGS_TO_ITERATE.size(); ++e) {
+        setEncodingConf(ENCODINGS_TO_ITERATE.get(e), encodeOnDisk);
+        writeSomeNewData();
+        verifyAllData();
+      }
+    }
+  }
+
+  /**
+   * Same walk through all encodings as testChangingEncoding(), but each step
+   * also requests a major compaction and verifies all data again afterwards.
+   */
+  @Test(timeout=TIMEOUT_MS)
+  public void testChangingEncodingWithCompaction() throws Exception {
+    prepareTest("ChangingEncodingWithCompaction");
+    final boolean[] onDiskFlags = { false, true };
+    for (int f = 0; f < onDiskFlags.length; ++f) {
+      final boolean encodeOnDisk = onDiskFlags[f];
+      for (int e = 0; e < ENCODINGS_TO_ITERATE.size(); ++e) {
+        setEncodingConf(ENCODINGS_TO_ITERATE.get(e), encodeOnDisk);
+        writeSomeNewData();
+        verifyAllData();
+        compactAndWait();
+        verifyAllData();
+      }
+    }
+  }
+
+  /**
+   * Repeatedly flips the encode-on-disk flag while writing, compacting and
+   * verifying data.
+   */
+  @Test(timeout=TIMEOUT_MS)
+  public void testFlippingEncodeOnDisk() throws Exception {
+    prepareTest("FlippingEncodeOnDisk");
+    // The focus of this test case is to flip the "encoding on disk" flag,
+    // so we only try a couple of encodings.
+    final DataBlockEncoding[] encodings = {
+        DataBlockEncoding.NONE, DataBlockEncoding.FAST_DIFF };
+    for (DataBlockEncoding encoding : encodings) {
+      // encodeOnDisk does not matter when not using encoding, so only the
+      // default value is tried for NONE.
+      final boolean[] flagValues = (encoding == DataBlockEncoding.NONE)
+          ? new boolean[] { HColumnDescriptor.DEFAULT_ENCODE_ON_DISK }
+          : new boolean[] { false, true, false, true };
+      for (int f = 0; f < flagValues.length; ++f) {
+        setEncodingConf(encoding, flagValues[f]);
+        writeSomeNewData();
+        verifyAllData();
+        compactAndWait();
+        verifyAllData();
+      }
+    }
+  }
+
+  private void compactAndWait() throws IOException, InterruptedException {
+    LOG.debug("Compacting table " + tableName);
+    admin.majorCompact(tableName);
+    Threads.sleepWithoutInterrupt(500);
+    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
+    while (rs.compactSplitThread.getCompactionQueueSize() > 0) {
+      Threads.sleep(50);
+    }
+    LOG.debug("Compaction queue size reached 0, continuing");
+  }
+
+  /**
+   * Applies 20 pseudo-random combinations of encoding and encode-on-disk
+   * flag; after each change writes a new batch and verifies all data. The
+   * random generator uses a fixed seed.
+   */
+  @Test
+  public void testCrazyRandomChanges() throws Exception {
+    prepareTest("RandomChanges");
+    final DataBlockEncoding[] allEncodings = DataBlockEncoding.values();
+    final Random rand = new Random(2934298742974297L);
+    int remaining = 20;
+    while (remaining > 0) {
+      // Draw the encoding first, then the flag, to keep the random sequence.
+      final int encodingOrdinal = rand.nextInt(allEncodings.length);
+      final boolean encodeOnDisk = rand.nextBoolean();
+      setEncodingConf(allEncodings[encodingOrdinal], encodeOnDisk);
+      writeSomeNewData();
+      verifyAllData();
+      --remaining;
+    }
+  }
+
+}

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Test all of the data block encoding algorithms for correctness.
+ * Most of the tests generate data that exercises different branches of the
+ * code.
+ */
+@Category(LargeTests.class)
+@RunWith(Parameterized.class)
+public class TestDataBlockEncoders {
+  /** Number of KeyValues generated for each sample-based test. */
+  static int NUMBER_OF_KV = 10000;
+  /** Number of random seeks per seek mode in testSeekingOnSample(). */
+  static int NUM_RANDOM_SEEKS = 10000;
+
+  /** Source of the generated sample KeyValues. */
+  private RedundantKVGenerator generator = new RedundantKVGenerator();
+  /** Picks the seek targets; constructed with a fixed seed. */
+  private Random randomizer = new Random(42l);
+
+  /** Test parameter, passed on to the encoders, seekers and converter. */
+  private final boolean includesMemstoreTS;
+
+  /**
+   * Supplies the constructor arguments for the parameterized runner.
+   *
+   * @return HBaseTestingUtility.BOOLEAN_PARAMETERIZED (presumably one entry
+   *         per boolean value -- TODO confirm)
+   */
+  @Parameters
+  public static Collection<Object[]> parameters() {
+    return HBaseTestingUtility.BOOLEAN_PARAMETERIZED;
+  }
+
+  /**
+   * @param includesMemstoreTS flag handed to every encoder, seeker and
+   *          KeyValue-to-buffer conversion used by the tests
+   */
+  public TestDataBlockEncoders(boolean includesMemstoreTS) {
+    this.includesMemstoreTS = includesMemstoreTS;
+  }
+
+  private void testAlgorithm(ByteBuffer dataset, DataBlockEncoder encoder)
+      throws IOException {
+    // encode
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dataOut = new DataOutputStream(baos);
+    encoder.compressKeyValues(dataOut, dataset, includesMemstoreTS);
+
+    // decode
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    DataInputStream dis = new DataInputStream(bais);
+    ByteBuffer actualDataset;
+    actualDataset = encoder.uncompressKeyValues(dis, includesMemstoreTS);
+
+    dataset.rewind();
+    actualDataset.rewind();
+
+    assertEquals("Encoding -> decoding gives different results for " + encoder,
+        Bytes.toStringBinary(dataset), Bytes.toStringBinary(actualDataset));
+  }
+
+  /**
+   * Test data block encoding of empty KeyValue.
+   * @throws IOException On test failure.
+   */
+  @Test
+  public void testEmptyKeyValues() throws IOException {
+    List<KeyValue> kvList = new ArrayList<KeyValue>();
+    byte[] row = new byte[0];
+    byte[] family = new byte[0];
+    byte[] qualifier = new byte[0];
+    byte[] value = new byte[0];
+    kvList.add(new KeyValue(row, family, qualifier, 0l, Type.Put, value));
+    kvList.add(new KeyValue(row, family, qualifier, 0l, Type.Put, value));
+    testEncodersOnDataset(RedundantKVGenerator.convertKvToByteBuffer(kvList,
+        includesMemstoreTS));
+  }
+
+  /**
+   * Test KeyValues with negative timestamp.
+   * @throws IOException On test failure.
+   */
+  @Test
+  public void testNegativeTimestamps() throws IOException {
+    List<KeyValue> kvList = new ArrayList<KeyValue>();
+    byte[] row = new byte[0];
+    byte[] family = new byte[0];
+    byte[] qualifier = new byte[0];
+    byte[] value = new byte[0];
+    kvList.add(new KeyValue(row, family, qualifier, -1l, Type.Put, value));
+    kvList.add(new KeyValue(row, family, qualifier, -2l, Type.Put, value));
+    testEncodersOnDataset(
+        RedundantKVGenerator.convertKvToByteBuffer(kvList,
+            includesMemstoreTS));
+  }
+
+  /**
+   * Test whether compression -> decompression gives the consistent results on
+   * pseudorandom sample.
+   * @throws IOException On test failure.
+   */
+  @Test
+  public void testExecutionOnSample() throws IOException {
+    testEncodersOnDataset(
+        RedundantKVGenerator.convertKvToByteBuffer(
+            generator.generateTestKeyValues(NUMBER_OF_KV),
+            includesMemstoreTS));
+  }
+
+  /**
+   * Test seeking while file is encoded. One seeker is created per encoder
+   * over the same sample, and the seekers are compared against each other
+   * for random keys and for a few edge-case keys.
+   */
+  @Test
+  public void testSeekingOnSample() throws IOException{
+    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV);
+    ByteBuffer originalBuffer =
+        RedundantKVGenerator.convertKvToByteBuffer(sampleKv,
+            includesMemstoreTS);
+
+    // Build one seeker per encoder, each over its own encoding of the sample.
+    List<DataBlockEncoder.EncodedSeeker> encodedSeekers =
+        new ArrayList<DataBlockEncoder.EncodedSeeker>();
+    for (DataBlockEncoder encoder : DataBlockEncoding.getAllEncoders()) {
+      ByteArrayOutputStream encodedBytes = new ByteArrayOutputStream();
+      encoder.compressKeyValues(new DataOutputStream(encodedBytes),
+          originalBuffer, includesMemstoreTS);
+      ByteBuffer encodedBuffer = ByteBuffer.wrap(encodedBytes.toByteArray());
+      DataBlockEncoder.EncodedSeeker seeker =
+          encoder.createSeeker(KeyValue.KEY_COMPARATOR, includesMemstoreTS);
+      seeker.setCurrentBuffer(encodedBuffer);
+      encodedSeekers.add(seeker);
+    }
+
+    // Random seeks, first without and then with seekBefore.
+    final boolean[] seekBeforeModes = { false, true };
+    for (int m = 0; m < seekBeforeModes.length; ++m) {
+      final boolean seekBefore = seekBeforeModes[m];
+      for (int i = 0; i < NUM_RANDOM_SEEKS; ++i) {
+        // With seekBefore the first KeyValue (index 0) is never picked.
+        final int keyValueId = seekBefore
+            ? randomizer.nextInt(sampleKv.size() - 1) + 1
+            : randomizer.nextInt(sampleKv.size());
+        checkSeekingConsistency(encodedSeekers, seekBefore,
+            sampleKv.get(keyValueId));
+      }
+    }
+
+    // Edge cases: first key (exact seek only), last key, and the last
+    // possible key on the row/column of the middle KeyValue.
+    checkSeekingConsistency(encodedSeekers, false, sampleKv.get(0));
+    for (int m = 0; m < seekBeforeModes.length; ++m) {
+      final boolean seekBefore = seekBeforeModes[m];
+      KeyValue lastKv = sampleKv.get(sampleKv.size() - 1);
+      checkSeekingConsistency(encodedSeekers, seekBefore, lastKv);
+      KeyValue lastMidKv =
+          sampleKv.get(sampleKv.size() / 2).createLastOnRowCol();
+      checkSeekingConsistency(encodedSeekers, seekBefore, lastMidKv);
+    }
+  }
+
+  /**
+   * Test iterating on encoded buffers. For every encoder the sample is
+   * encoded, a seeker is stepped through the encoded buffer with next(), and
+   * each KeyValue it returns is compared with the corresponding sample entry.
+   * The number of KeyValues returned must equal the sample size.
+   */
+  @Test
+  public void testNextOnSample() {
+    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV);
+    ByteBuffer originalBuffer =
+        RedundantKVGenerator.convertKvToByteBuffer(sampleKv,
+            includesMemstoreTS);
+    List<DataBlockEncoder> dataBlockEncoders =
+        DataBlockEncoding.getAllEncoders();
+
+    for (DataBlockEncoder encoder : dataBlockEncoders) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream dataOut = new DataOutputStream(baos);
+      try {
+        encoder.compressKeyValues(dataOut, originalBuffer, includesMemstoreTS);
+      } catch (IOException e) {
+        throw new RuntimeException(String.format(
+            "Bug while encoding using '%s'", encoder.toString()), e);
+      }
+
+      ByteBuffer encodedBuffer = ByteBuffer.wrap(baos.toByteArray());
+      DataBlockEncoder.EncodedSeeker seeker =
+          encoder.createSeeker(KeyValue.KEY_COMPARATOR, includesMemstoreTS);
+      seeker.setCurrentBuffer(encodedBuffer);
+      int i = 0;
+      do {
+        KeyValue expectedKeyValue = sampleKv.get(i);
+        ByteBuffer keyValue = seeker.getKeyValueBuffer();
+        if (0 != Bytes.compareTo(
+            keyValue.array(), keyValue.arrayOffset(), keyValue.limit(),
+            expectedKeyValue.getBuffer(), expectedKeyValue.getOffset(),
+            expectedKeyValue.getLength())) {
+
+          // Length of the matching prefix, reported to help locate the
+          // first differing byte.
+          int commonPrefix = 0;
+          byte[] left = keyValue.array();
+          byte[] right = expectedKeyValue.getBuffer();
+          int leftOff = keyValue.arrayOffset();
+          int rightOff = expectedKeyValue.getOffset();
+          int length = Math.min(keyValue.limit(), expectedKeyValue.getLength());
+          while (commonPrefix < length &&
+              left[commonPrefix + leftOff] == right[commonPrefix + rightOff]) {
+            commonPrefix++;
+          }
+
+          fail(String.format(
+              "next() produces wrong results " +
+              "encoder: %s i: %d commonPrefix: %d" +
+              "\n expected %s\n actual      %s",
+              encoder.toString(), i, commonPrefix,
+              Bytes.toStringBinary(expectedKeyValue.getBuffer(),
+                  expectedKeyValue.getOffset(), expectedKeyValue.getLength()),
+              Bytes.toStringBinary(keyValue)));
+        }
+        i++;
+      } while (seeker.next());
+
+      // A seeker that stops early would otherwise pass unnoticed.
+      assertEquals("next() returned a wrong number of KeyValues for encoder "
+          + encoder, sampleKv.size(), i);
+    }
+  }
+
+  /**
+   * Test whether the decompression of first key is implemented correctly:
+   * for every encoder, the key returned by getFirstKeyInBlock() must equal
+   * the key of the first sample KeyValue.
+   */
+  @Test
+  public void testFirstKeyInBlockOnSample() {
+    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV);
+    ByteBuffer originalBuffer =
+        RedundantKVGenerator.convertKvToByteBuffer(sampleKv,
+            includesMemstoreTS);
+
+    for (DataBlockEncoder encoder : DataBlockEncoding.getAllEncoders()) {
+      ByteArrayOutputStream encodedBytes = new ByteArrayOutputStream();
+      try {
+        encoder.compressKeyValues(new DataOutputStream(encodedBytes),
+            originalBuffer, includesMemstoreTS);
+      } catch (IOException e) {
+        throw new RuntimeException(String.format(
+            "Bug while encoding using '%s'", encoder.toString()), e);
+      }
+
+      ByteBuffer encodedBuffer = ByteBuffer.wrap(encodedBytes.toByteArray());
+      ByteBuffer keyBuffer = encoder.getFirstKeyInBlock(encodedBuffer);
+      KeyValue firstKv = sampleKv.get(0);
+
+      final boolean keysMatch = 0 == Bytes.compareTo(
+          keyBuffer.array(), keyBuffer.arrayOffset(), keyBuffer.limit(),
+          firstKv.getBuffer(), firstKv.getKeyOffset(),
+          firstKv.getKeyLength());
+      if (keysMatch) {
+        continue;
+      }
+
+      // Report how many leading bytes agree to help locate the difference.
+      final int comparable =
+          Math.min(keyBuffer.limit(), firstKv.getKeyLength());
+      int commonPrefix = 0;
+      while (commonPrefix < comparable &&
+          keyBuffer.array()[keyBuffer.arrayOffset() + commonPrefix] ==
+          firstKv.getBuffer()[firstKv.getKeyOffset() + commonPrefix]) {
+        commonPrefix++;
+      }
+      fail(String.format("Bug in '%s' commonPrefix %d",
+          encoder.toString(), commonPrefix));
+    }
+  }
+
+  private void checkSeekingConsistency(
+      List<DataBlockEncoder.EncodedSeeker> encodedSeekers, boolean seekBefore,
+      KeyValue keyValue) {
+    ByteBuffer expectedKeyValue = null;
+    ByteBuffer expectedKey = null;
+    ByteBuffer expectedValue = null;
+
+    for (DataBlockEncoder.EncodedSeeker seeker : encodedSeekers) {
+      seeker.seekToKeyInBlock(keyValue.getBuffer(),
+          keyValue.getKeyOffset(), keyValue.getKeyLength(), seekBefore);
+      seeker.rewind();
+
+      ByteBuffer actualKeyValue = seeker.getKeyValueBuffer();
+      ByteBuffer actualKey = seeker.getKeyDeepCopy();
+      ByteBuffer actualValue = seeker.getValueShallowCopy();
+
+      if (expectedKeyValue != null) {
+        assertEquals(expectedKeyValue, actualKeyValue);
+      } else {
+        expectedKeyValue = actualKeyValue;
+      }
+
+      if (expectedKey != null) {
+        assertEquals(expectedKey, actualKey);
+      } else {
+        expectedKey = actualKey;
+      }
+
+      if (expectedValue != null) {
+        assertEquals(expectedValue, actualValue);
+      } else {
+        expectedValue = actualValue;
+      }
+    }
+  }
+
+  private void testEncodersOnDataset(ByteBuffer onDataset)
+      throws IOException{
+    List<DataBlockEncoder> dataBlockEncoders =
+        DataBlockEncoding.getAllEncoders();
+    ByteBuffer dataset = ByteBuffer.allocate(onDataset.capacity());
+    onDataset.rewind();
+    dataset.put(onDataset);
+    onDataset.rewind();
+    dataset.flip();
+
+    for (DataBlockEncoder encoder : dataBlockEncoders) {
+      testAlgorithm(dataset, encoder);
+
+      // ensure that dataset is unchanged
+      dataset.rewind();
+      assertEquals("Input of two methods is changed", onDataset, dataset);
+    }
+  }
+}

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
+import org.apache.hadoop.hbase.util.MultiThreadedWriter;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests encoded seekers by loading and reading values.
+ */
+@Category(SmallTests.class)
+@RunWith(Parameterized.class)
+public class TestEncodedSeekers {
+
+  private static final String TABLE_NAME = "encodedSeekersTable";
+  private static final String CF_NAME = "encodedSeekersCF";
+  private static final byte[] CF_BYTES = Bytes.toBytes(CF_NAME);
+  private static final int MAX_VERSIONS = 5;
+
+  /** Value size bounds handed to the LoadTestKVGenerator. */
+  private static final int MIN_VALUE_SIZE = 30;
+  private static final int MAX_VALUE_SIZE = 60;
+  /** Number of rows written and read back. */
+  private static final int NUM_ROWS = 1000;
+  /** Number of columns written to every row. */
+  private static final int NUM_COLS_PER_ROW = 20;
+  private static final int NUM_HFILES = 4;
+  /** The write loop flushes whenever the row index is a multiple of this. */
+  private static final int NUM_ROWS_PER_FLUSH = NUM_ROWS / NUM_HFILES;
+
+  private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
+  /** Encoding under test; first test parameter. */
+  private final DataBlockEncoding encoding;
+  /** Encode-on-disk flag passed to the test region; second test parameter. */
+  private final boolean encodeOnDisk;
+
+  /** Enable when debugging */
+  private static final boolean VERBOSE = false;
+
+  /**
+   * Supplies the test parameters: for every {@link DataBlockEncoding} value,
+   * one entry with encodeOnDisk false followed by one with encodeOnDisk true.
+   */
+  @Parameters
+  public static Collection<Object[]> parameters() {
+    final DataBlockEncoding[] allEncodings = DataBlockEncoding.values();
+    List<Object[]> paramList = new ArrayList<Object[]>();
+    for (int e = 0; e < allEncodings.length; ++e) {
+      paramList.add(new Object[] { allEncodings[e], false });
+      paramList.add(new Object[] { allEncodings[e], true });
+    }
+    return paramList;
+  }
+
+  /**
+   * @param encoding data block encoding used for the test region
+   * @param encodeOnDisk encode-on-disk flag used for the test region
+   */
+  public TestEncodedSeekers(DataBlockEncoding encoding, boolean encodeOnDisk) {
+    this.encoding = encoding;
+    this.encodeOnDisk = encodeOnDisk;
+  }
+
+  /**
+   * Writes {@link #NUM_ROWS} rows into a test region that uses the encoding
+   * under test, reads every cell back before and after a compaction, and
+   * finally checks that the block cache reports blocks of exactly one
+   * encoding, namely the one under test.
+   */
+  @Test
+  public void testEncodedSeeker() throws IOException {
+    System.err.println("Testing encoded seekers for encoding " + encoding);
+    CacheConfig cacheConf = new CacheConfig(testUtil.getConfiguration());
+    LruBlockCache cache = (LruBlockCache) cacheConf.getBlockCache();
+    cache.clearCache();
+
+    HRegion region = testUtil.createTestRegion(TABLE_NAME, CF_NAME,
+        Algorithm.NONE, BloomType.NONE, MAX_VERSIONS, HFile.DEFAULT_BLOCKSIZE,
+        encoding, encodeOnDisk);
+    LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(
+        MIN_VALUE_SIZE, MAX_VALUE_SIZE);
+
+    // Write: one Put per cell, flushing at every NUM_ROWS_PER_FLUSH-th row.
+    for (int row = 0; row < NUM_ROWS; ++row) {
+      final byte[] rowKey = MultiThreadedWriter.longToByteArrayKey(row);
+      for (int col = 0; col < NUM_COLS_PER_ROW; ++col) {
+        final String qualStr = String.valueOf(col);
+        final byte[] value =
+            dataGenerator.generateRandomSizeValue(row, qualStr);
+        Put put = new Put(rowKey);
+        put.add(CF_BYTES, Bytes.toBytes(qualStr), value);
+        region.put(put);
+      }
+      if (row % NUM_ROWS_PER_FLUSH == 0) {
+        region.flushcache();
+      }
+    }
+
+    // Read everything twice: before and after compacting the stores.
+    for (boolean compacted : new boolean[] { false, true }) {
+      for (int row = 0; row < NUM_ROWS; ++row) {
+        final byte[] rowKey = MultiThreadedWriter.longToByteArrayKey(row);
+        for (int col = 0; col < NUM_COLS_PER_ROW; ++col) {
+          if (VERBOSE) {
+            System.err.println("Reading row " + row + ", column " +  col);
+          }
+          final String qualStr = String.valueOf(col);
+          final byte[] qualBytes = Bytes.toBytes(qualStr);
+          Get get = new Get(rowKey);
+          get.addColumn(CF_BYTES, qualBytes);
+          Result result = region.get(get, null);
+          assertEquals(1, result.size());
+          assertTrue(LoadTestKVGenerator.verify(Bytes.toString(rowKey), qualStr,
+              result.getValue(CF_BYTES, qualBytes)));
+        }
+      }
+
+      if (!compacted) {
+        // Compact, then read again at the next loop iteration.
+        region.compactStores();
+      }
+    }
+
+    Map<DataBlockEncoding, Integer> encodingCounts =
+        cache.getEncodingCountsForTest();
+
+    // Ensure that compactions don't pollute the cache with unencoded blocks
+    // in case of in-cache-only encoding.
+    System.err.println("encodingCounts=" + encodingCounts);
+    assertEquals(1, encodingCounts.size());
+    final DataBlockEncoding encodingInCache =
+        encodingCounts.keySet().iterator().next();
+    assertEquals(encoding, encodingInCache);
+    assertTrue(encodingCounts.get(encodingInCache) > 0);
+  }
+
+}