You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2016/01/29 05:36:48 UTC

hbase git commit: HBASE-14841 Allow Dictionary to work with BytebufferedCells (Ram)

Repository: hbase
Updated Branches:
  refs/heads/master 47506e805 -> 0de221a19


HBASE-14841 Allow Dictionary to work with BytebufferedCells (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0de221a1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0de221a1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0de221a1

Branch: refs/heads/master
Commit: 0de221a19d799ad515f8f4556cacd05e6b4e74f8
Parents: 47506e8
Author: ramkrishna <ra...@gmail.com>
Authored: Fri Jan 29 10:05:26 2016 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Fri Jan 29 10:06:20 2016 +0530

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/CellUtil.java  |  19 +++
 .../hadoop/hbase/io/TagCompressionContext.java  |  35 ++++-
 .../io/encoding/BufferedDataBlockEncoder.java   |   6 +-
 .../apache/hadoop/hbase/io/util/Dictionary.java |  13 +-
 .../hadoop/hbase/io/util/LRUDictionary.java     | 141 ++++++++++++++++---
 .../hadoop/hbase/util/ByteBufferUtils.java      |  16 +++
 .../hbase/io/TestTagCompressionContext.java     |  78 +++++++++-
 7 files changed, 272 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0de221a1/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 1b38b56..7db1c76 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.ByteRange;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -1629,6 +1630,24 @@ public final class CellUtil {
     return new FirstOnRowDeleteFamilyCell(row, fam);
   }
 
+  /**
+   * Compresses the tags to the given outputstream using the TagcompressionContext
+   * @param out the outputstream to which the compression should happen
+   * @param cell the cell which has tags
+   * @param tagCompressionContext the TagCompressionContext
+   * @throws IOException can throw IOException if the compression encounters issue
+   */
+  public static void compressTags(DataOutputStream out, Cell cell,
+      TagCompressionContext tagCompressionContext) throws IOException {
+    if (cell instanceof ByteBufferedCell) {
+      tagCompressionContext.compressTags(out, ((ByteBufferedCell) cell).getTagsByteBuffer(),
+          ((ByteBufferedCell) cell).getTagsPosition(), cell.getTagsLength());
+    } else {
+      tagCompressionContext.compressTags(out, cell.getTagsArray(), cell.getTagsOffset(),
+          cell.getTagsLength());
+    }
+  }
+
   @InterfaceAudience.Private
   /**
    * These cells are used in reseeks/seeks to improve the read performance.

http://git-wip-us.apache.org/repos/asf/hbase/blob/0de221a1/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
index 05c4ad1..278dfc4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
@@ -79,17 +79,24 @@ public class TagCompressionContext {
    * Compress tags one by one and writes to the OutputStream.
    * @param out Stream to which the compressed tags to be written
    * @param in Source buffer where tags are available
+   * @param offset Offset for the tags byte buffer
    * @param length Length of all tag bytes
    * @throws IOException
    */
-  public void compressTags(OutputStream out, ByteBuffer in, int length) throws IOException {
+  public void compressTags(OutputStream out, ByteBuffer in, int offset, int length)
+      throws IOException {
     if (in.hasArray()) {
-      compressTags(out, in.array(), in.arrayOffset() + in.position(), length);
-      ByteBufferUtils.skip(in, length);
+      compressTags(out, in.array(), offset, length);
     } else {
-      byte[] tagBuf = new byte[length];
-      in.get(tagBuf);
-      compressTags(out, tagBuf, 0, length);
+      int pos = offset;
+      int endOffset = pos + length;
+      assert pos < endOffset;
+      while (pos < endOffset) {
+        int tagLen = ByteBufferUtils.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
+        pos += Tag.TAG_LENGTH_SIZE;
+        write(in, pos, tagLen, out);
+        pos += tagLen;
+      }
     }
   }
 
@@ -167,7 +174,7 @@ public class TagCompressionContext {
    * @param src Stream where the compressed tags are available
    * @param dest Destination buffer where to write the uncompressed tags
    * @param length Length of all tag bytes
-   * @throws IOException
+   * @throws IOException when the dictionary does not have the entry
    */
   public void uncompressTags(InputStream src, ByteBuffer dest, int length) throws IOException {
     if (dest.hasArray()) {
@@ -192,4 +199,18 @@ public class TagCompressionContext {
       StreamUtils.writeShort(out, dictIdx);
     }
   }
+
+  private void write(ByteBuffer data, int offset, int length, OutputStream out) throws IOException {
+    short dictIdx = Dictionary.NOT_IN_DICTIONARY;
+    if (tagDict != null) {
+      dictIdx = tagDict.findEntry(data, offset, length);
+    }
+    if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+      out.write(Dictionary.NOT_IN_DICTIONARY);
+      StreamUtils.writeRawVInt32(out, length);
+      ByteBufferUtils.copyBufferToStream(out, data, offset, length);
+    } else {
+      StreamUtils.writeShort(out, dictIdx);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0de221a1/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 33e38c7..817b1a7 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -1002,10 +1002,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
         // When tag compression is enabled, tagCompressionContext will have a not null value. Write
         // the tags using Dictionary compression in such a case
         if (tagCompressionContext != null) {
-          // TODO : Make Dictionary interface to work with BBs and then change the corresponding
-          // compress tags code to work with BB
-          tagCompressionContext
-              .compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
+          // Not passing tagsLength considering that parsing of the tagsLength is not costly
+          CellUtil.compressTags(out, cell, tagCompressionContext);
         } else {
           CellUtil.writeTags(out, cell, tagsLength);
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0de221a1/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
index 4a3d42f..54677da 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hbase.io.util;
 
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
@@ -51,6 +53,16 @@ public interface Dictionary {
   short findEntry(byte[] data, int offset, int length);
 
   /**
+   * Finds the index of an entry.
+   * If no entry found, we add it.
+   * @param data the ByteBuffer that we're looking up
+   * @param offset Offset into <code>data</code> to add to Dictionary.
+   * @param length Length beyond <code>offset</code> that comprises entry; must be &gt; 0.
+   * @return the index of the entry, or {@link #NOT_IN_DICTIONARY} if not found
+   */
+  short findEntry(ByteBuffer data, int offset, int length);
+
+  /**
    * Adds an entry to the dictionary.
    * Be careful using this method.  It will add an entry to the
    * dictionary even if it already has an entry for the same data.
@@ -62,7 +74,6 @@ public interface Dictionary {
    * @param length Length beyond <code>offset</code> that comprises entry; must be &gt; 0.
    * @return the index of the entry
    */
-
   short addEntry(byte[] data, int offset, int length);
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/0de221a1/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
index 8562cf0..99780ba 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.hbase.io.util;
 
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.base.Preconditions;
@@ -58,8 +60,12 @@ public class LRUDictionary implements Dictionary {
 
   @Override
   public short addEntry(byte[] data, int offset, int length) {
+    return addEntryInternal(data, offset, length, true);
+  }
+
+  private short addEntryInternal(byte[] data, int offset, int length, boolean copy) {
     if (length <= 0) return NOT_IN_DICTIONARY;
-    return backingStore.put(data, offset, length);
+    return backingStore.put(data, offset, length, copy);
   }
 
   @Override
@@ -89,16 +95,23 @@ public class LRUDictionary implements Dictionary {
       indexToNode = new Node[initialSize];
     }
 
-    private short put(byte[] array, int offset, int length) {
-      // We copy the bytes we want, otherwise we might be holding references to
-      // massive arrays in our dictionary (or those arrays might change)
-      byte[] stored = new byte[length];
-      Bytes.putBytes(stored, 0, array, offset, length);
+    private short put(byte[] array, int offset, int length, boolean copy) {
+      if (copy) {
+        // We copy the bytes we want, otherwise we might be holding references to
+        // massive arrays in our dictionary (or those arrays might change)
+        byte[] stored = new byte[length];
+        Bytes.putBytes(stored, 0, array, offset, length);
+        return putInternal(stored);
+      } else {
+        return putInternal(array);
+      }
+    }
 
+    private short putInternal(byte[] stored) {
       if (currSize < initSize) {
         // There is space to add without evicting.
         if (indexToNode[currSize] == null) {
-          indexToNode[currSize] = new Node();
+          indexToNode[currSize] = new ByteArrayBackedNode();
         }
         indexToNode[currSize].setContents(stored, 0, stored.length);
         setHead(indexToNode[currSize]);
@@ -117,7 +130,7 @@ public class LRUDictionary implements Dictionary {
 
     private short findIdx(byte[] array, int offset, int length) {
       Short s;
-      final Node comparisonNode = new Node();
+      final Node comparisonNode = new ByteArrayBackedNode();
       comparisonNode.setContents(array, offset, length);
       if ((s = nodeToIndex.get(comparisonNode)) != null) {
         moveToHead(indexToNode[s]);
@@ -127,10 +140,22 @@ public class LRUDictionary implements Dictionary {
       }
     }
 
+    private short findIdx(ByteBuffer buf, int offset, int length) {
+      Short s;
+      final ByteBufferBackedNode comparisonNode = new ByteBufferBackedNode();
+      comparisonNode.setContents(buf, offset, length);
+      if ((s = nodeToIndex.get(comparisonNode)) != null) {
+        moveToHead(indexToNode[s]);
+        return s;
+      } else {
+        return -1;
+      }
+    }
+
     private byte[] get(short idx) {
       Preconditions.checkElementIndex(idx, currSize);
       moveToHead(indexToNode[idx]);
-      return indexToNode[idx].container;
+      return indexToNode[idx].getContents();
     }
 
     private void moveToHead(Node n) {
@@ -153,7 +178,7 @@ public class LRUDictionary implements Dictionary {
       // Node is now removed from the list. Re-add it at the head.
       setHead(n);
     }
-    
+
     private void setHead(Node n) {
       // assume it's already unlinked from the list at this point.
       n.prev = null;
@@ -175,7 +200,7 @@ public class LRUDictionary implements Dictionary {
       for (int i = 0; i < currSize; i++) {
         indexToNode[i].next = null;
         indexToNode[i].prev = null;
-        indexToNode[i].container = null;
+        indexToNode[i].resetContents();
       }
       currSize = 0;
       nodeToIndex.clear();
@@ -183,37 +208,117 @@ public class LRUDictionary implements Dictionary {
       head = null;
     }
 
-    private static class Node {
-      byte[] container;
+    private static abstract class Node {
       int offset;
       int length;
       Node next; // link towards the tail
       Node prev; // link towards the head
+      abstract void setContents(byte[] container, int offset, int length);
+      abstract byte[] getContents();
+      abstract void resetContents();
+    }
+    // The actual contents of the LRUDictionary are of ByteArrayBackedNode type 
+    private static class ByteArrayBackedNode extends Node {
+      private byte[] container;
 
-      public Node() {
-      }
-
-      private void setContents(byte[] container, int offset, int length) {
+      @Override
+      void setContents(byte[] container, int offset, int length) {
         this.container = container;
         this.offset = offset;
         this.length = length;
       }
 
       @Override
+      byte[] getContents() {
+        return this.container;
+      }
+
+      @Override
       public int hashCode() {
         return Bytes.hashCode(container, offset, length);
       }
 
       @Override
+      void resetContents() {
+        this.container = null;
+      }
+
+      @Override
       public boolean equals(Object other) {
         if (!(other instanceof Node)) {
           return false;
         }
 
         Node casted = (Node) other;
-        return Bytes.equals(container, offset, length, casted.container,
+        return Bytes.equals(container, offset, length, casted.getContents(),
             casted.offset, casted.length);
       }
     }
+
+    // Currently only used for finding the index and hence this node acts
+    // as a temporary holder to look up in the indexToNode map
+    // which is formed by ByteArrayBackedNode
+    private static class ByteBufferBackedNode extends Node {
+      private ByteBuffer container;
+
+      public ByteBufferBackedNode() {
+      }
+
+      @Override
+      void setContents(byte[] container, int offset, int length) {
+        this.container = ByteBuffer.wrap(container);
+        this.offset = offset;
+        this.length = length;
+      }
+
+      void setContents(ByteBuffer container, int offset, int length) {
+        this.container = container;
+        this.offset = offset;
+        this.length = length;
+      }
+
+      @Override
+      void resetContents() {
+        this.container = null;
+      }
+
+      @Override
+      byte[] getContents() {
+        // This ideally should not be called
+        byte[] copy = new byte[this.length];
+        ByteBufferUtils.copyFromBufferToArray(copy, (ByteBuffer) this.container, this.offset, 0,
+            length);
+        return copy;
+      }
+
+      @Override
+      public int hashCode() {
+        return ByteBufferUtils.hashCode(container, offset, length);
+      }
+
+      @Override
+      public boolean equals(Object other) {
+        if (!(other instanceof Node)) {
+          return false;
+        }
+        // This was done to avoid findbugs comment
+        Node casted = (Node) other;
+        // The other side should be a byte array backed node only as we add only
+        // ByteArrayBackedNode to the indexToNode map.
+        return ByteBufferUtils.equals(this.container, offset, length,
+            casted.getContents(), casted.offset, casted.length);
+      }
+    }
+  }
+
+  @Override
+  public short findEntry(ByteBuffer data, int offset, int length) {
+    short ret = backingStore.findIdx(data, offset, length);
+    if (ret == NOT_IN_DICTIONARY) {
+      byte[] copy = new byte[length];
+      ByteBufferUtils.copyFromBufferToArray(copy, data, offset, 0, length);
+      addEntryInternal(copy, 0, length, false);
+    }
+    return ret;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0de221a1/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index 99e798a..7f3d777 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -579,6 +579,22 @@ public final class ByteBufferUtils {
     return compareTo(buf1, o1, l1, buf2, o2, l2) == 0;
   }
 
+  /**
+   * @param buf
+   *          ByteBuffer to hash
+   * @param offset
+   *          offset to start from
+   * @param length
+   *          length to hash
+   */
+  public static int hashCode(ByteBuffer buf, int offset, int length) {
+    int hash = 1;
+    for (int i = offset; i < offset + length; i++) {
+      hash = (31 * hash) + (int) toByte(buf, i);
+    }
+    return hash;
+  }
+
   public static int compareTo(ByteBuffer buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2) {
     if (UNSAFE_UNALIGNED) {
       long offset1Adj, offset2Adj;

http://git-wip-us.apache.org/repos/asf/hbase/blob/0de221a1/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
index 6c46cf2..a332a63 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
@@ -21,18 +21,22 @@ package org.apache.hadoop.hbase.io;
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.OffheapKeyValue;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.ArrayBackedTag;
+import org.apache.hadoop.hbase.ByteBufferedCell;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.util.LRUDictionary;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -51,12 +55,12 @@ public class TestTagCompressionContext {
     TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
     KeyValue kv1 = createKVWithTags(2);
     int tagsLength1 = kv1.getTagsLength();
-    ByteBuffer ib = ByteBuffer.wrap(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);
-    context.compressTags(baos, ib, tagsLength1);
+    ByteBuffer ib = ByteBuffer.wrap(kv1.getTagsArray());
+    context.compressTags(baos, ib, kv1.getTagsOffset(), tagsLength1);
     KeyValue kv2 = createKVWithTags(3);
     int tagsLength2 = kv2.getTagsLength();
-    ib = ByteBuffer.wrap(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2);
-    context.compressTags(baos, ib, tagsLength2);
+    ib = ByteBuffer.wrap(kv2.getTagsArray());
+    context.compressTags(baos, ib, kv2.getTagsOffset(), tagsLength2);
 
     context.clear();
 
@@ -72,6 +76,31 @@ public class TestTagCompressionContext {
   }
 
   @Test
+  public void testCompressUncompressTagsWithOffheapKeyValue1() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream daos = new ByteBufferSupportDataOutputStream(baos);
+    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
+    ByteBufferedCell kv1 = (ByteBufferedCell)createOffheapKVWithTags(2);
+    int tagsLength1 = kv1.getTagsLength();
+    context.compressTags(daos, kv1.getTagsByteBuffer(), kv1.getTagsPosition(), tagsLength1);
+    ByteBufferedCell kv2 = (ByteBufferedCell)createOffheapKVWithTags(3);
+    int tagsLength2 = kv2.getTagsLength();
+    context.compressTags(daos, kv2.getTagsByteBuffer(), kv2.getTagsPosition(), tagsLength2);
+
+    context.clear();
+
+    byte[] dest = new byte[tagsLength1];
+    ByteBuffer ob = ByteBuffer.wrap(baos.getBuffer());
+    context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength1);
+    assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
+        tagsLength1));
+    dest = new byte[tagsLength2];
+    context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength2);
+    assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
+        tagsLength2));
+  }
+
+  @Test
   public void testCompressUncompressTags2() throws Exception {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
@@ -84,7 +113,32 @@ public class TestTagCompressionContext {
 
     context.clear();
 
-    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.getBuffer());
+    byte[] dest = new byte[tagsLength1];
+    context.uncompressTags(bais, dest, 0, tagsLength1);
+    assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
+        tagsLength1));
+    dest = new byte[tagsLength2];
+    context.uncompressTags(bais, dest, 0, tagsLength2);
+    assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
+        tagsLength2));
+  }
+
+  @Test
+  public void testCompressUncompressTagsWithOffheapKeyValue2() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream daos = new ByteBufferSupportDataOutputStream(baos);
+    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
+    ByteBufferedCell kv1 = (ByteBufferedCell)createOffheapKVWithTags(1);
+    int tagsLength1 = kv1.getTagsLength();
+    context.compressTags(daos, kv1.getTagsByteBuffer(), kv1.getTagsPosition(), tagsLength1);
+    ByteBufferedCell kv2 = (ByteBufferedCell)createOffheapKVWithTags(3);
+    int tagsLength2 = kv2.getTagsLength();
+    context.compressTags(daos, kv2.getTagsByteBuffer(), kv2.getTagsPosition(), tagsLength2);
+
+    context.clear();
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.getBuffer());
     byte[] dest = new byte[tagsLength1];
     context.uncompressTags(bais, dest, 0, tagsLength1);
     assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
@@ -103,4 +157,16 @@ public class TestTagCompressionContext {
     KeyValue kv = new KeyValue(ROW, CF, Q, 1234L, V, tags);
     return kv;
   }
+
+  private Cell createOffheapKVWithTags(int noOfTags) {
+    List<Tag> tags = new ArrayList<Tag>();
+    for (int i = 0; i < noOfTags; i++) {
+      tags.add(new ArrayBackedTag((byte) i, "tagValue" + i));
+    }
+    KeyValue kv = new KeyValue(ROW, CF, Q, 1234L, V, tags);
+    ByteBuffer dbb = ByteBuffer.allocateDirect(kv.getBuffer().length);
+    ByteBufferUtils.copyFromArrayToBuffer(dbb, kv.getBuffer(), 0, kv.getBuffer().length);
+    OffheapKeyValue offheapKV = new OffheapKeyValue(dbb, 0, kv.getBuffer().length, true, 0);
+    return offheapKV;
+  }
 }