You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2016/11/29 16:58:32 UTC

hbase git commit: HBASE-17012 Handle Offheap cells in CompressedKvEncoder (Ram)

Repository: hbase
Updated Branches:
  refs/heads/master e5dad24a9 -> 7c43a23c0


HBASE-17012 Handle Offheap cells in CompressedKvEncoder (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7c43a23c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7c43a23c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7c43a23c

Branch: refs/heads/master
Commit: 7c43a23c07d2af1c236b3153ba932234c3a80d13
Parents: e5dad24
Author: Ramkrishna <ra...@intel.com>
Authored: Tue Nov 29 22:27:45 2016 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Tue Nov 29 22:27:45 2016 +0530

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/CellUtil.java  | 58 ++++++++++++++++----
 .../hadoop/hbase/io/TagCompressionContext.java  | 32 +----------
 .../apache/hadoop/hbase/io/util/Dictionary.java | 51 +++++++++++++++++
 .../regionserver/wal/SecureWALCellCodec.java    | 35 +++++++-----
 .../hbase/regionserver/wal/WALCellCodec.java    | 38 +++----------
 .../apache/hadoop/hbase/ipc/TestRpcServer.java  |  1 -
 .../wal/TestWALCellCodecWithCompression.java    | 40 ++++++++++++--
 .../hbase/wal/TestWALReaderOnSecureWAL.java     | 31 +++++++++--
 8 files changed, 189 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7c43a23c/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index d47cdab..86c7720 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience.Private;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.util.Dictionary;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.ByteRange;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -1578,12 +1579,12 @@ public final class CellUtil {
 
   /**
    * Writes the row from the given cell to the output stream
-   * @param out The dataoutputstream to which the data has to be written
+   * @param out The outputstream to which the data has to be written
    * @param cell The cell whose contents has to be written
    * @param rlength the row length
    * @throws IOException
    */
-  public static void writeRow(DataOutputStream out, Cell cell, short rlength) throws IOException {
+  public static void writeRow(OutputStream out, Cell cell, short rlength) throws IOException {
     if (cell instanceof ByteBufferCell) {
       ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getRowByteBuffer(),
         ((ByteBufferCell) cell).getRowPosition(), rlength);
@@ -1611,12 +1612,12 @@ public final class CellUtil {
 
   /**
    * Writes the family from the given cell to the output stream
-   * @param out The dataoutputstream to which the data has to be written
+   * @param out The outputstream to which the data has to be written
    * @param cell The cell whose contents has to be written
    * @param flength the family length
    * @throws IOException
    */
-  public static void writeFamily(DataOutputStream out, Cell cell, byte flength) throws IOException {
+  public static void writeFamily(OutputStream out, Cell cell, byte flength) throws IOException {
     if (cell instanceof ByteBufferCell) {
       ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getFamilyByteBuffer(),
         ((ByteBufferCell) cell).getFamilyPosition(), flength);
@@ -1627,12 +1628,12 @@ public final class CellUtil {
 
   /**
    * Writes the qualifier from the given cell to the output stream
-   * @param out The dataoutputstream to which the data has to be written
+   * @param out The outputstream to which the data has to be written
    * @param cell The cell whose contents has to be written
    * @param qlength the qualifier length
    * @throws IOException
    */
-  public static void writeQualifier(DataOutputStream out, Cell cell, int qlength)
+  public static void writeQualifier(OutputStream out, Cell cell, int qlength)
       throws IOException {
     if (cell instanceof ByteBufferCell) {
       ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getQualifierByteBuffer(),
@@ -1662,12 +1663,12 @@ public final class CellUtil {
 
   /**
    * Writes the value from the given cell to the output stream
-   * @param out The dataoutputstream to which the data has to be written
+   * @param out The outputstream to which the data has to be written
    * @param cell The cell whose contents has to be written
    * @param vlength the value length
    * @throws IOException
    */
-  public static void writeValue(DataOutputStream out, Cell cell, int vlength) throws IOException {
+  public static void writeValue(OutputStream out, Cell cell, int vlength) throws IOException {
     if (cell instanceof ByteBufferCell) {
       ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getValueByteBuffer(),
         ((ByteBufferCell) cell).getValuePosition(), vlength);
@@ -1678,12 +1679,12 @@ public final class CellUtil {
 
   /**
    * Writes the tag from the given cell to the output stream
-   * @param out The dataoutputstream to which the data has to be written
+   * @param out The outputstream to which the data has to be written
    * @param cell The cell whose contents has to be written
    * @param tagsLength the tag length
    * @throws IOException
    */
-  public static void writeTags(DataOutputStream out, Cell cell, int tagsLength) throws IOException {
+  public static void writeTags(OutputStream out, Cell cell, int tagsLength) throws IOException {
     if (cell instanceof ByteBufferCell) {
       ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getTagsByteBuffer(),
         ((ByteBufferCell) cell).getTagsPosition(), tagsLength);
@@ -2230,7 +2231,8 @@ public final class CellUtil {
    * @param tagCompressionContext the TagCompressionContext
    * @throws IOException can throw IOException if the compression encounters issue
    */
-  public static void compressTags(DataOutputStream out, Cell cell,
+  @InterfaceAudience.Private
+  public static void compressTags(OutputStream out, Cell cell,
       TagCompressionContext tagCompressionContext) throws IOException {
     if (cell instanceof ByteBufferCell) {
       tagCompressionContext.compressTags(out, ((ByteBufferCell) cell).getTagsByteBuffer(),
@@ -2242,6 +2244,40 @@ public final class CellUtil {
   }
 
   @InterfaceAudience.Private
+  public static void compressRow(OutputStream out, Cell cell, Dictionary dict) throws IOException {
+    if (cell instanceof ByteBufferCell) {
+      Dictionary.write(out, ((ByteBufferCell) cell).getRowByteBuffer(),
+        ((ByteBufferCell) cell).getRowPosition(), cell.getRowLength(), dict);
+    } else {
+      Dictionary.write(out, cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), dict);
+    }
+  }
+
+  @InterfaceAudience.Private
+  public static void compressFamily(OutputStream out, Cell cell, Dictionary dict)
+      throws IOException {
+    if (cell instanceof ByteBufferCell) {
+      Dictionary.write(out, ((ByteBufferCell) cell).getFamilyByteBuffer(),
+        ((ByteBufferCell) cell).getFamilyPosition(), cell.getFamilyLength(), dict);
+    } else {
+      Dictionary.write(out, cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+        dict);
+    }
+  }
+
+  @InterfaceAudience.Private
+  public static void compressQualifier(OutputStream out, Cell cell, Dictionary dict)
+      throws IOException {
+    if (cell instanceof ByteBufferCell) {
+      Dictionary.write(out, ((ByteBufferCell) cell).getQualifierByteBuffer(),
+        ((ByteBufferCell) cell).getQualifierPosition(), cell.getQualifierLength(), dict);
+    } else {
+      Dictionary.write(out, cell.getQualifierArray(), cell.getQualifierOffset(),
+        cell.getQualifierLength(), dict);
+    }
+  }
+
+  @InterfaceAudience.Private
   /**
    * These cells are used in reseeks/seeks to improve the read performance.
    * They are not real cells that are returned back to the clients

http://git-wip-us.apache.org/repos/asf/hbase/blob/7c43a23c/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
index 278dfc4..fea2f0c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
@@ -70,7 +70,7 @@ public class TagCompressionContext {
     while (pos < endOffset) {
       int tagLen = Bytes.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
       pos += Tag.TAG_LENGTH_SIZE;
-      write(in, pos, tagLen, out);
+      Dictionary.write(out, in, pos, tagLen, tagDict);
       pos += tagLen;
     }
   }
@@ -94,7 +94,7 @@ public class TagCompressionContext {
       while (pos < endOffset) {
         int tagLen = ByteBufferUtils.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
         pos += Tag.TAG_LENGTH_SIZE;
-        write(in, pos, tagLen, out);
+        Dictionary.write(out, in, pos, tagLen, tagDict);;
         pos += tagLen;
       }
     }
@@ -185,32 +185,4 @@ public class TagCompressionContext {
       dest.put(tagBuf);
     }
   }
-
-  private void write(byte[] data, int offset, int length, OutputStream out) throws IOException {
-    short dictIdx = Dictionary.NOT_IN_DICTIONARY;
-    if (tagDict != null) {
-      dictIdx = tagDict.findEntry(data, offset, length);
-    }
-    if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
-      out.write(Dictionary.NOT_IN_DICTIONARY);
-      StreamUtils.writeRawVInt32(out, length);
-      out.write(data, offset, length);
-    } else {
-      StreamUtils.writeShort(out, dictIdx);
-    }
-  }
-
-  private void write(ByteBuffer data, int offset, int length, OutputStream out) throws IOException {
-    short dictIdx = Dictionary.NOT_IN_DICTIONARY;
-    if (tagDict != null) {
-      dictIdx = tagDict.findEntry(data, offset, length);
-    }
-    if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
-      out.write(Dictionary.NOT_IN_DICTIONARY);
-      StreamUtils.writeRawVInt32(out, length);
-      ByteBufferUtils.copyBufferToStream(out, data, offset, length);
-    } else {
-      StreamUtils.writeShort(out, dictIdx);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7c43a23c/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
index 54677da..e6384e1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
@@ -18,9 +18,12 @@
 
 package org.apache.hadoop.hbase.io.util;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 
 /**
  * Dictionary interface
@@ -80,4 +83,52 @@ public interface Dictionary {
    * Flushes the dictionary, empties all values.
    */
   void clear();
+
+  /**
+   * Helper methods to write the dictionary data to the OutputStream
+   * @param out the outputstream to which data needs to be written
+   * @param data the data to be written in byte[]
+   * @param offset the offset
+   * @param length length to be written
+   * @param dict the dictionary whose contents are to written
+   * @throws IOException
+   */
+  public static void write(OutputStream out, byte[] data, int offset, int length, Dictionary dict)
+      throws IOException {
+    short dictIdx = Dictionary.NOT_IN_DICTIONARY;
+    if (dict != null) {
+      dictIdx = dict.findEntry(data, offset, length);
+    }
+    if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+      out.write(Dictionary.NOT_IN_DICTIONARY);
+      StreamUtils.writeRawVInt32(out, length);
+      out.write(data, offset, length);
+    } else {
+      StreamUtils.writeShort(out, dictIdx);
+    }
+  }
+
+  /**
+   * Helper methods to write the dictionary data to the OutputStream
+   * @param out the outputstream to which data needs to be written
+   * @param data the data to be written in ByteBuffer
+   * @param offset the offset
+   * @param length length to be written
+   * @param dict the dictionary whose contents are to written
+   * @throws IOException
+   */
+  public static void write(OutputStream out, ByteBuffer data, int offset, int length,
+      Dictionary dict) throws IOException {
+    short dictIdx = Dictionary.NOT_IN_DICTIONARY;
+    if (dict != null) {
+      dictIdx = dict.findEntry(data, offset, length);
+    }
+    if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+      out.write(Dictionary.NOT_IN_DICTIONARY);
+      StreamUtils.writeRawVInt32(out, length);
+      ByteBufferUtils.copyBufferToStream(out, data, offset, length);
+    } else {
+      StreamUtils.writeShort(out, dictIdx);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7c43a23c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
index 603496f..027ff11 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
@@ -28,9 +28,11 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
+import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
 import org.apache.hadoop.hbase.io.crypto.Decryptor;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.crypto.Encryptor;
@@ -195,29 +197,32 @@ public class SecureWALCellCodec extends WALCellCodec {
 
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       OutputStream cout = encryptor.createEncryptionStream(baos);
-
+      ByteBufferWriterOutputStream bos = new ByteBufferWriterOutputStream(cout);
       int tlen = cell.getTagsLength();
       // Write the KeyValue infrastructure as VInts.
-      StreamUtils.writeRawVInt32(cout, KeyValueUtil.keyLength(cell));
-      StreamUtils.writeRawVInt32(cout, cell.getValueLength());
+      StreamUtils.writeRawVInt32(bos, KeyValueUtil.keyLength(cell));
+      StreamUtils.writeRawVInt32(bos, cell.getValueLength());
       // To support tags
-      StreamUtils.writeRawVInt32(cout, tlen);
+      StreamUtils.writeRawVInt32(bos, tlen);
 
       // Write row, qualifier, and family
-      StreamUtils.writeRawVInt32(cout, cell.getRowLength());
-      cout.write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
-      StreamUtils.writeRawVInt32(cout, cell.getFamilyLength());
-      cout.write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
-      StreamUtils.writeRawVInt32(cout, cell.getQualifierLength());
-      cout.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+      short rowLength = cell.getRowLength();
+      StreamUtils.writeRawVInt32(bos, rowLength);
+      CellUtil.writeRow(bos, cell, rowLength);
+      byte familyLength = cell.getFamilyLength();
+      StreamUtils.writeRawVInt32(bos, familyLength);
+      CellUtil.writeFamily(bos, cell, familyLength);
+      int qualifierLength = cell.getQualifierLength();
+      StreamUtils.writeRawVInt32(bos, qualifierLength);
+      CellUtil.writeQualifier(bos, cell, qualifierLength);
       // Write the rest ie. ts, type, value and tags parts
-      StreamUtils.writeLong(cout, cell.getTimestamp());
-      cout.write(cell.getTypeByte());
-      cout.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+      StreamUtils.writeLong(bos, cell.getTimestamp());
+      bos.write(cell.getTypeByte());
+      CellUtil.writeValue(bos, cell, cell.getValueLength());
       if (tlen > 0) {
-        cout.write(cell.getTagsArray(), cell.getTagsOffset(), tlen);
+        CellUtil.writeTags(bos, cell, tlen);
       }
-      cout.close();
+      bos.close();
 
       StreamUtils.writeRawVInt32(out, baos.size());
       baos.writeTo(out);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7c43a23c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index 1a18087..baa940b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -24,6 +24,7 @@ import java.io.OutputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
@@ -207,45 +208,24 @@ public class WALCellCodec implements Codec {
       // To support tags
       int tagsLength = cell.getTagsLength();
       StreamUtils.writeRawVInt32(out, tagsLength);
-
-      // Write row, qualifier, and family; use dictionary
-      // compression as they're likely to have duplicates.
-      write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), compression.rowDict);
-      write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
-          compression.familyDict);
-      write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
-          compression.qualifierDict);
-
+      CellUtil.compressRow(out, cell, compression.rowDict);
+      CellUtil.compressFamily(out, cell, compression.familyDict);
+      CellUtil.compressQualifier(out, cell, compression.qualifierDict);
       // Write timestamp, type and value as uncompressed.
       StreamUtils.writeLong(out, cell.getTimestamp());
       out.write(cell.getTypeByte());
-      out.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+      CellUtil.writeValue(out, cell, cell.getValueLength());
       if (tagsLength > 0) {
         if (compression.tagCompressionContext != null) {
           // Write tags using Dictionary compression
-          compression.tagCompressionContext.compressTags(out, cell.getTagsArray(),
-              cell.getTagsOffset(), tagsLength);
+          CellUtil.compressTags(out, cell, compression.tagCompressionContext);
         } else {
           // Tag compression is disabled within the WAL compression. Just write the tags bytes as
           // it is.
-          out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
+          CellUtil.writeTags(out, cell, tagsLength);
         }
       }
     }
-
-    private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException {
-      short dictIdx = Dictionary.NOT_IN_DICTIONARY;
-      if (dict != null) {
-        dictIdx = dict.findEntry(data, offset, length);
-      }
-      if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
-        out.write(Dictionary.NOT_IN_DICTIONARY);
-        StreamUtils.writeRawVInt32(out, length);
-        out.write(data, offset, length);
-      } else {
-        StreamUtils.writeShort(out, dictIdx);
-      }
-    }
   }
 
   static class CompressedKvDecoder extends BaseDecoder {
@@ -364,9 +344,9 @@ public class WALCellCodec implements Codec {
 
   @Override
   public Encoder getEncoder(OutputStream os) {
+    os = (os instanceof ByteBufferWriter) ? os
+        : new ByteBufferWriterOutputStream(os);
     if (compression == null) {
-      os = (os instanceof ByteBufferWriter) ? os
-          : new ByteBufferWriterOutputStream(os);
       return new EnsureKvEncoder(os);
     }
     return new CompressedKvEncoder(os, compression);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7c43a23c/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
index 9f3bd94..6fd65f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
@@ -41,7 +41,6 @@ public class TestRpcServer {
 
   @Test
   public void testAllocateByteBuffToReadInto() throws Exception {
-    System.out.println(Long.MAX_VALUE);
     int maxBuffersInPool = 10;
     ByteBufferPool pool = new ByteBufferPool(6 * 1024, maxBuffersInPool);
     initPoolWithAllBuffers(pool, maxBuffersInPool);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7c43a23c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
index e834ac8..ba5bfa3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
@@ -23,12 +23,14 @@ import static org.junit.Assert.assertEquals;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.OffheapKeyValue;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.ArrayBackedTag;
@@ -46,24 +48,35 @@ public class TestWALCellCodecWithCompression {
 
   @Test
   public void testEncodeDecodeKVsWithTags() throws Exception {
-    doTest(false);
+    doTest(false, false);
   }
 
   @Test
   public void testEncodeDecodeKVsWithTagsWithTagsCompression() throws Exception {
-    doTest(true);
+    doTest(true, false);
   }
 
-  private void doTest(boolean compressTags) throws Exception {
+  @Test
+  public void testEncodeDecodeOffKVsWithTagsWithTagsCompression() throws Exception {
+    doTest(true, true);
+  }
+
+  private void doTest(boolean compressTags, boolean offheapKV) throws Exception {
     Configuration conf = new Configuration(false);
     conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags);
     WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false,
         compressTags));
     ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
     Encoder encoder = codec.getEncoder(bos);
-    encoder.write(createKV(1));
-    encoder.write(createKV(0));
-    encoder.write(createKV(2));
+    if (offheapKV) {
+      encoder.write(createOffheapKV(1));
+      encoder.write(createOffheapKV(0));
+      encoder.write(createOffheapKV(2));
+    } else {
+      encoder.write(createKV(1));
+      encoder.write(createKV(0));
+      encoder.write(createKV(2));
+    }
 
     InputStream is = new ByteArrayInputStream(bos.toByteArray());
     Decoder decoder = codec.getDecoder(is);
@@ -95,4 +108,19 @@ public class TestWALCellCodecWithCompression {
     }
     return new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags);
   }
+
+  private OffheapKeyValue createOffheapKV(int noOfTags) {
+    byte[] row = Bytes.toBytes("myRow");
+    byte[] cf = Bytes.toBytes("myCF");
+    byte[] q = Bytes.toBytes("myQualifier");
+    byte[] value = Bytes.toBytes("myValue");
+    List<Tag> tags = new ArrayList<Tag>(noOfTags);
+    for (int i = 1; i <= noOfTags; i++) {
+      tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i)));
+    }
+    KeyValue kv = new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags);
+    ByteBuffer dbb = ByteBuffer.allocateDirect(kv.getBuffer().length);
+    dbb.put(kv.getBuffer());
+    return new OffheapKeyValue(dbb, 0, kv.getBuffer().length);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7c43a23c/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
index 3e060ab..0562fd9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.wal;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
@@ -38,6 +40,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.OffheapKeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
@@ -90,7 +93,8 @@ public class TestWALReaderOnSecureWAL {
     FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
   }
 
-  private Path writeWAL(final WALFactory wals, final String tblName) throws IOException {
+  @SuppressWarnings("deprecation")
+  private Path writeWAL(final WALFactory wals, final String tblName, boolean offheap) throws IOException {
     Configuration conf = TEST_UTIL.getConfiguration();
     String clsName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
     conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, SecureWALCellCodec.class,
@@ -116,7 +120,15 @@ public class TestWALReaderOnSecureWAL {
           wals.getWAL(regioninfo.getEncodedNameAsBytes(), regioninfo.getTable().getNamespace());
       for (int i = 0; i < total; i++) {
         WALEdit kvs = new WALEdit();
-        kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
+        KeyValue kv = new KeyValue(row, family, Bytes.toBytes(i), value);
+        if (offheap) {
+          ByteBuffer bb = ByteBuffer.allocateDirect(kv.getBuffer().length);
+          bb.put(kv.getBuffer());
+          OffheapKeyValue offheapKV = new OffheapKeyValue(bb, 0, kv.getLength());
+          kvs.add(offheapKV);
+        } else {
+          kvs.add(kv);
+        }
         wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
             System.currentTimeMillis(), mvcc, scopes), kvs, true);
       }
@@ -132,7 +144,16 @@ public class TestWALReaderOnSecureWAL {
   }
 
   @Test()
-  public void testWALReaderOnSecureWAL() throws Exception {
+  public void testWALReaderOnSecureWALWithKeyValues() throws Exception {
+    testSecureWALInternal(false);
+  }
+
+  @Test()
+  public void testWALReaderOnSecureWALWithOffheapKeyValues() throws Exception {
+    testSecureWALInternal(true);
+  }
+
+  private void testSecureWALInternal(boolean offheap) throws IOException, FileNotFoundException {
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
       WAL.Reader.class);
@@ -143,7 +164,7 @@ public class TestWALReaderOnSecureWAL {
     conf.setBoolean(WAL_ENCRYPTION, true);
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     final WALFactory wals = new WALFactory(conf, null, currentTest.getMethodName());
-    Path walPath = writeWAL(wals, currentTest.getMethodName());
+    Path walPath = writeWAL(wals, currentTest.getMethodName(), offheap);
 
     // Insure edits are not plaintext
     long length = fs.getFileStatus(walPath).getLen();
@@ -188,7 +209,7 @@ public class TestWALReaderOnSecureWAL {
     conf.setBoolean(WAL_ENCRYPTION, false);
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     final WALFactory wals = new WALFactory(conf, null, currentTest.getMethodName());
-    Path walPath = writeWAL(wals, currentTest.getMethodName());
+    Path walPath = writeWAL(wals, currentTest.getMethodName(), false);
 
     // Ensure edits are plaintext
     long length = fs.getFileStatus(walPath).getLen();