You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2013/10/31 05:59:01 UTC
svn commit: r1537377 - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/
hbase-common/src/main/java/org/apache/hadoop/hbase/io/
hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/
hbase-common/src/main/java/org/apache/had...
Author: anoopsamjohn
Date: Thu Oct 31 04:59:00 2013
New Revision: 1537377
URL: http://svn.apache.org/r1537377
Log:
HBASE-9045 Support Dictionary based Tag compression in HFiles
Added:
hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java Thu Oct 31 04:59:00 2013
@@ -94,6 +94,7 @@ public class HColumnDescriptor implement
public static final String REPLICATION_SCOPE = "REPLICATION_SCOPE";
public static final String MIN_VERSIONS = "MIN_VERSIONS";
public static final String KEEP_DELETED_CELLS = "KEEP_DELETED_CELLS";
+ public static final String COMPRESS_TAGS = "COMPRESS_TAGS";
/**
* Default compression type.
@@ -187,6 +188,11 @@ public class HColumnDescriptor implement
*/
public static final boolean DEFAULT_EVICT_BLOCKS_ON_CLOSE = false;
+ /**
+ * Default compress tags along with any type of DataBlockEncoding
+ */
+ public static final boolean DEFAULT_COMPRESS_TAGS = true;
+
private final static Map<String, String> DEFAULT_VALUES
= new HashMap<String, String>();
private final static Set<ImmutableBytesWritable> RESERVED_KEYWORDS
@@ -675,6 +681,30 @@ public class HColumnDescriptor implement
}
/**
+ * Set whether the tags should be compressed along with DataBlockEncoding. When no
+ * DataBlockEncoding is being used, this has no effect.
+ *
+ * @param compressTags
+ * @return this (for chained invocation)
+ */
+ public HColumnDescriptor setCompressTags(boolean compressTags) {
+ return setValue(COMPRESS_TAGS, String.valueOf(compressTags));
+ }
+
+ /**
+ * @return Whether KV tags should be compressed along with DataBlockEncoding. When no
+ * DataBlockEncoding is being used, this has no effect.
+ */
+ public boolean shouldCompressTags() {
+ String compressTagsStr = getValue(COMPRESS_TAGS);
+ boolean compressTags = DEFAULT_COMPRESS_TAGS;
+ if (compressTagsStr != null) {
+ compressTags = Boolean.valueOf(compressTagsStr);
+ }
+ return compressTags;
+ }
+
+ /**
* @return Compression type setting.
*/
public Compression.Algorithm getCompactionCompressionType() {
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java Thu Oct 31 04:59:00 2013
@@ -23,17 +23,19 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
/**
* Context that holds the dictionary for Tag compression and doing the compress/uncompress. This
- * will be used for compressing tags while writing into WALs.
+ * will be used for compressing tags while writing into HFiles and WALs.
*/
@InterfaceAudience.Private
public class TagCompressionContext {
@@ -52,7 +54,7 @@ public class TagCompressionContext {
}
/**
- * Compress tags one by one and writes the OutputStream.
+ * Compress tags one by one and writes to the OutputStream.
* @param out Stream to which the compressed tags to be written
* @param in Source where tags are available
* @param offset Offset for the tags bytes
@@ -73,6 +75,24 @@ public class TagCompressionContext {
}
/**
+ * Compress tags one by one and writes to the OutputStream.
+ * @param out Stream to which the compressed tags to be written
+ * @param in Source buffer where tags are available
+ * @param length Length of all tag bytes
+ * @throws IOException
+ */
+ public void compressTags(OutputStream out, ByteBuffer in, short length) throws IOException {
+ if (in.hasArray()) {
+ compressTags(out, in.array(), in.arrayOffset() + in.position(), length);
+ ByteBufferUtils.skip(in, length);
+ } else {
+ byte[] tagBuf = new byte[length];
+ in.get(tagBuf);
+ compressTags(out, tagBuf, 0, length);
+ }
+ }
+
+ /**
* Uncompress tags from the InputStream and writes to the destination array.
* @param src Stream where the compressed tags are available
* @param dest Destination array where to write the uncompressed tags
@@ -105,6 +125,58 @@ public class TagCompressionContext {
}
}
+ /**
+ * Uncompress tags from the input ByteBuffer and writes to the destination array.
+ * @param src Buffer where the compressed tags are available
+ * @param dest Destination array where to write the uncompressed tags
+ * @param offset Offset in destination where tags to be written
+ * @param length Length of all tag bytes
+ * @throws IOException
+ */
+ public void uncompressTags(ByteBuffer src, byte[] dest, int offset, int length)
+ throws IOException {
+ int endOffset = offset + length;
+ while (offset < endOffset) {
+ byte status = src.get();
+ short tagLen;
+ if (status == Dictionary.NOT_IN_DICTIONARY) {
+ // We are writing short as tagLen. So can downcast this without any risk.
+ tagLen = (short) StreamUtils.readRawVarint32(src);
+ offset = Bytes.putShort(dest, offset, tagLen);
+ src.get(dest, offset, tagLen);
+ tagDict.addEntry(dest, offset, tagLen);
+ offset += tagLen;
+ } else {
+ short dictIdx = StreamUtils.toShort(status, src.get());
+ byte[] entry = tagDict.getEntry(dictIdx);
+ if (entry == null) {
+ throw new IOException("Missing dictionary entry for index " + dictIdx);
+ }
+ tagLen = (short) entry.length;
+ offset = Bytes.putShort(dest, offset, tagLen);
+ System.arraycopy(entry, 0, dest, offset, tagLen);
+ offset += tagLen;
+ }
+ }
+ }
+
+ /**
+ * Uncompress tags from the InputStream and writes to the destination buffer.
+ * @param src Stream where the compressed tags are available
+ * @param dest Destination buffer where to write the uncompressed tags
+ * @param length Length of all tag bytes
+ * @throws IOException
+ */
+ public void uncompressTags(InputStream src, ByteBuffer dest, short length) throws IOException {
+ if (dest.hasArray()) {
+ uncompressTags(src, dest.array(), dest.arrayOffset() + dest.position(), length);
+ } else {
+ byte[] tagBuf = new byte[length];
+ uncompressTags(src, tagBuf, 0, length);
+ dest.put(tagBuf);
+ }
+ }
+
private void write(byte[] data, int offset, short length, OutputStream out) throws IOException {
short dictIdx = Dictionary.NOT_IN_DICTIONARY;
if (tagDict != null) {
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java Thu Oct 31 04:59:00 2013
@@ -26,8 +26,10 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
@@ -50,6 +52,14 @@ abstract class BufferedDataBlockEncoder
HFileBlockDefaultDecodingContext decodingCtx =
(HFileBlockDefaultDecodingContext) blkDecodingCtx;
+ if (decodingCtx.getHFileContext().shouldCompressTags()) {
+ try {
+ TagCompressionContext tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
+ decodingCtx.setTagCompressionContext(tagCompressionContext);
+ } catch (Exception e) {
+ throw new IOException("Failed to initialize TagCompressionContext", e);
+ }
+ }
return internalDecodeKeyValues(source, 0, 0, decodingCtx);
}
@@ -58,11 +68,12 @@ abstract class BufferedDataBlockEncoder
protected int keyLength;
protected int valueLength;
protected int lastCommonPrefix;
- protected int tagLength = 0;
- protected int tagOffset = -1;
+ protected int tagsLength = 0;
+ protected int tagsOffset = -1;
/** We need to store a copy of the key. */
protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
+ protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
protected long memstoreTS;
protected int nextKvOffset;
@@ -88,6 +99,19 @@ abstract class BufferedDataBlockEncoder
}
}
+ protected void ensureSpaceForTags() {
+ if (tagsLength > tagsBuffer.length) {
+ // rare case, but we need to handle arbitrary length of tags
+ int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
+ while (tagsLength > newTagsBufferLength) {
+ newTagsBufferLength *= 2;
+ }
+ byte[] newTagsBuffer = new byte[newTagsBufferLength];
+ System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
+ tagsBuffer = newTagsBuffer;
+ }
+ }
+
/**
* Copy the state from the next one into this instance (the previous state
* placeholder). Used to save the previous state when we are advancing the
@@ -127,6 +151,7 @@ abstract class BufferedDataBlockEncoder
protected ByteBuffer currentBuffer;
protected STATE current = createSeekerState(); // always valid
protected STATE previous = createSeekerState(); // may not be valid
+ protected TagCompressionContext tagCompressionContext = null;
public BufferedEncodedSeeker(KVComparator comparator,
HFileBlockDecodingContext decodingCtx) {
@@ -137,6 +162,13 @@ abstract class BufferedDataBlockEncoder
this.samePrefixComparator = null;
}
this.decodingCtx = decodingCtx;
+ if (decodingCtx.getHFileContext().shouldCompressTags()) {
+ try {
+ tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to initialize TagCompressionContext", e);
+ }
+ }
}
protected boolean includesMvcc() {
@@ -183,17 +215,25 @@ abstract class BufferedDataBlockEncoder
kvBuffer.put(currentBuffer.array(),
currentBuffer.arrayOffset() + current.valueOffset,
current.valueLength);
- if (current.tagLength > 0) {
- kvBuffer.putShort((short) current.tagLength);
- kvBuffer.put(currentBuffer.array(), currentBuffer.arrayOffset() + current.tagOffset,
- current.tagLength);
+ if (current.tagsLength > 0) {
+ kvBuffer.putShort((short) current.tagsLength);
+ if (current.tagsOffset != -1) {
+ // the offset of the tags bytes in the underlying buffer is marked. So the temp
+ // buffer, tagsBuffer, was not used.
+ kvBuffer.put(currentBuffer.array(), currentBuffer.arrayOffset() + current.tagsOffset,
+ current.tagsLength);
+ } else {
+ // When tagsOffset is marked as -1, tag compression was present and so the tags were
+ // uncompressed into temp buffer, tagsBuffer. Let us copy it from there
+ kvBuffer.put(current.tagsBuffer, 0, current.tagsLength);
+ }
}
return kvBuffer;
}
protected ByteBuffer createKVBuffer() {
int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(current.keyLength,
- current.valueLength, current.tagLength);
+ current.valueLength, current.tagsLength);
ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
return kvBuffer;
}
@@ -225,9 +265,23 @@ abstract class BufferedDataBlockEncoder
}
public void decodeTags() {
- current.tagLength = ByteBufferUtils.readCompressedInt(currentBuffer);
- current.tagOffset = currentBuffer.position();
- ByteBufferUtils.skip(currentBuffer, current.tagLength);
+ current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+ if (tagCompressionContext != null) {
+ // Tag compression is being used. Uncompress it into tagsBuffer
+ current.ensureSpaceForTags();
+ try {
+ tagCompressionContext.uncompressTags(currentBuffer, current.tagsBuffer, 0,
+ current.tagsLength);
+ } catch (IOException e) {
+ throw new RuntimeException("Exception while uncompressing tags", e);
+ }
+ current.tagsOffset = -1;
+ } else {
+ // When tag compress is not used, let us not do temp copying of tags bytes into tagsBuffer.
+ // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
+ current.tagsOffset = currentBuffer.position();
+ ByteBufferUtils.skip(currentBuffer, current.tagsLength);
+ }
}
@Override
@@ -320,9 +374,19 @@ abstract class BufferedDataBlockEncoder
protected final void afterEncodingKeyValue(ByteBuffer in,
DataOutputStream out, HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
if (encodingCtx.getHFileContext().shouldIncludeTags()) {
- int tagsLength = in.getShort();
+ short tagsLength = in.getShort();
ByteBufferUtils.putCompressedInt(out, tagsLength);
- ByteBufferUtils.moveBufferToStream(out, in, tagsLength);
+ // There are some tags to be written
+ if (tagsLength > 0) {
+ TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
+ // When tag compression is enabled, tagCompressionContext will have a non-null value. Write
+ // the tags using Dictionary compression in such a case
+ if (tagCompressionContext != null) {
+ tagCompressionContext.compressTags(out, in, tagsLength);
+ } else {
+ ByteBufferUtils.moveBufferToStream(out, in, tagsLength);
+ }
+ }
}
if (encodingCtx.getHFileContext().shouldIncludeMvcc()) {
// Copy memstore timestamp from the byte buffer to the output stream.
@@ -340,9 +404,18 @@ abstract class BufferedDataBlockEncoder
protected final void afterDecodingKeyValue(DataInputStream source,
ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
if (decodingCtx.getHFileContext().shouldIncludeTags()) {
- int tagsLength = ByteBufferUtils.readCompressedInt(source);
- dest.putShort((short)tagsLength);
- ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
+ short tagsLength = (short) ByteBufferUtils.readCompressedInt(source);
+ dest.putShort(tagsLength);
+ if (tagsLength > 0) {
+ TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
+ // When tag compression is being used in this file, tagCompressionContext will have a
+ // non-null value passed.
+ if (tagCompressionContext != null) {
+ tagCompressionContext.uncompressTags(source, dest, tagsLength);
+ } else {
+ ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
+ }
+ }
}
if (decodingCtx.getHFileContext().shouldIncludeMvcc()) {
long memstoreTS = -1;
@@ -398,6 +471,14 @@ abstract class BufferedDataBlockEncoder
DataOutputStream dataOut =
((HFileBlockDefaultEncodingContext) encodingCtx)
.getOutputStreamForEncoder();
+ if (encodingCtx.getHFileContext().shouldCompressTags()) {
+ try {
+ TagCompressionContext tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
+ encodingCtx.setTagCompressionContext(tagCompressionContext);
+ } catch (Exception e) {
+ throw new IOException("Failed to initialize TagCompressionContext", e);
+ }
+ }
internalEncodeKeyValues(dataOut, in, encodingCtx);
if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
encodingCtx.postEncoding(BlockType.ENCODED_DATA);
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java Thu Oct 31 04:59:00 2013
@@ -67,8 +67,8 @@ public class CopyKeyDataBlockEncoder ext
current.valueOffset = currentBuffer.position();
ByteBufferUtils.skip(currentBuffer, current.valueLength);
if (includesTags()) {
- current.tagLength = currentBuffer.getShort();
- ByteBufferUtils.skip(currentBuffer, current.tagLength);
+ current.tagsLength = currentBuffer.getShort();
+ ByteBufferUtils.skip(currentBuffer, current.tagsLength);
}
if (includesMvcc()) {
current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java Thu Oct 31 04:59:00 2013
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -38,6 +39,7 @@ public class HFileBlockDefaultDecodingCo
HFileBlockDecodingContext {
private final HFileContext fileContext;
+ private TagCompressionContext tagCompressionContext;
public HFileBlockDefaultDecodingContext(HFileContext fileContext) {
this.fileContext = fileContext;
@@ -58,4 +60,12 @@ public class HFileBlockDefaultDecodingCo
public HFileContext getHFileContext() {
return this.fileContext;
}
+
+ public TagCompressionContext getTagCompressionContext() {
+ return tagCompressionContext;
+ }
+
+ public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
+ this.tagCompressionContext = tagCompressionContext;
+ }
}
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java Thu Oct 31 04:59:00 2013
@@ -23,6 +23,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -62,6 +63,7 @@ public class HFileBlockDefaultEncodingCo
private byte[] dummyHeader;
private HFileContext fileContext;
+ private TagCompressionContext tagCompressionContext;
/**
* @param encoding encoding used
@@ -193,4 +195,12 @@ public class HFileBlockDefaultEncodingCo
public HFileContext getHFileContext() {
return this.fileContext;
}
+
+ public TagCompressionContext getTagCompressionContext() {
+ return tagCompressionContext;
+ }
+
+ public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
+ this.tagCompressionContext = tagCompressionContext;
+ }
}
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java Thu Oct 31 04:59:00 2013
@@ -122,6 +122,10 @@ public class HFileContext implements Hea
return compressTags;
}
+ public void setCompressTags(boolean compressTags) {
+ this.compressTags = compressTags;
+ }
+
public ChecksumType getChecksumType() {
return checksumType;
}
Added: hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java?rev=1537377&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java (added)
+++ hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java Thu Oct 31 04:59:00 2013
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestTagCompressionContext {
+
+ private static final byte[] ROW = Bytes.toBytes("r1");
+ private static final byte[] CF = Bytes.toBytes("f");
+ private static final byte[] Q = Bytes.toBytes("q");
+ private static final byte[] V = Bytes.toBytes("v");
+
+ @Test
+ public void testCompressUncompressTags1() throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ TagCompressionContext context = new TagCompressionContext(LRUDictionary.class);
+ KeyValue kv1 = createKVWithTags(2);
+ short tagsLength1 = kv1.getTagsLength();
+ ByteBuffer ib = ByteBuffer.wrap(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);
+ context.compressTags(baos, ib, tagsLength1);
+ KeyValue kv2 = createKVWithTags(3);
+ short tagsLength2 = kv2.getTagsLength();
+ ib = ByteBuffer.wrap(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2);
+ context.compressTags(baos, ib, tagsLength2);
+
+ context.clear();
+
+ byte[] dest = new byte[tagsLength1];
+ ByteBuffer ob = ByteBuffer.wrap(baos.toByteArray());
+ context.uncompressTags(ob, dest, 0, tagsLength1);
+ assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
+ tagsLength1));
+ dest = new byte[tagsLength2];
+ context.uncompressTags(ob, dest, 0, tagsLength2);
+ assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
+ tagsLength2));
+ }
+
+ @Test
+ public void testCompressUncompressTags2() throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ TagCompressionContext context = new TagCompressionContext(LRUDictionary.class);
+ KeyValue kv1 = createKVWithTags(1);
+ short tagsLength1 = kv1.getTagsLength();
+ context.compressTags(baos, kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);
+ KeyValue kv2 = createKVWithTags(3);
+ short tagsLength2 = kv2.getTagsLength();
+ context.compressTags(baos, kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2);
+
+ context.clear();
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ byte[] dest = new byte[tagsLength1];
+ context.uncompressTags(bais, dest, 0, tagsLength1);
+ assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
+ tagsLength1));
+ dest = new byte[tagsLength2];
+ context.uncompressTags(bais, dest, 0, tagsLength2);
+ assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
+ tagsLength2));
+ }
+
+ private KeyValue createKVWithTags(int noOfTags) {
+ List<Tag> tags = new ArrayList<Tag>();
+ for (int i = 0; i < noOfTags; i++) {
+ tags.add(new Tag((byte) i, "tagValue" + i));
+ }
+ KeyValue kv = new KeyValue(ROW, CF, Q, 1234L, V, tags);
+ return kv;
+ }
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Thu Oct 31 04:59:00 2013
@@ -620,6 +620,7 @@ public class HFile {
static final byte [] AVG_KEY_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_KEY_LEN");
static final byte [] AVG_VALUE_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_VALUE_LEN");
static final byte [] COMPARATOR = Bytes.toBytes(RESERVED_PREFIX + "COMPARATOR");
+ static final byte [] TAGS_COMPRESSED = Bytes.toBytes(RESERVED_PREFIX + "TAGS_COMPRESSED");
public static final byte [] MAX_TAGS_LEN = Bytes.toBytes(RESERVED_PREFIX + "MAX_TAGS_LEN");
private final SortedMap<byte [], byte []> map = new TreeMap<byte [], byte []>(Bytes.BYTES_COMPARATOR);
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java Thu Oct 31 04:59:00 2013
@@ -64,6 +64,10 @@ public class HFileReaderV3 extends HFile
// max tag length is not present in the HFile means tags were not at all written to file.
if (tmp != null) {
hfileContext.setIncludesTags(true);
+ tmp = fileInfo.get(FileInfo.TAGS_COMPRESSED);
+ if (tmp != null && Bytes.toBoolean(tmp)) {
+ hfileContext.setCompressTags(true);
+ }
}
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java Thu Oct 31 04:59:00 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.util.Bytes;
@@ -178,6 +179,9 @@ public class HFileWriterV3 extends HFile
// When tags are not being written in this file, MAX_TAGS_LEN is excluded
// from the FileInfo
fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
+ boolean tagsCompressed = (hFileContext.getEncodingOnDisk() != DataBlockEncoding.NONE)
+ && hFileContext.shouldCompressTags();
+ fileInfo.append(FileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
}
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Thu Oct 31 04:59:00 2013
@@ -828,6 +828,7 @@ public class HStore implements Store {
HFileContext hFileContext = new HFileContextBuilder()
.withIncludesMvcc(includeMVCCReadpoint)
.withIncludesTags(includesTag)
+ .withCompressTags(family.shouldCompressTags())
.withCompressionAlgo(compression)
.withChecksumType(checksumType)
.withBytesPerCheckSum(bytesPerChecksum)
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java Thu Oct 31 04:59:00 2013
@@ -71,6 +71,7 @@ public class TestEncodedSeekers {
private final DataBlockEncoding encoding;
private final boolean encodeOnDisk;
private final boolean includeTags;
+ private final boolean compressTags;
/** Enable when debugging */
private static final boolean VERBOSE = false;
@@ -81,22 +82,27 @@ public class TestEncodedSeekers {
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
for (boolean includeTags : new boolean[] { false, true }) {
for (boolean encodeOnDisk : new boolean[] { false, true }) {
- paramList.add(new Object[] { encoding, encodeOnDisk, includeTags });
+ for (boolean compressTags : new boolean[] { false, true }) {
+ paramList.add(new Object[] { encoding, encodeOnDisk, includeTags, compressTags });
+ }
}
}
}
return paramList;
}
- public TestEncodedSeekers(DataBlockEncoding encoding, boolean encodeOnDisk, boolean includeTags) {
+ public TestEncodedSeekers(DataBlockEncoding encoding, boolean encodeOnDisk, boolean includeTags,
+ boolean compressTags) {
this.encoding = encoding;
this.encodeOnDisk = encodeOnDisk;
this.includeTags = includeTags;
+ this.compressTags = compressTags;
}
@Test
public void testEncodedSeeker() throws IOException {
- System.err.println("Testing encoded seekers for encoding " + encoding);
+ System.err.println("Testing encoded seekers for encoding : " + encoding + ", encodeOnDisk : "
+ + encodeOnDisk + ", includeTags : " + includeTags + ", compressTags : " + compressTags);
if(includeTags) {
testUtil.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, 3);
}
@@ -108,7 +114,8 @@ public class TestEncodedSeekers {
setDataBlockEncoding(encoding).
setEncodeOnDisk(encodeOnDisk).
setBlocksize(BLOCK_SIZE).
- setBloomFilterType(BloomType.NONE);
+ setBloomFilterType(BloomType.NONE).
+ setCompressTags(compressTags);
HRegion region = testUtil.createTestRegion(TABLE_NAME, hcd);
//write the data, but leave some in the memstore