You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2013/10/15 05:56:36 UTC
svn commit: r1532176 - in /hbase/trunk:
hbase-common/src/main/java/org/apache/hadoop/hbase/io/
hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/
hbase-common/src/test/java/org/apache/hadoop/hbase/io/
hbase-common/src/test/java/org/apache/hado...
Author: anoopsamjohn
Date: Tue Oct 15 03:56:36 2013
New Revision: 1532176
URL: http://svn.apache.org/r1532176
Log:
HBASE-9137 Add Tag dictionary in WAL compression
Added:
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/
hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/
hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java
Removed:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java?rev=1532176&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java Tue Oct 15 03:56:36 2013
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.io.util.Dictionary;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * Context that holds the dictionary for Tag compression and doing the compress/uncompress. This
+ * will be used for compressing tags while writing into WALs.
+ */
+@InterfaceAudience.Private
+public class TagCompressionContext {
+ private final Dictionary tagDict;
+
+ public TagCompressionContext(Class<? extends Dictionary> dictType) throws SecurityException,
+ NoSuchMethodException, InstantiationException, IllegalAccessException,
+ InvocationTargetException {
+ Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
+ tagDict = dictConstructor.newInstance();
+ tagDict.init(Short.MAX_VALUE);
+ }
+
+ public void clear() {
+ tagDict.clear();
+ }
+
+ /**
+ * Compress tags one by one and writes the OutputStream.
+ * @param out Stream to which the compressed tags to be written
+ * @param in Source where tags are available
+ * @param offset Offset for the tags bytes
+ * @param length Length of all tag bytes
+ * @throws IOException
+ */
+ public void compressTags(OutputStream out, byte[] in, int offset, short length)
+ throws IOException {
+ int pos = offset;
+ int endOffset = pos + length;
+ assert pos < endOffset;
+ while (pos < endOffset) {
+ short tagLen = Bytes.toShort(in, pos);
+ pos += Tag.TAG_LENGTH_SIZE;
+ write(in, pos, tagLen, out);
+ pos += tagLen;
+ }
+ }
+
+ /**
+ * Uncompress tags from the InputStream and writes to the destination array.
+ * @param src Stream where the compressed tags are available
+ * @param dest Destination array where to write the uncompressed tags
+ * @param offset Offset in destination where tags to be written
+ * @param length Length of all tag bytes
+ * @throws IOException
+ */
+ public void uncompressTags(InputStream src, byte[] dest, int offset, short length)
+ throws IOException {
+ int endOffset = offset + length;
+ while (offset < endOffset) {
+ byte status = (byte) src.read();
+ if (status == Dictionary.NOT_IN_DICTIONARY) {
+ // We are writing short as tagLen. So can downcast this without any risk.
+ short tagLen = (short) StreamUtils.readRawVarint32(src);
+ offset = Bytes.putShort(dest, offset, tagLen);
+ IOUtils.readFully(src, dest, offset, tagLen);
+ tagDict.addEntry(dest, offset, tagLen);
+ offset += tagLen;
+ } else {
+ short dictIdx = StreamUtils.toShort(status, (byte) src.read());
+ byte[] entry = tagDict.getEntry(dictIdx);
+ if (entry == null) {
+ throw new IOException("Missing dictionary entry for index " + dictIdx);
+ }
+ offset = Bytes.putShort(dest, offset, (short) entry.length);
+ System.arraycopy(entry, 0, dest, offset, entry.length);
+ offset += entry.length;
+ }
+ }
+ }
+
+ private void write(byte[] data, int offset, short length, OutputStream out) throws IOException {
+ short dictIdx = Dictionary.NOT_IN_DICTIONARY;
+ if (tagDict != null) {
+ dictIdx = tagDict.findEntry(data, offset, length);
+ }
+ if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+ out.write(Dictionary.NOT_IN_DICTIONARY);
+ StreamUtils.writeRawVInt32(out, length);
+ out.write(data, offset, length);
+ } else {
+ StreamUtils.writeShort(out, dictIdx);
+ }
+ }
+}
\ No newline at end of file
Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java?rev=1532176&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java Tue Oct 15 03:56:36 2013
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Dictionary interface
+ *
+ * Dictionary indexes should be either bytes or shorts, only positive. (The
+ * first bit is reserved for detecting whether something is compressed or not).
+ */
+@InterfaceAudience.Private
+public interface Dictionary {
+ byte NOT_IN_DICTIONARY = -1;
+
+ void init(int initialSize);
+ /**
+ * Gets an entry from the dictionary.
+ *
+ * @param idx index of the entry
+ * @return the entry, or null if non existent
+ */
+ byte[] getEntry(short idx);
+
+ /**
+ * Finds the index of an entry.
+ * If no entry found, we add it.
+ *
+ * @param data the byte array that we're looking up
+ * @param offset Offset into <code>data</code> to add to Dictionary.
+ * @param length Length beyond <code>offset</code> that comprises entry; must be > 0.
+ * @return the index of the entry, or {@link #NOT_IN_DICTIONARY} if not found
+ */
+ short findEntry(byte[] data, int offset, int length);
+
+ /**
+ * Adds an entry to the dictionary.
+ * Be careful using this method. It will add an entry to the
+ * dictionary even if it already has an entry for the same data.
+ * Call {{@link #findEntry(byte[], int, int)}} to add without duplicating
+ * dictionary entries.
+ *
+ * @param data the entry to add
+ * @param offset Offset into <code>data</code> to add to Dictionary.
+ * @param length Length beyond <code>offset</code> that comprises entry; must be > 0.
+ * @return the index of the entry
+ */
+
+ short addEntry(byte[] data, int offset, int length);
+
+ /**
+ * Flushes the dictionary, empties all values.
+ */
+ void clear();
+}
Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java?rev=1532176&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java Tue Oct 15 03:56:36 2013
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.util;
+
+import java.util.HashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * WALDictionary using an LRU eviction algorithm. Uses a linked list running
+ * through a hashtable. Currently has max of 2^15 entries. Will start
+ * evicting if exceeds this number The maximum memory we expect this dictionary
+ * to take in the worst case is about:
+ * <code>(2 ^ 15) * 5 (Regionname, Row key, CF, Column qual, table) * 100 bytes
+ * (these are some big names) = ~16MB</code>.
+ * If you want to get silly, even at 1kb entries, it maxes out at 160 megabytes.
+ */
+@InterfaceAudience.Private
+public class LRUDictionary implements Dictionary {
+
+ BidirectionalLRUMap backingStore;
+ @Override
+ public byte[] getEntry(short idx) {
+ return backingStore.get(idx);
+ }
+
+ @Override
+ public void init(int initialSize) {
+ backingStore = new BidirectionalLRUMap(initialSize);
+ }
+ @Override
+ public short findEntry(byte[] data, int offset, int length) {
+ short ret = backingStore.findIdx(data, offset, length);
+ if (ret == NOT_IN_DICTIONARY) {
+ addEntry(data, offset, length);
+ }
+ return ret;
+ }
+
+ @Override
+ public short addEntry(byte[] data, int offset, int length) {
+ if (length <= 0) return NOT_IN_DICTIONARY;
+ return backingStore.put(data, offset, length);
+ }
+
+ @Override
+ public void clear() {
+ backingStore.clear();
+ }
+
+ /*
+ * Internal class used to implement LRU eviction and dual lookup (by key and
+ * value).
+ *
+ * This is not thread safe. Don't use in multi-threaded applications.
+ */
+ static class BidirectionalLRUMap {
+ private int currSize = 0;
+
+ // Head and tail of the LRU list.
+ private Node head;
+ private Node tail;
+
+ private HashMap<Node, Short> nodeToIndex = new HashMap<Node, Short>();
+ private Node[] indexToNode;
+ private int initSize = 0;
+
+ public BidirectionalLRUMap(int initialSize) {
+ initSize = initialSize;
+ indexToNode = new Node[initialSize];
+ for (int i = 0; i < initialSize; i++) {
+ indexToNode[i] = new Node();
+ }
+ }
+
+ private short put(byte[] array, int offset, int length) {
+ // We copy the bytes we want, otherwise we might be holding references to
+ // massive arrays in our dictionary (or those arrays might change)
+ byte[] stored = new byte[length];
+ Bytes.putBytes(stored, 0, array, offset, length);
+
+ if (currSize < initSize) {
+ // There is space to add without evicting.
+ indexToNode[currSize].setContents(stored, 0, stored.length);
+ setHead(indexToNode[currSize]);
+ short ret = (short) currSize++;
+ nodeToIndex.put(indexToNode[ret], ret);
+ return ret;
+ } else {
+ short s = nodeToIndex.remove(tail);
+ tail.setContents(stored, 0, stored.length);
+ // we need to rehash this.
+ nodeToIndex.put(tail, s);
+ moveToHead(tail);
+ return s;
+ }
+ }
+
+ private short findIdx(byte[] array, int offset, int length) {
+ Short s;
+ final Node comparisonNode = new Node();
+ comparisonNode.setContents(array, offset, length);
+ if ((s = nodeToIndex.get(comparisonNode)) != null) {
+ moveToHead(indexToNode[s]);
+ return s;
+ } else {
+ return -1;
+ }
+ }
+
+ private byte[] get(short idx) {
+ Preconditions.checkElementIndex(idx, currSize);
+ moveToHead(indexToNode[idx]);
+ return indexToNode[idx].container;
+ }
+
+ private void moveToHead(Node n) {
+ if (head == n) {
+ // no-op -- it's already the head.
+ return;
+ }
+ // At this point we definitely have prev, since it's not the head.
+ assert n.prev != null;
+ // Unlink prev.
+ n.prev.next = n.next;
+
+ // Unlink next
+ if (n.next != null) {
+ n.next.prev = n.prev;
+ } else {
+ assert n == tail;
+ tail = n.prev;
+ }
+ // Node is now removed from the list. Re-add it at the head.
+ setHead(n);
+ }
+
+ private void setHead(Node n) {
+ // assume it's already unlinked from the list at this point.
+ n.prev = null;
+ n.next = head;
+ if (head != null) {
+ assert head.prev == null;
+ head.prev = n;
+ }
+
+ head = n;
+
+ // First entry
+ if (tail == null) {
+ tail = n;
+ }
+ }
+
+ private void clear() {
+ currSize = 0;
+ nodeToIndex.clear();
+ tail = null;
+ head = null;
+
+ for (Node n : indexToNode) {
+ n.container = null;
+ }
+
+ for (int i = 0; i < initSize; i++) {
+ indexToNode[i].next = null;
+ indexToNode[i].prev = null;
+ }
+ }
+
+ private static class Node {
+ byte[] container;
+ int offset;
+ int length;
+ Node next; // link towards the tail
+ Node prev; // link towards the head
+
+ public Node() {
+ }
+
+ private void setContents(byte[] container, int offset, int length) {
+ this.container = container;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ @Override
+ public int hashCode() {
+ return Bytes.hashCode(container, offset, length);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof Node)) {
+ return false;
+ }
+
+ Node casted = (Node) other;
+ return Bytes.equals(container, offset, length, casted.container,
+ casted.offset, casted.length);
+ }
+ }
+ }
+}
Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java?rev=1532176&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java Tue Oct 15 03:56:36 2013
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.google.common.base.Preconditions;
+
+/*
+ * It seems like as soon as somebody sets himself to the task of creating VInt encoding, his mind
+ * blanks out for a split-second and he starts the work by wrapping it in the most convoluted
+ * interface he can come up with. Custom streams that allocate memory, DataOutput that is only used
+ * to write single bytes... We operate on simple streams. Thus, we are going to have a simple
+ * implementation copy-pasted from protobuf Coded*Stream.
+ */
+@InterfaceAudience.Private
+public class StreamUtils {
+
+ public static void writeRawVInt32(OutputStream output, int value) throws IOException {
+ assert value >= 0;
+ while (true) {
+ if ((value & ~0x7F) == 0) {
+ output.write(value);
+ return;
+ } else {
+ output.write((value & 0x7F) | 0x80);
+ value >>>= 7;
+ }
+ }
+ }
+
+ public static int readRawVarint32(InputStream input) throws IOException {
+ byte tmp = (byte) input.read();
+ if (tmp >= 0) {
+ return tmp;
+ }
+ int result = tmp & 0x7f;
+ if ((tmp = (byte) input.read()) >= 0) {
+ result |= tmp << 7;
+ } else {
+ result |= (tmp & 0x7f) << 7;
+ if ((tmp = (byte) input.read()) >= 0) {
+ result |= tmp << 14;
+ } else {
+ result |= (tmp & 0x7f) << 14;
+ if ((tmp = (byte) input.read()) >= 0) {
+ result |= tmp << 21;
+ } else {
+ result |= (tmp & 0x7f) << 21;
+ result |= (tmp = (byte) input.read()) << 28;
+ if (tmp < 0) {
+ // Discard upper 32 bits.
+ for (int i = 0; i < 5; i++) {
+ if (input.read() >= 0) {
+ return result;
+ }
+ }
+ throw new IOException("Malformed varint");
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ public static int readRawVarint32(ByteBuffer input) throws IOException {
+ byte tmp = input.get();
+ if (tmp >= 0) {
+ return tmp;
+ }
+ int result = tmp & 0x7f;
+ if ((tmp = input.get()) >= 0) {
+ result |= tmp << 7;
+ } else {
+ result |= (tmp & 0x7f) << 7;
+ if ((tmp = input.get()) >= 0) {
+ result |= tmp << 14;
+ } else {
+ result |= (tmp & 0x7f) << 14;
+ if ((tmp = input.get()) >= 0) {
+ result |= tmp << 21;
+ } else {
+ result |= (tmp & 0x7f) << 21;
+ result |= (tmp = input.get()) << 28;
+ if (tmp < 0) {
+ // Discard upper 32 bits.
+ for (int i = 0; i < 5; i++) {
+ if (input.get() >= 0) {
+ return result;
+ }
+ }
+ throw new IOException("Malformed varint");
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ public static short toShort(byte hi, byte lo) {
+ short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
+ Preconditions.checkArgument(s >= 0);
+ return s;
+ }
+
+ public static void writeShort(OutputStream out, short v) throws IOException {
+ Preconditions.checkArgument(v >= 0);
+ out.write((byte) (0xff & (v >> 8)));
+ out.write((byte) (0xff & v));
+ }
+}
\ No newline at end of file
Added: hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java?rev=1532176&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java (added)
+++ hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java Tue Oct 15 03:56:36 2013
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.util;
+
+import static org.junit.Assert.*;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests LRUDictionary
+ */
+@Category(SmallTests.class)
+public class TestLRUDictionary {
+ LRUDictionary testee;
+
+ @Before
+ public void setUp() throws Exception {
+ testee = new LRUDictionary();
+ testee.init(Short.MAX_VALUE);
+ }
+
+ @Test
+ public void TestContainsNothing() {
+ assertTrue(isDictionaryEmpty(testee));
+ }
+
+ /**
+ * Assert can't add empty array.
+ */
+ @Test
+ public void testPassingEmptyArrayToFindEntry() {
+ assertEquals(Dictionary.NOT_IN_DICTIONARY,
+ testee.findEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
+ assertEquals(Dictionary.NOT_IN_DICTIONARY,
+ testee.addEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
+ }
+
+ @Test
+ public void testPassingSameArrayToAddEntry() {
+ // Add random predefined byte array, in this case a random byte array from
+ // HConstants. Assert that when we add, we get new index. Thats how it
+ // works.
+ int len = HConstants.CATALOG_FAMILY.length;
+ int index = testee.addEntry(HConstants.CATALOG_FAMILY, 0, len);
+ assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
+ assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
+ }
+
+ @Test
+ public void testBasic() {
+ Random rand = new Random();
+ byte[] testBytes = new byte[10];
+ rand.nextBytes(testBytes);
+
+ // Verify that our randomly generated array doesn't exist in the dictionary
+ assertEquals(testee.findEntry(testBytes, 0, testBytes.length), -1);
+
+ // now since we looked up an entry, we should have added it to the
+ // dictionary, so it isn't empty
+
+ assertFalse(isDictionaryEmpty(testee));
+
+ // Check if we can find it using findEntry
+ short t = testee.findEntry(testBytes, 0, testBytes.length);
+
+ // Making sure we do find what we're looking for
+ assertTrue(t != -1);
+
+ byte[] testBytesCopy = new byte[20];
+
+ Bytes.putBytes(testBytesCopy, 10, testBytes, 0, testBytes.length);
+
+ // copy byte arrays, make sure that we check that equal byte arrays are
+ // equal without just checking the reference
+ assertEquals(testee.findEntry(testBytesCopy, 10, testBytes.length), t);
+
+ // make sure the entry retrieved is the same as the one put in
+ assertTrue(Arrays.equals(testBytes, testee.getEntry(t)));
+
+ testee.clear();
+
+ // making sure clear clears the dictionary
+ assertTrue(isDictionaryEmpty(testee));
+ }
+
+ @Test
+ public void TestLRUPolicy(){
+ //start by filling the dictionary up with byte arrays
+ for (int i = 0; i < Short.MAX_VALUE; i++) {
+ testee.findEntry((BigInteger.valueOf(i)).toByteArray(), 0,
+ (BigInteger.valueOf(i)).toByteArray().length);
+ }
+
+ // check we have the first element added
+ assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
+ BigInteger.ZERO.toByteArray().length) != -1);
+
+ // check for an element we know isn't there
+ assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0,
+ BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) == -1);
+
+ // since we just checked for this element, it should be there now.
+ assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0,
+ BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) != -1);
+
+ // test eviction, that the least recently added or looked at element is
+ // evicted. We looked at ZERO so it should be in the dictionary still.
+ assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
+ BigInteger.ZERO.toByteArray().length) != -1);
+ // Now go from beyond 1 to the end.
+ for(int i = 1; i < Short.MAX_VALUE; i++) {
+ assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
+ BigInteger.valueOf(i).toByteArray().length) == -1);
+ }
+
+ // check we can find all of these.
+ for (int i = 0; i < Short.MAX_VALUE; i++) {
+ assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
+ BigInteger.valueOf(i).toByteArray().length) != -1);
+ }
+ }
+
+ static private boolean isDictionaryEmpty(LRUDictionary dict) {
+ try {
+ dict.getEntry((short)0);
+ return false;
+ } catch (IndexOutOfBoundsException ioobe) {
+ return true;
+ }
+ }
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java?rev=1532176&r1=1532175&r2=1532176&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java Tue Oct 15 03:56:36 2013
@@ -20,21 +20,31 @@ package org.apache.hadoop.hbase.regionse
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.util.Dictionary;
/**
* Context that holds the various dictionaries for compression in HLog.
*/
@InterfaceAudience.Private
class CompressionContext {
+
+ static final String ENABLE_WAL_TAGS_COMPRESSION =
+ "hbase.regionserver.wal.tags.enablecompression";
+
final Dictionary regionDict;
final Dictionary tableDict;
final Dictionary familyDict;
final Dictionary qualifierDict;
final Dictionary rowDict;
+ // Context used for compressing tags
+ TagCompressionContext tagCompressionContext = null;
- public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits)
- throws SecurityException, NoSuchMethodException, InstantiationException,
+ public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
+ Configuration conf) throws SecurityException, NoSuchMethodException, InstantiationException,
IllegalAccessException, InvocationTargetException {
Constructor<? extends Dictionary> dictConstructor =
dictType.getConstructor();
@@ -54,6 +64,9 @@ class CompressionContext {
rowDict.init(Short.MAX_VALUE);
familyDict.init(Byte.MAX_VALUE);
qualifierDict.init(Byte.MAX_VALUE);
+ if (conf != null && conf.getBoolean(ENABLE_WAL_TAGS_COMPRESSION, true)) {
+ tagCompressionContext = new TagCompressionContext(dictType);
+ }
}
void clear() {
@@ -62,5 +75,8 @@ class CompressionContext {
familyDict.clear();
qualifierDict.clear();
rowDict.clear();
+ if (tagCompressionContext != null) {
+ tagCompressionContext.clear();
+ }
}
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java?rev=1532176&r1=1532175&r2=1532176&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java Tue Oct 15 03:56:36 2013
@@ -25,6 +25,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java?rev=1532176&r1=1532175&r2=1532176&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java Tue Oct 15 03:56:36 2013
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -73,7 +74,7 @@ public abstract class ReaderBase impleme
try {
if (compressionContext == null) {
compressionContext = new CompressionContext(LRUDictionary.class,
- FSUtils.isRecoveredEdits(path));
+ FSUtils.isRecoveredEdits(path), conf);
} else {
compressionContext.clear();
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java?rev=1532176&r1=1532175&r2=1532176&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java Tue Oct 15 03:56:36 2013
@@ -29,11 +29,12 @@ import org.apache.hadoop.hbase.codec.Bas
import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.io.util.Dictionary;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.IOUtils;
-import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
@@ -157,7 +158,8 @@ public class WALCellCodec implements Cod
StreamUtils.writeRawVInt32(out, kv.getKeyLength());
StreamUtils.writeRawVInt32(out, kv.getValueLength());
// To support tags
- StreamUtils.writeRawVInt32(out, kv.getTagsLength());
+ short tagsLength = kv.getTagsLength();
+ StreamUtils.writeRawVInt32(out, tagsLength);
// Write row, qualifier, and family; use dictionary
// compression as they're likely to have duplicates.
@@ -165,11 +167,25 @@ public class WALCellCodec implements Cod
write(kvBuffer, kv.getFamilyOffset(), kv.getFamilyLength(), compression.familyDict);
write(kvBuffer, kv.getQualifierOffset(), kv.getQualifierLength(), compression.qualifierDict);
- // Write the rest uncompressed.
+ // Write timestamp, type and value as uncompressed.
int pos = kv.getTimestampOffset();
- int remainingLength = kv.getLength() + offset - pos;
- out.write(kvBuffer, pos, remainingLength);
-
+ int tsTypeValLen = kv.getLength() + offset - pos;
+ if (tagsLength > 0) {
+ tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
+ }
+ assert tsTypeValLen > 0;
+ out.write(kvBuffer, pos, tsTypeValLen);
+ if (tagsLength > 0) {
+ if (compression.tagCompressionContext != null) {
+ // Write tags using Dictionary compression
+ compression.tagCompressionContext.compressTags(out, kvBuffer, kv.getTagsOffset(),
+ tagsLength);
+ } else {
+ // Tag compression is disabled within the WAL compression. Just write the tags bytes as
+ // it is.
+ out.write(kvBuffer, kv.getTagsOffset(), tagsLength);
+ }
+ }
}
private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException {
@@ -199,7 +215,7 @@ public class WALCellCodec implements Cod
int keylength = StreamUtils.readRawVarint32(in);
int vlength = StreamUtils.readRawVarint32(in);
- int tagsLength = StreamUtils.readRawVarint32(in);
+ short tagsLength = (short) StreamUtils.readRawVarint32(in);
int length = 0;
if(tagsLength == 0) {
length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
@@ -228,8 +244,23 @@ public class WALCellCodec implements Cod
elemLen = readIntoArray(backingArray, pos, compression.qualifierDict);
pos += elemLen;
- // the rest
- IOUtils.readFully(in, backingArray, pos, length - pos);
+ // timestamp, type and value
+ int tsTypeValLen = length - pos;
+ if (tagsLength > 0) {
+ tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
+ }
+ IOUtils.readFully(in, backingArray, pos, tsTypeValLen);
+ pos += tsTypeValLen;
+
+ // tags
+ if (tagsLength > 0) {
+ pos = Bytes.putShort(backingArray, pos, tagsLength);
+ if (compression.tagCompressionContext != null) {
+ compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
+ } else {
+ IOUtils.readFully(in, backingArray, pos, tagsLength);
+ }
+ }
return new KeyValue(backingArray, 0, length);
}
@@ -294,80 +325,4 @@ public class WALCellCodec implements Cod
// TODO: ideally this should also encapsulate compressionContext
return this.statelessUncompressor;
}
-
- /**
- * It seems like as soon as somebody sets himself to the task of creating VInt encoding,
- * his mind blanks out for a split-second and he starts the work by wrapping it in the
- * most convoluted interface he can come up with. Custom streams that allocate memory,
- * DataOutput that is only used to write single bytes... We operate on simple streams.
- * Thus, we are going to have a simple implementation copy-pasted from protobuf Coded*Stream.
- */
- private static class StreamUtils {
- public static int computeRawVarint32Size(final int value) {
- if ((value & (0xffffffff << 7)) == 0) return 1;
- if ((value & (0xffffffff << 14)) == 0) return 2;
- if ((value & (0xffffffff << 21)) == 0) return 3;
- if ((value & (0xffffffff << 28)) == 0) return 4;
- return 5;
- }
-
- static void writeRawVInt32(OutputStream output, int value) throws IOException {
- assert value >= 0;
- while (true) {
- if ((value & ~0x7F) == 0) {
- output.write(value);
- return;
- } else {
- output.write((value & 0x7F) | 0x80);
- value >>>= 7;
- }
- }
- }
-
- static int readRawVarint32(InputStream input) throws IOException {
- byte tmp = (byte)input.read();
- if (tmp >= 0) {
- return tmp;
- }
- int result = tmp & 0x7f;
- if ((tmp = (byte)input.read()) >= 0) {
- result |= tmp << 7;
- } else {
- result |= (tmp & 0x7f) << 7;
- if ((tmp = (byte)input.read()) >= 0) {
- result |= tmp << 14;
- } else {
- result |= (tmp & 0x7f) << 14;
- if ((tmp = (byte)input.read()) >= 0) {
- result |= tmp << 21;
- } else {
- result |= (tmp & 0x7f) << 21;
- result |= (tmp = (byte)input.read()) << 28;
- if (tmp < 0) {
- // Discard upper 32 bits.
- for (int i = 0; i < 5; i++) {
- if (input.read() >= 0) {
- return result;
- }
- }
- throw new IOException("Malformed varint");
- }
- }
- }
- }
- return result;
- }
-
- static short toShort(byte hi, byte lo) {
- short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
- Preconditions.checkArgument(s >= 0);
- return s;
- }
-
- static void writeShort(OutputStream out, short v) throws IOException {
- Preconditions.checkArgument(v >= 0);
- out.write((byte)(0xff & (v >> 8)));
- out.write((byte)(0xff & v));
- }
- }
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java?rev=1532176&r1=1532175&r2=1532176&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java Tue Oct 15 03:56:36 2013
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.FSUtils;
/**
@@ -40,7 +41,7 @@ public abstract class WriterBase impleme
if (doCompress) {
try {
this.compressionContext = new CompressionContext(LRUDictionary.class,
- FSUtils.isRecoveredEdits(path));
+ FSUtils.isRecoveredEdits(path), conf);
} catch (Exception e) {
throw new IOException("Failed to initiate CompressionContext", e);
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java?rev=1532176&r1=1532175&r2=1532176&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java Tue Oct 15 03:56:36 2013
@@ -28,6 +28,8 @@ import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.io.util.Dictionary;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.Test;
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java?rev=1532176&r1=1532175&r2=1532176&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java Tue Oct 15 03:56:36 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataOutputBuffer;
import org.junit.Test;
@@ -67,7 +68,7 @@ public class TestKeyValueCompression {
}
private void runTestCycle(List<KeyValue> kvs) throws Exception {
- CompressionContext ctx = new CompressionContext(LRUDictionary.class, false);
+ CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, null);
DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
for (KeyValue kv : kvs) {
KeyValueCompression.writeKV(buf, kv, ctx);
@@ -84,7 +85,7 @@ public class TestKeyValueCompression {
@Test
public void testKVWithTags() throws Exception {
- CompressionContext ctx = new CompressionContext(LRUDictionary.class, false);
+ CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, null);
DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
KeyValueCompression.writeKV(buf, createKV(1), ctx);
KeyValueCompression.writeKV(buf, createKV(0), ctx);
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java?rev=1532176&r1=1532175&r2=1532176&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java Tue Oct 15 03:56:36 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.codec.Codec.Decoder;
import org.apache.hadoop.hbase.codec.Codec.Encoder;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -42,8 +43,19 @@ public class TestWALCellCodecWithCompres
@Test
public void testEncodeDecodeKVsWithTags() throws Exception {
- WALCellCodec codec = new WALCellCodec(new Configuration(false), new CompressionContext(
- LRUDictionary.class, false));
+ doTest(false);
+ }
+
+ @Test
+ public void testEncodeDecodeKVsWithTagsWithTagsCompression() throws Exception {
+ doTest(true);
+ }
+
+ private void doTest(boolean compressTags) throws Exception {
+ Configuration conf = new Configuration(false);
+ conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags);
+ WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false,
+ conf));
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
Encoder encoder = codec.getEncoder(bos);
encoder.write(createKV(1));