Posted to commits@hbase.apache.org by an...@apache.org on 2014/03/01 11:51:37 UTC
svn commit: r1573150 - in /hbase/branches/0.98:
hbase-client/src/main/java/org/apache/hadoop/hbase/
hbase-common/src/main/java/org/apache/hadoop/hbase/io/
hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/
hbase-common/src/test/java/org/ap...
Author: anoopsamjohn
Date: Sat Mar 1 10:51:36 2014
New Revision: 1573150
URL: http://svn.apache.org/r1573150
Log:
HBASE-10451 Enable back Tag compression on HFiles. (Anoop)
Added:
hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
Modified:
hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
hbase/branches/0.98/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
Modified: hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java?rev=1573150&r1=1573149&r2=1573150&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java (original)
+++ hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java Sat Mar 1 10:51:36 2014
@@ -195,9 +195,8 @@ public class HColumnDescriptor implement
/**
* Default compress tags along with any type of DataBlockEncoding.
- * Disabled to false by default in 0.98.0
*/
- public static final boolean DEFAULT_COMPRESS_TAGS = false;
+ public static final boolean DEFAULT_COMPRESS_TAGS = true;
private final static Map<String, String> DEFAULT_VALUES
= new HashMap<String, String>();
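With DEFAULT_COMPRESS_TAGS flipped to true, tag compression is on for any column family that uses data block encoding unless the family opts out. A minimal opt-out sketch, assuming the 0.98 setCompressTags(boolean) setter that pairs with the shouldCompressTags() getter used in the HStore change below:

    // Hedged sketch: per-family opt-out now that the default is true.
    HColumnDescriptor family = new HColumnDescriptor("f");
    family.setCompressTags(false);       // keep this family's tags uncompressed
    assert !family.shouldCompressTags();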
Modified: hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java?rev=1573150&r1=1573149&r2=1573150&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java (original)
+++ hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java Sat Mar 1 10:51:36 2014
@@ -41,12 +41,12 @@ import org.apache.hadoop.io.IOUtils;
public class TagCompressionContext {
private final Dictionary tagDict;
- public TagCompressionContext(Class<? extends Dictionary> dictType) throws SecurityException,
- NoSuchMethodException, InstantiationException, IllegalAccessException,
- InvocationTargetException {
+ public TagCompressionContext(Class<? extends Dictionary> dictType, int dictCapacity)
+ throws SecurityException, NoSuchMethodException, InstantiationException,
+ IllegalAccessException, InvocationTargetException {
Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
tagDict = dictConstructor.newInstance();
- tagDict.init(Short.MAX_VALUE);
+ tagDict.init(dictCapacity);
}
public void clear() {
@@ -131,10 +131,12 @@ public class TagCompressionContext {
* @param dest Destination array where to write the uncompressed tags
* @param offset Offset in destination where tags to be written
* @param length Length of all tag bytes
+ * @return the number of bytes read from the source to uncompress all the tags.
* @throws IOException
*/
- public void uncompressTags(ByteBuffer src, byte[] dest, int offset, int length)
+ public int uncompressTags(ByteBuffer src, byte[] dest, int offset, int length)
throws IOException {
+ int srcBeginPos = src.position();
int endOffset = offset + length;
while (offset < endOffset) {
byte status = src.get();
@@ -158,6 +160,7 @@ public class TagCompressionContext {
offset += tagLen;
}
}
+ return src.position() - srcBeginPos;
}
/**
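The new return value exists so that a caller which has already decoded a tag run can skip its compressed bytes later without disturbing the dictionary. A hedged round-trip sketch, using compressTags as exercised in the test changes below and uncompressTags as modified here (tagBytes is an assumed raw tag block):

    TagCompressionContext ctx =
        new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ctx.compressTags(baos, tagBytes, 0, tagBytes.length);
    ByteBuffer src = ByteBuffer.wrap(baos.toByteArray());
    byte[] dest = new byte[tagBytes.length];
    int consumed = ctx.uncompressTags(src, dest, 0, tagBytes.length);
    // On a later pass over the same stream, with dest already cached:
    // src.position(src.position() + consumed);  // skip; dictionary untouched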
Modified: hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java?rev=1573150&r1=1573149&r2=1573150&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java (original)
+++ hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java Sat Mar 1 10:51:36 2014
@@ -52,12 +52,20 @@ abstract class BufferedDataBlockEncoder
HFileBlockDefaultDecodingContext decodingCtx =
(HFileBlockDefaultDecodingContext) blkDecodingCtx;
- if (decodingCtx.getHFileContext().isCompressTags()) {
- try {
- TagCompressionContext tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
- decodingCtx.setTagCompressionContext(tagCompressionContext);
- } catch (Exception e) {
- throw new IOException("Failed to initialize TagCompressionContext", e);
+ if (decodingCtx.getHFileContext().isIncludesTags()
+ && decodingCtx.getHFileContext().isCompressTags()) {
+ if (decodingCtx.getTagCompressionContext() != null) {
+ // Creating a new TagCompressionContext for every block decode would be needless
+ // overhead, so reuse the existing one after clearing its dictionary.
+ decodingCtx.getTagCompressionContext().clear();
+ } else {
+ try {
+ TagCompressionContext tagCompressionContext = new TagCompressionContext(
+ LRUDictionary.class, Byte.MAX_VALUE);
+ decodingCtx.setTagCompressionContext(tagCompressionContext);
+ } catch (Exception e) {
+ throw new IOException("Failed to initialize TagCompressionContext", e);
+ }
}
}
return internalDecodeKeyValues(source, 0, 0, decodingCtx);
@@ -70,6 +78,8 @@ abstract class BufferedDataBlockEncoder
protected int lastCommonPrefix;
protected int tagsLength = 0;
protected int tagsOffset = -1;
+ protected int tagsCompressedLength = 0;
+ protected boolean uncompressTags = true;
/** We need to store a copy of the key. */
protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
@@ -84,6 +94,8 @@ abstract class BufferedDataBlockEncoder
protected void invalidate() {
valueOffset = -1;
+ tagsCompressedLength = 0;
+ uncompressTags = true;
}
protected void ensureSpaceForKey() {
@@ -160,7 +172,7 @@ abstract class BufferedDataBlockEncoder
this.decodingCtx = decodingCtx;
if (decodingCtx.getHFileContext().isCompressTags()) {
try {
- tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
+ tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
} catch (Exception e) {
throw new RuntimeException("Failed to initialize TagCompressionContext", e);
}
@@ -249,6 +261,9 @@ abstract class BufferedDataBlockEncoder
@Override
public void rewind() {
currentBuffer.rewind();
+ if (tagCompressionContext != null) {
+ tagCompressionContext.clear();
+ }
decodeFirst();
previous.invalidate();
}
@@ -266,13 +281,18 @@ abstract class BufferedDataBlockEncoder
protected void decodeTags() {
current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
if (tagCompressionContext != null) {
- // Tag compression is been used. uncompress it into tagsBuffer
- current.ensureSpaceForTags();
- try {
- tagCompressionContext.uncompressTags(currentBuffer, current.tagsBuffer, 0,
- current.tagsLength);
- } catch (IOException e) {
- throw new RuntimeException("Exception while uncompressing tags", e);
+ if (current.uncompressTags) {
+ // Tag compression is in use. Uncompress the tags into tagsBuffer.
+ current.ensureSpaceForTags();
+ try {
+ current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
+ current.tagsBuffer, 0, current.tagsLength);
+ } catch (IOException e) {
+ throw new RuntimeException("Exception while uncompressing tags", e);
+ }
+ } else {
+ ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
+ current.uncompressTags = true; // Reset this.
}
current.tagsOffset = -1;
} else {
@@ -355,7 +375,15 @@ abstract class BufferedDataBlockEncoder
// move after last key value
currentBuffer.position(current.nextKvOffset);
-
+ // The tag bytes were already decoded. Cache these tags in the current state, along with
+ // the total compressed length of the tag bytes, so that the next decodeNext() need not
+ // decode the tags again. Decoding them a second time would pollute the data dictionary
+ // that we use for the compression. When current.uncompressTags is false, we just reuse
+ // current.tagsBuffer and skip 'tagsCompressedLength' bytes of the source stream.
+ // See decodeTags().
+ current.tagsBuffer = previous.tagsBuffer;
+ current.tagsCompressedLength = previous.tagsCompressedLength;
+ current.uncompressTags = false;
previous.invalidate();
}
@@ -468,12 +496,20 @@ abstract class BufferedDataBlockEncoder
(HFileBlockDefaultEncodingContext) blkEncodingCtx;
encodingCtx.prepareEncoding();
DataOutputStream dataOut = encodingCtx.getOutputStreamForEncoder();
- if (encodingCtx.getHFileContext().isCompressTags()) {
- try {
- TagCompressionContext tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
- encodingCtx.setTagCompressionContext(tagCompressionContext);
- } catch (Exception e) {
- throw new IOException("Failed to initialize TagCompressionContext", e);
+ if (encodingCtx.getHFileContext().isIncludesTags()
+ && encodingCtx.getHFileContext().isCompressTags()) {
+ if (encodingCtx.getTagCompressionContext() != null) {
+ // Creating a new TagCompressionContext for every block encode would be needless
+ // overhead, so reuse the existing one after clearing its dictionary.
+ encodingCtx.getTagCompressionContext().clear();
+ } else {
+ try {
+ TagCompressionContext tagCompressionContext = new TagCompressionContext(
+ LRUDictionary.class, Byte.MAX_VALUE);
+ encodingCtx.setTagCompressionContext(tagCompressionContext);
+ } catch (Exception e) {
+ throw new IOException("Failed to initialize TagCompressionContext", e);
+ }
}
}
internalEncodeKeyValues(dataOut, in, encodingCtx);
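The seeker-side change is the subtle part of this patch: when seekToKeyInBlock() steps back to the previous cell, the following cell's tags have already been decoded once, and decoding the same compressed span again would push duplicate entries into the per-block dictionary and desynchronize it from the encoder's. A hedged, abridged restatement of the skip path that the new fields drive (names as in the diff above):

    if (current.uncompressTags) {
      current.tagsCompressedLength = tagCompressionContext.uncompressTags(
          currentBuffer, current.tagsBuffer, 0, current.tagsLength);
    } else {
      // Tags were decoded on the previous pass; skip the compressed bytes so
      // the decoder's dictionary stays in step with the encoder's.
      ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
      current.uncompressTags = true;
    }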
Modified: hbase/branches/0.98/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java?rev=1573150&r1=1573149&r2=1573150&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java (original)
+++ hbase/branches/0.98/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java Sat Mar 1 10:51:36 2014
@@ -45,7 +45,7 @@ public class TestTagCompressionContext {
@Test
public void testCompressUncompressTags1() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- TagCompressionContext context = new TagCompressionContext(LRUDictionary.class);
+ TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
KeyValue kv1 = createKVWithTags(2);
short tagsLength1 = kv1.getTagsLength();
ByteBuffer ib = ByteBuffer.wrap(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);
@@ -71,7 +71,7 @@ public class TestTagCompressionContext {
@Test
public void testCompressUncompressTags2() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- TagCompressionContext context = new TagCompressionContext(LRUDictionary.class);
+ TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
KeyValue kv1 = createKVWithTags(1);
short tagsLength1 = kv1.getTagsLength();
context.compressTags(baos, kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);
Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1573150&r1=1573149&r2=1573150&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Sat Mar 1 10:51:36 2014
@@ -891,15 +891,11 @@ public class HStore implements Store {
if (compression == null) {
compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
}
- if(family.shouldCompressTags()) {
- LOG.warn("HFile tag compression attribute ignored for '" + family.getNameAsString()
- + "', see HBASE-10443.");
- }
HFileContext hFileContext = new HFileContextBuilder()
.withIncludesMvcc(includeMVCCReadpoint)
.withIncludesTags(includesTag)
.withCompression(compression)
- .withCompressTags(false)
+ .withCompressTags(family.shouldCompressTags())
.withChecksumType(checksumType)
.withBytesPerCheckSum(bytesPerChecksum)
.withBlockSize(blocksize)
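With the HBASE-10443 guard removed, the writer-side HFile context simply mirrors the family flag instead of forcing it off. A hedged sketch of building an equivalent context directly, using only builder methods that appear elsewhere in this commit:

    HFileContext ctx = new HFileContextBuilder()
        .withIncludesTags(true)
        .withCompressTags(family.shouldCompressTags()) // no longer pinned to false
        .withDataBlockEncoding(DataBlockEncoding.PREFIX)
        .build();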
Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java?rev=1573150&r1=1573149&r2=1573150&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java Sat Mar 1 10:51:36 2014
@@ -64,7 +64,7 @@ class CompressionContext {
familyDict.init(Byte.MAX_VALUE);
qualifierDict.init(Byte.MAX_VALUE);
if (hasTagCompression) {
- tagCompressionContext = new TagCompressionContext(dictType);
+ tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE);
}
}
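Note the capacity split the new constructor argument enables: the HFile block encoders above pass Byte.MAX_VALUE because their dictionaries are cleared for every block, while the WAL keeps Short.MAX_VALUE because its dictionary lives for the whole log. The rationale here is inferred; the constants come straight from this diff:

    TagCompressionContext blockCtx =
        new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);  // reset per block
    TagCompressionContext walCtx =
        new TagCompressionContext(LRUDictionary.class, Short.MAX_VALUE); // per-WAL lifetime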
Modified: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java?rev=1573150&r1=1573149&r2=1573150&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java (original)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java Sat Mar 1 10:51:36 2014
@@ -70,7 +70,6 @@ public class TestEncodedSeekers {
private final HBaseTestingUtility testUtil = HBaseTestingUtility.createLocalHTU();
private final DataBlockEncoding encoding;
- private final boolean encodeOnDisk;
private final boolean includeTags;
private final boolean compressTags;
@@ -82,28 +81,24 @@ public class TestEncodedSeekers {
List<Object[]> paramList = new ArrayList<Object[]>();
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
for (boolean includeTags : new boolean[] { false, true }) {
- for (boolean encodeOnDisk : new boolean[] { false, true }) {
- for (boolean compressTags : new boolean[] { false, true }) {
- paramList.add(new Object[] { encoding, encodeOnDisk, includeTags, compressTags });
- }
+ for (boolean compressTags : new boolean[] { false, true }) {
+ paramList.add(new Object[] { encoding, includeTags, compressTags });
}
}
}
return paramList;
}
- public TestEncodedSeekers(DataBlockEncoding encoding, boolean encodeOnDisk, boolean includeTags,
- boolean compressTags) {
+ public TestEncodedSeekers(DataBlockEncoding encoding, boolean includeTags, boolean compressTags) {
this.encoding = encoding;
- this.encodeOnDisk = encodeOnDisk;
this.includeTags = includeTags;
this.compressTags = compressTags;
}
@Test
public void testEncodedSeeker() throws IOException {
- System.err.println("Testing encoded seekers for encoding : " + encoding + ", encodeOnDisk : "
- + encodeOnDisk + ", includeTags : " + includeTags + ", compressTags : " + compressTags);
+ System.err.println("Testing encoded seekers for encoding : " + encoding + ", includeTags : "
+ + includeTags + ", compressTags : " + compressTags);
if(includeTags) {
testUtil.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, 3);
}
Added: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java?rev=1573150&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java (added)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java Sat Mar 1 10:51:36 2014
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestStoreFileScannerWithTagCompression {
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static Configuration conf = TEST_UTIL.getConfiguration();
+ private static CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
+ private static String ROOT_DIR = TEST_UTIL.getDataTestDir(
+ "TestStoreFileScannerWithTagCompression").toString();
+ private static FileSystem fs = null;
+
+ @BeforeClass
+ public static void setUp() throws IOException {
+ conf.setInt("hfile.format.version", 3);
+ fs = FileSystem.get(conf);
+ }
+
+ @Test
+ public void testReseek() throws Exception {
+ // write the file
+ Path f = new Path(ROOT_DIR, "testReseek");
+ HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).withIncludesTags(true)
+ .withCompressTags(true).withDataBlockEncoding(DataBlockEncoding.PREFIX).build();
+ // Make a store file and write data to it.
+ StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs).withFilePath(f)
+ .withFileContext(meta).build();
+
+ writeStoreFile(writer);
+ writer.close();
+
+ StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf);
+ StoreFileScanner s = reader.getStoreFileScanner(false, false);
+ try {
+ // Reseek with a first-on-row KV to position the scanner at row 'k2'.
+ KeyValue k = KeyValue.createFirstOnRow(Bytes.toBytes("k2"));
+ s.reseek(k);
+ KeyValue kv = s.next();
+ kv = s.next();
+ kv = s.next();
+ byte[] key5 = Bytes.toBytes("k5");
+ assertTrue(Bytes.equals(key5, 0, key5.length, kv.getRowArray(), kv.getRowOffset(),
+ kv.getRowLength()));
+ List<Tag> tags = kv.getTags();
+ assertEquals(1, tags.size());
+ assertEquals("tag3", Bytes.toString(tags.get(0).getValue()));
+ } finally {
+ s.close();
+ }
+ }
+
+ private void writeStoreFile(final StoreFile.Writer writer) throws IOException {
+ byte[] fam = Bytes.toBytes("f");
+ byte[] qualifier = Bytes.toBytes("q");
+ long now = System.currentTimeMillis();
+ byte[] b = Bytes.toBytes("k1");
+ Tag t1 = new Tag((byte) 1, "tag1");
+ Tag t2 = new Tag((byte) 2, "tag2");
+ Tag t3 = new Tag((byte) 3, "tag3");
+ try {
+ writer.append(new KeyValue(b, fam, qualifier, now, b, new Tag[] { t1 }));
+ b = Bytes.toBytes("k3");
+ writer.append(new KeyValue(b, fam, qualifier, now, b, new Tag[] { t2, t1 }));
+ b = Bytes.toBytes("k4");
+ writer.append(new KeyValue(b, fam, qualifier, now, b, new Tag[] { t3 }));
+ b = Bytes.toBytes("k5");
+ writer.append(new KeyValue(b, fam, qualifier, now, b, new Tag[] { t3 }));
+ } finally {
+ writer.close();
+ }
+ }
+}