You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2013/10/15 05:56:36 UTC

svn commit: r1532176 - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/io/ hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/ hbase-common/src/test/java/org/apache/hadoop/hbase/io/ hbase-common/src/test/java/org/apache/hado...

Author: anoopsamjohn
Date: Tue Oct 15 03:56:36 2013
New Revision: 1532176

URL: http://svn.apache.org/r1532176
Log:
HBASE-9137 Add Tag dictionary in WAL compression

Added:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
    hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/
    hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/
    hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java
Removed:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java

Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java?rev=1532176&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java Tue Oct 15 03:56:36 2013
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.io.util.Dictionary;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * Context that holds the dictionary for Tag compression and performs the compress/uncompress.
+ * This will be used for compressing tags while writing into WALs.
+ */
+@InterfaceAudience.Private
+public class TagCompressionContext {
+  private final Dictionary tagDict;
+
+  public TagCompressionContext(Class<? extends Dictionary> dictType) throws SecurityException,
+      NoSuchMethodException, InstantiationException, IllegalAccessException,
+      InvocationTargetException {
+    Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
+    tagDict = dictConstructor.newInstance();
+    tagDict.init(Short.MAX_VALUE);
+  }
+
+  public void clear() {
+    tagDict.clear();
+  }
+
+  /**
+   * Compresses the tags one by one and writes them to the OutputStream.
+   * @param out Stream to which the compressed tags are to be written
+   * @param in Source where tags are available
+   * @param offset Offset for the tags bytes
+   * @param length Length of all tag bytes
+   * @throws IOException
+   */
+  public void compressTags(OutputStream out, byte[] in, int offset, short length)
+      throws IOException {
+    int pos = offset;
+    int endOffset = pos + length;
+    assert pos < endOffset;
+    while (pos < endOffset) {
+      short tagLen = Bytes.toShort(in, pos);
+      pos += Tag.TAG_LENGTH_SIZE;
+      write(in, pos, tagLen, out);
+      pos += tagLen;
+    }
+  }
+
+  /**
+   * Uncompresses tags from the InputStream and writes them to the destination array.
+   * @param src Stream where the compressed tags are available
+   * @param dest Destination array to which the uncompressed tags are written
+   * @param offset Offset in destination where tags are to be written
+   * @param length Length of all tag bytes
+   * @throws IOException
+   */
+  public void uncompressTags(InputStream src, byte[] dest, int offset, short length)
+      throws IOException {
+    int endOffset = offset + length;
+    while (offset < endOffset) {
+      byte status = (byte) src.read();
+      if (status == Dictionary.NOT_IN_DICTIONARY) {
+        // tagLen was written from a short value, so this downcast is safe.
+        short tagLen = (short) StreamUtils.readRawVarint32(src);
+        offset = Bytes.putShort(dest, offset, tagLen);
+        IOUtils.readFully(src, dest, offset, tagLen);
+        tagDict.addEntry(dest, offset, tagLen);
+        offset += tagLen;
+      } else {
+        short dictIdx = StreamUtils.toShort(status, (byte) src.read());
+        byte[] entry = tagDict.getEntry(dictIdx);
+        if (entry == null) {
+          throw new IOException("Missing dictionary entry for index " + dictIdx);
+        }
+        offset = Bytes.putShort(dest, offset, (short) entry.length);
+        System.arraycopy(entry, 0, dest, offset, entry.length);
+        offset += entry.length;
+      }
+    }
+  }
+
+  private void write(byte[] data, int offset, short length, OutputStream out) throws IOException {
+    short dictIdx = Dictionary.NOT_IN_DICTIONARY;
+    if (tagDict != null) {
+      dictIdx = tagDict.findEntry(data, offset, length);
+    }
+    if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+      out.write(Dictionary.NOT_IN_DICTIONARY);
+      StreamUtils.writeRawVInt32(out, length);
+      out.write(data, offset, length);
+    } else {
+      StreamUtils.writeShort(out, dictIdx);
+    }
+  }
+}
\ No newline at end of file

Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java?rev=1532176&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java Tue Oct 15 03:56:36 2013
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Dictionary interface
+ *
+ * Dictionary indexes should be either bytes or shorts, only positive. (The
+ * first bit is reserved for detecting whether something is compressed or not).
+ */
+@InterfaceAudience.Private
+public interface Dictionary {
+  byte NOT_IN_DICTIONARY = -1;
+
+  void init(int initialSize);
+  /**
+   * Gets an entry from the dictionary.
+   * 
+   * @param idx index of the entry
+   * @return the entry, or null if non existent
+   */
+  byte[] getEntry(short idx);
+
+  /**
+   * Finds the index of an entry.
+   * If no entry found, we add it.
+   * 
+   * @param data the byte array that we're looking up
+   * @param offset Offset into <code>data</code> to add to Dictionary.
+   * @param length Length beyond <code>offset</code> that comprises entry; must be > 0.
+   * @return the index of the entry, or {@link #NOT_IN_DICTIONARY} if not found
+   */
+  short findEntry(byte[] data, int offset, int length);
+
+  /**
+   * Adds an entry to the dictionary.
+   * Be careful using this method.  It will add an entry to the
+   * dictionary even if it already has an entry for the same data.
+   * Call {@link #findEntry(byte[], int, int)} to add without duplicating
+   * dictionary entries.
+   * 
+   * @param data the entry to add
+   * @param offset Offset into <code>data</code> to add to Dictionary.
+   * @param length Length beyond <code>offset</code> that comprises entry; must be > 0.
+   * @return the index of the entry
+   */
+
+  short addEntry(byte[] data, int offset, int length);
+
+  /**
+   * Flushes the dictionary, empties all values.
+   */
+  void clear();
+}

Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java?rev=1532176&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java Tue Oct 15 03:56:36 2013
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.util;
+
+import java.util.HashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * WALDictionary using an LRU eviction algorithm. Uses a linked list running
+ * through a hashtable.  Currently has a max of 2^15 entries.  Will start
+ * evicting if it exceeds this number.  The maximum memory we expect this dictionary
+ * to take in the worst case is about:
+ * <code>(2 ^ 15) * 5 (Regionname, Row key, CF, Column qual, table) * 100 bytes 
+ * (these are some big names) = ~16MB</code>.
+ * If you want to get silly, even at 1kb entries, it maxes out at 160 megabytes.
+ */
+@InterfaceAudience.Private
+public class LRUDictionary implements Dictionary {
+
+  BidirectionalLRUMap backingStore;
+  @Override
+  public byte[] getEntry(short idx) {
+    return backingStore.get(idx);
+  }
+
+  @Override
+  public void init(int initialSize) {
+    backingStore = new BidirectionalLRUMap(initialSize);
+  }
+  @Override
+  public short findEntry(byte[] data, int offset, int length) {
+    short ret = backingStore.findIdx(data, offset, length);
+    if (ret == NOT_IN_DICTIONARY) {
+      addEntry(data, offset, length);
+    }
+    return ret;
+  }
+
+  @Override
+  public short addEntry(byte[] data, int offset, int length) {
+    if (length <= 0) return NOT_IN_DICTIONARY;
+    return backingStore.put(data, offset, length);
+  }
+
+  @Override
+  public void clear() {
+    backingStore.clear();
+  }
+
+  /*
+   * Internal class used to implement LRU eviction and dual lookup (by key and
+   * value).
+   * 
+   * This is not thread safe. Don't use in multi-threaded applications.
+   */
+  static class BidirectionalLRUMap {
+    private int currSize = 0;
+
+    // Head and tail of the LRU list.
+    private Node head;
+    private Node tail;
+
+    private HashMap<Node, Short> nodeToIndex = new HashMap<Node, Short>();
+    private Node[] indexToNode;
+    private int initSize = 0;
+
+    public BidirectionalLRUMap(int initialSize) {
+      initSize = initialSize;
+      indexToNode = new Node[initialSize];
+      for (int i = 0; i < initialSize; i++) {
+        indexToNode[i] = new Node();
+      }
+    }
+
+    private short put(byte[] array, int offset, int length) {
+      // We copy the bytes we want, otherwise we might be holding references to
+      // massive arrays in our dictionary (or those arrays might change)
+      byte[] stored = new byte[length];
+      Bytes.putBytes(stored, 0, array, offset, length);
+
+      if (currSize < initSize) {
+        // There is space to add without evicting.
+        indexToNode[currSize].setContents(stored, 0, stored.length);
+        setHead(indexToNode[currSize]);
+        short ret = (short) currSize++;
+        nodeToIndex.put(indexToNode[ret], ret);
+        return ret;
+      } else {
+        short s = nodeToIndex.remove(tail);
+        tail.setContents(stored, 0, stored.length);
+        // we need to rehash this.
+        nodeToIndex.put(tail, s);
+        moveToHead(tail);
+        return s;
+      }
+    }
+
+    private short findIdx(byte[] array, int offset, int length) {
+      Short s;
+      final Node comparisonNode = new Node();
+      comparisonNode.setContents(array, offset, length);
+      if ((s = nodeToIndex.get(comparisonNode)) != null) {
+        moveToHead(indexToNode[s]);
+        return s;
+      } else {
+        return -1;
+      }
+    }
+
+    private byte[] get(short idx) {
+      Preconditions.checkElementIndex(idx, currSize);
+      moveToHead(indexToNode[idx]);
+      return indexToNode[idx].container;
+    }
+
+    private void moveToHead(Node n) {
+      if (head == n) {
+        // no-op -- it's already the head.
+        return;
+      }
+      // At this point we definitely have prev, since it's not the head.
+      assert n.prev != null;
+      // Unlink prev.
+      n.prev.next = n.next;
+
+      // Unlink next
+      if (n.next != null) {
+        n.next.prev = n.prev;
+      } else {
+        assert n == tail;
+        tail = n.prev;
+      }
+      // Node is now removed from the list. Re-add it at the head.
+      setHead(n);
+    }
+    
+    private void setHead(Node n) {
+      // assume it's already unlinked from the list at this point.
+      n.prev = null;
+      n.next = head;
+      if (head != null) {
+        assert head.prev == null;
+        head.prev = n;
+      }
+
+      head = n;
+
+      // First entry
+      if (tail == null) {
+        tail = n;
+      }
+    }
+
+    private void clear() {
+      currSize = 0;
+      nodeToIndex.clear();
+      tail = null;
+      head = null;
+
+      for (Node n : indexToNode) {
+        n.container = null;
+      }
+
+      for (int i = 0; i < initSize; i++) {
+        indexToNode[i].next = null;
+        indexToNode[i].prev = null;
+      }
+    }
+
+    private static class Node {
+      byte[] container;
+      int offset;
+      int length;
+      Node next; // link towards the tail
+      Node prev; // link towards the head
+
+      public Node() {
+      }
+
+      private void setContents(byte[] container, int offset, int length) {
+        this.container = container;
+        this.offset = offset;
+        this.length = length;
+      }
+
+      @Override
+      public int hashCode() {
+        return Bytes.hashCode(container, offset, length);
+      }
+
+      @Override
+      public boolean equals(Object other) {
+        if (!(other instanceof Node)) {
+          return false;
+        }
+
+        Node casted = (Node) other;
+        return Bytes.equals(container, offset, length, casted.container,
+            casted.offset, casted.length);
+      }
+    }
+  }
+}

Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java?rev=1532176&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java Tue Oct 15 03:56:36 2013
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.google.common.base.Preconditions;
+
+/*
+ * It seems like as soon as somebody sets himself to the task of creating VInt encoding, his mind
+ * blanks out for a split-second and he starts the work by wrapping it in the most convoluted
+ * interface he can come up with. Custom streams that allocate memory, DataOutput that is only used
+ * to write single bytes... We operate on simple streams. Thus, we are going to have a simple
+ * implementation copy-pasted from protobuf Coded*Stream.
+ */
+@InterfaceAudience.Private
+public class StreamUtils {
+
+  public static void writeRawVInt32(OutputStream output, int value) throws IOException {
+    assert value >= 0;
+    while (true) {
+      if ((value & ~0x7F) == 0) {
+        output.write(value);
+        return;
+      } else {
+        output.write((value & 0x7F) | 0x80);
+        value >>>= 7;
+      }
+    }
+  }
+
+  public static int readRawVarint32(InputStream input) throws IOException {
+    byte tmp = (byte) input.read();
+    if (tmp >= 0) {
+      return tmp;
+    }
+    int result = tmp & 0x7f;
+    if ((tmp = (byte) input.read()) >= 0) {
+      result |= tmp << 7;
+    } else {
+      result |= (tmp & 0x7f) << 7;
+      if ((tmp = (byte) input.read()) >= 0) {
+        result |= tmp << 14;
+      } else {
+        result |= (tmp & 0x7f) << 14;
+        if ((tmp = (byte) input.read()) >= 0) {
+          result |= tmp << 21;
+        } else {
+          result |= (tmp & 0x7f) << 21;
+          result |= (tmp = (byte) input.read()) << 28;
+          if (tmp < 0) {
+            // Discard upper 32 bits.
+            for (int i = 0; i < 5; i++) {
+              if (input.read() >= 0) {
+                return result;
+              }
+            }
+            throw new IOException("Malformed varint");
+          }
+        }
+      }
+    }
+    return result;
+  }
+
+  public static int readRawVarint32(ByteBuffer input) throws IOException {
+    byte tmp = input.get();
+    if (tmp >= 0) {
+      return tmp;
+    }
+    int result = tmp & 0x7f;
+    if ((tmp = input.get()) >= 0) {
+      result |= tmp << 7;
+    } else {
+      result |= (tmp & 0x7f) << 7;
+      if ((tmp = input.get()) >= 0) {
+        result |= tmp << 14;
+      } else {
+        result |= (tmp & 0x7f) << 14;
+        if ((tmp = input.get()) >= 0) {
+          result |= tmp << 21;
+        } else {
+          result |= (tmp & 0x7f) << 21;
+          result |= (tmp = input.get()) << 28;
+          if (tmp < 0) {
+            // Discard upper 32 bits.
+            for (int i = 0; i < 5; i++) {
+              if (input.get() >= 0) {
+                return result;
+              }
+            }
+            throw new IOException("Malformed varint");
+          }
+        }
+      }
+    }
+    return result;
+  }
+
+  public static short toShort(byte hi, byte lo) {
+    short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
+    Preconditions.checkArgument(s >= 0);
+    return s;
+  }
+
+  public static void writeShort(OutputStream out, short v) throws IOException {
+    Preconditions.checkArgument(v >= 0);
+    out.write((byte) (0xff & (v >> 8)));
+    out.write((byte) (0xff & v));
+  }
+}
\ No newline at end of file

Added: hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java?rev=1532176&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java (added)
+++ hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java Tue Oct 15 03:56:36 2013
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.util;
+
+import static org.junit.Assert.*;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests LRUDictionary
+ */
+@Category(SmallTests.class)
+public class TestLRUDictionary {
+  LRUDictionary testee;
+
+  @Before
+  public void setUp() throws Exception {
+    testee = new LRUDictionary();
+    testee.init(Short.MAX_VALUE);
+  }
+
+  @Test
+  public void TestContainsNothing() {
+    assertTrue(isDictionaryEmpty(testee));
+  }
+
+  /**
+   * Asserts that an empty array cannot be added.
+   */
+  @Test
+  public void testPassingEmptyArrayToFindEntry() {
+    assertEquals(Dictionary.NOT_IN_DICTIONARY,
+      testee.findEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
+    assertEquals(Dictionary.NOT_IN_DICTIONARY,
+      testee.addEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
+  }
+
+  @Test
+  public void testPassingSameArrayToAddEntry() {
+    // Add a predefined byte array, in this case an arbitrary byte array from
+    // HConstants.  Assert that when we add, we get a new index.  That's how it
+    // works.
+    int len = HConstants.CATALOG_FAMILY.length;
+    int index = testee.addEntry(HConstants.CATALOG_FAMILY, 0, len);
+    assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
+    assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
+  }
+
+  @Test
+  public void testBasic() {
+    Random rand = new Random();
+    byte[] testBytes = new byte[10];
+    rand.nextBytes(testBytes);
+
+    // Verify that our randomly generated array doesn't exist in the dictionary
+    assertEquals(testee.findEntry(testBytes, 0, testBytes.length), -1);
+
+    // now since we looked up an entry, we should have added it to the
+    // dictionary, so it isn't empty
+
+    assertFalse(isDictionaryEmpty(testee));
+
+    // Check if we can find it using findEntry
+    short t = testee.findEntry(testBytes, 0, testBytes.length);
+
+    // Making sure we do find what we're looking for
+    assertTrue(t != -1);
+
+    byte[] testBytesCopy = new byte[20];
+
+    Bytes.putBytes(testBytesCopy, 10, testBytes, 0, testBytes.length);
+
+    // copy byte arrays, make sure that we check that equal byte arrays are
+    // equal without just checking the reference
+    assertEquals(testee.findEntry(testBytesCopy, 10, testBytes.length), t);
+
+    // make sure the entry retrieved is the same as the one put in
+    assertTrue(Arrays.equals(testBytes, testee.getEntry(t)));
+
+    testee.clear();
+
+    // making sure clear clears the dictionary
+    assertTrue(isDictionaryEmpty(testee));
+  }
+
+  @Test
+  public void TestLRUPolicy(){
+    //start by filling the dictionary up with byte arrays
+    for (int i = 0; i < Short.MAX_VALUE; i++) {
+      testee.findEntry((BigInteger.valueOf(i)).toByteArray(), 0,
+          (BigInteger.valueOf(i)).toByteArray().length);
+    }
+
+    // check we have the first element added
+    assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
+        BigInteger.ZERO.toByteArray().length) != -1);
+
+    // check for an element we know isn't there
+    assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0,
+        BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) == -1);
+
+    // since we just checked for this element, it should be there now.
+    assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0,
+        BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) != -1);
+
+    // test eviction, that the least recently added or looked at element is
+    // evicted.  We looked at ZERO so it should be in the dictionary still.
+    assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
+      BigInteger.ZERO.toByteArray().length) != -1);
+    // Now go from 1 to the end; each of these lookups is expected to miss.
+    for(int i = 1; i < Short.MAX_VALUE; i++) {
+      assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
+          BigInteger.valueOf(i).toByteArray().length) == -1);
+    }
+
+    // check we can find all of these.
+    for (int i = 0; i < Short.MAX_VALUE; i++) {
+      assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
+          BigInteger.valueOf(i).toByteArray().length) != -1);
+    }
+  }
+
+  static private boolean isDictionaryEmpty(LRUDictionary dict) {
+    try {
+      dict.getEntry((short)0);
+      return false;
+    } catch (IndexOutOfBoundsException ioobe) {
+      return true;
+    }
+  }
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java?rev=1532176&r1=1532175&r2=1532176&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java Tue Oct 15 03:56:36 2013
@@ -20,21 +20,31 @@ package org.apache.hadoop.hbase.regionse
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.util.Dictionary;
 
 /**
  * Context that holds the various dictionaries for compression in HLog.
  */
 @InterfaceAudience.Private
 class CompressionContext {
+
+  static final String ENABLE_WAL_TAGS_COMPRESSION = 
+      "hbase.regionserver.wal.tags.enablecompression";
+
   final Dictionary regionDict;
   final Dictionary tableDict;
   final Dictionary familyDict;
   final Dictionary qualifierDict;
   final Dictionary rowDict;
+  // Context used for compressing tags
+  TagCompressionContext tagCompressionContext = null;
 
-  public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits)
-  throws SecurityException, NoSuchMethodException, InstantiationException,
+  public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
+      Configuration conf) throws SecurityException, NoSuchMethodException, InstantiationException,
       IllegalAccessException, InvocationTargetException {
     Constructor<? extends Dictionary> dictConstructor =
         dictType.getConstructor();
@@ -54,6 +64,9 @@ class CompressionContext {
     rowDict.init(Short.MAX_VALUE);
     familyDict.init(Byte.MAX_VALUE);
     qualifierDict.init(Byte.MAX_VALUE);
+    if (conf != null && conf.getBoolean(ENABLE_WAL_TAGS_COMPRESSION, true)) {
+      tagCompressionContext = new TagCompressionContext(dictType);
+    }
   }
 
   void clear() {
@@ -62,5 +75,8 @@ class CompressionContext {
     familyDict.clear();
     qualifierDict.clear();
     rowDict.clear();
+    if (tagCompressionContext != null) {
+      tagCompressionContext.clear();
+    }
   }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java?rev=1532176&r1=1532175&r2=1532176&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java Tue Oct 15 03:56:36 2013
@@ -25,6 +25,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.util.Dictionary;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java?rev=1532176&r1=1532175&r2=1532176&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java Tue Oct 15 03:56:36 2013
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
 import org.apache.hadoop.hbase.util.FSUtils;
 
@@ -73,7 +74,7 @@ public abstract class ReaderBase impleme
       try {
         if (compressionContext == null) {
           compressionContext = new CompressionContext(LRUDictionary.class,
-              FSUtils.isRecoveredEdits(path));
+              FSUtils.isRecoveredEdits(path), conf);
         } else {
           compressionContext.clear();
         }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java?rev=1532176&r1=1532175&r2=1532176&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java Tue Oct 15 03:56:36 2013
@@ -29,11 +29,12 @@ import org.apache.hadoop.hbase.codec.Bas
 import org.apache.hadoop.hbase.codec.BaseEncoder;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.io.util.Dictionary;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.io.IOUtils;
 
-import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 
 
@@ -157,7 +158,8 @@ public class WALCellCodec implements Cod
       StreamUtils.writeRawVInt32(out, kv.getKeyLength());
       StreamUtils.writeRawVInt32(out, kv.getValueLength());
       // To support tags
-      StreamUtils.writeRawVInt32(out, kv.getTagsLength());
+      short tagsLength = kv.getTagsLength();
+      StreamUtils.writeRawVInt32(out, tagsLength);
 
       // Write row, qualifier, and family; use dictionary
       // compression as they're likely to have duplicates.
@@ -165,11 +167,25 @@ public class WALCellCodec implements Cod
       write(kvBuffer, kv.getFamilyOffset(), kv.getFamilyLength(), compression.familyDict);
       write(kvBuffer, kv.getQualifierOffset(), kv.getQualifierLength(), compression.qualifierDict);
 
-      // Write the rest uncompressed.
+      // Write timestamp, type and value uncompressed.
       int pos = kv.getTimestampOffset();
-      int remainingLength = kv.getLength() + offset - pos;
-      out.write(kvBuffer, pos, remainingLength);
-
+      int tsTypeValLen = kv.getLength() + offset - pos;
+      if (tagsLength > 0) {
+        tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
+      }
+      assert tsTypeValLen > 0;
+      out.write(kvBuffer, pos, tsTypeValLen);
+      if (tagsLength > 0) {
+        if (compression.tagCompressionContext != null) {
+          // Write tags using Dictionary compression
+          compression.tagCompressionContext.compressTags(out, kvBuffer, kv.getTagsOffset(),
+              tagsLength);
+        } else {
+          // Tag compression is disabled within WAL compression, so write the tag bytes
+          // as they are.
+          out.write(kvBuffer, kv.getTagsOffset(), tagsLength);
+        }
+      }
     }
 
     private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException {
@@ -199,7 +215,7 @@ public class WALCellCodec implements Cod
       int keylength = StreamUtils.readRawVarint32(in);
       int vlength = StreamUtils.readRawVarint32(in);
       
-      int tagsLength = StreamUtils.readRawVarint32(in);
+      short tagsLength = (short) StreamUtils.readRawVarint32(in);
       int length = 0;
       if(tagsLength == 0) {
         length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
@@ -228,8 +244,23 @@ public class WALCellCodec implements Cod
       elemLen = readIntoArray(backingArray, pos, compression.qualifierDict);
       pos += elemLen;
 
-      // the rest
-      IOUtils.readFully(in, backingArray, pos, length - pos);
+      // timestamp, type and value
+      int tsTypeValLen = length - pos;
+      if (tagsLength > 0) {
+        tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
+      }
+      IOUtils.readFully(in, backingArray, pos, tsTypeValLen);
+      pos += tsTypeValLen;
+
+      // tags
+      if (tagsLength > 0) {
+        pos = Bytes.putShort(backingArray, pos, tagsLength);
+        if (compression.tagCompressionContext != null) {
+          compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
+        } else {
+          IOUtils.readFully(in, backingArray, pos, tagsLength);
+        }
+      }
       return new KeyValue(backingArray, 0, length);
     }
 
@@ -294,80 +325,4 @@ public class WALCellCodec implements Cod
     // TODO: ideally this should also encapsulate compressionContext
     return this.statelessUncompressor;
   }
-
-  /**
-   * It seems like as soon as somebody sets himself to the task of creating VInt encoding,
-   * his mind blanks out for a split-second and he starts the work by wrapping it in the
-   * most convoluted interface he can come up with. Custom streams that allocate memory,
-   * DataOutput that is only used to write single bytes... We operate on simple streams.
-   * Thus, we are going to have a simple implementation copy-pasted from protobuf Coded*Stream.
-   */
-  private static class StreamUtils {
-    public static int computeRawVarint32Size(final int value) {
-      if ((value & (0xffffffff <<  7)) == 0) return 1;
-      if ((value & (0xffffffff << 14)) == 0) return 2;
-      if ((value & (0xffffffff << 21)) == 0) return 3;
-      if ((value & (0xffffffff << 28)) == 0) return 4;
-      return 5;
-    }
-
-    static void writeRawVInt32(OutputStream output, int value) throws IOException {
-      assert value >= 0;
-      while (true) {
-        if ((value & ~0x7F) == 0) {
-          output.write(value);
-          return;
-        } else {
-          output.write((value & 0x7F) | 0x80);
-          value >>>= 7;
-        }
-      }
-    }
-
-    static int readRawVarint32(InputStream input) throws IOException {
-      byte tmp = (byte)input.read();
-      if (tmp >= 0) {
-        return tmp;
-      }
-      int result = tmp & 0x7f;
-      if ((tmp = (byte)input.read()) >= 0) {
-        result |= tmp << 7;
-      } else {
-        result |= (tmp & 0x7f) << 7;
-        if ((tmp = (byte)input.read()) >= 0) {
-          result |= tmp << 14;
-        } else {
-          result |= (tmp & 0x7f) << 14;
-          if ((tmp = (byte)input.read()) >= 0) {
-            result |= tmp << 21;
-          } else {
-            result |= (tmp & 0x7f) << 21;
-            result |= (tmp = (byte)input.read()) << 28;
-            if (tmp < 0) {
-              // Discard upper 32 bits.
-              for (int i = 0; i < 5; i++) {
-                if (input.read() >= 0) {
-                  return result;
-                }
-              }
-              throw new IOException("Malformed varint");
-            }
-          }
-        }
-      }
-      return result;
-    }
-
-    static short toShort(byte hi, byte lo) {
-      short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
-      Preconditions.checkArgument(s >= 0);
-      return s;
-    }
-
-    static void writeShort(OutputStream out, short v) throws IOException {
-      Preconditions.checkArgument(v >= 0);
-      out.write((byte)(0xff & (v >> 8)));
-      out.write((byte)(0xff & v));
-    }
-  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java?rev=1532176&r1=1532175&r2=1532176&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java Tue Oct 15 03:56:36 2013
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
 import org.apache.hadoop.hbase.util.FSUtils;
 
 /**
@@ -40,7 +41,7 @@ public abstract class WriterBase impleme
     if (doCompress) {
       try {
         this.compressionContext = new CompressionContext(LRUDictionary.class,
-            FSUtils.isRecoveredEdits(path));
+            FSUtils.isRecoveredEdits(path), conf);
       } catch (Exception e) {
         throw new IOException("Failed to initiate CompressionContext", e);
       }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java?rev=1532176&r1=1532175&r2=1532176&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java Tue Oct 15 03:56:36 2013
@@ -28,6 +28,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.io.util.Dictionary;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.BeforeClass;
 import org.junit.Test;

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java?rev=1532176&r1=1532175&r2=1532176&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java Tue Oct 15 03:56:36 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.junit.Test;
@@ -67,7 +68,7 @@ public class TestKeyValueCompression {
   }
 
   private void runTestCycle(List<KeyValue> kvs) throws Exception {
-    CompressionContext ctx = new CompressionContext(LRUDictionary.class, false);
+    CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, null);
     DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
     for (KeyValue kv : kvs) {
       KeyValueCompression.writeKV(buf, kv, ctx);
@@ -84,7 +85,7 @@ public class TestKeyValueCompression {
 
   @Test
   public void testKVWithTags() throws Exception {
-    CompressionContext ctx = new CompressionContext(LRUDictionary.class, false);
+    CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, null);
     DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
     KeyValueCompression.writeKV(buf, createKV(1), ctx);
     KeyValueCompression.writeKV(buf, createKV(0), ctx);

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java?rev=1532176&r1=1532175&r2=1532176&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java Tue Oct 15 03:56:36 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.codec.Codec.Decoder;
 import org.apache.hadoop.hbase.codec.Codec.Encoder;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -42,8 +43,19 @@ public class TestWALCellCodecWithCompres
 
   @Test
   public void testEncodeDecodeKVsWithTags() throws Exception {
-    WALCellCodec codec = new WALCellCodec(new Configuration(false), new CompressionContext(
-        LRUDictionary.class, false));
+    doTest(false);
+  }
+
+  @Test
+  public void testEncodeDecodeKVsWithTagsWithTagsCompression() throws Exception {
+    doTest(true);
+  }
+
+  private void doTest(boolean compressTags) throws Exception {
+    Configuration conf = new Configuration(false);
+    conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags);
+    WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false,
+        conf));
     ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
     Encoder encoder = codec.getEncoder(bos);
     encoder.write(createKV(1));