You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2016/11/29 16:58:32 UTC
hbase git commit: HBASE-17012 Handle Offheap cells in
CompressedKvEncoder (Ram)
Repository: hbase
Updated Branches:
refs/heads/master e5dad24a9 -> 7c43a23c0
HBASE-17012 Handle Offheap cells in CompressedKvEncoder (Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7c43a23c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7c43a23c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7c43a23c
Branch: refs/heads/master
Commit: 7c43a23c07d2af1c236b3153ba932234c3a80d13
Parents: e5dad24
Author: Ramkrishna <ra...@intel.com>
Authored: Tue Nov 29 22:27:45 2016 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Tue Nov 29 22:27:45 2016 +0530
----------------------------------------------------------------------
.../java/org/apache/hadoop/hbase/CellUtil.java | 58 ++++++++++++++++----
.../hadoop/hbase/io/TagCompressionContext.java | 32 +----------
.../apache/hadoop/hbase/io/util/Dictionary.java | 51 +++++++++++++++++
.../regionserver/wal/SecureWALCellCodec.java | 35 +++++++-----
.../hbase/regionserver/wal/WALCellCodec.java | 38 +++----------
.../apache/hadoop/hbase/ipc/TestRpcServer.java | 1 -
.../wal/TestWALCellCodecWithCompression.java | 40 ++++++++++++--
.../hbase/wal/TestWALReaderOnSecureWAL.java | 31 +++++++++--
8 files changed, 189 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/7c43a23c/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index d47cdab..86c7720 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience.Private;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.ByteRange;
import org.apache.hadoop.hbase.util.Bytes;
@@ -1578,12 +1579,12 @@ public final class CellUtil {
/**
* Writes the row from the given cell to the output stream
- * @param out The dataoutputstream to which the data has to be written
+ * @param out The outputstream to which the data has to be written
* @param cell The cell whose contents has to be written
* @param rlength the row length
* @throws IOException
*/
- public static void writeRow(DataOutputStream out, Cell cell, short rlength) throws IOException {
+ public static void writeRow(OutputStream out, Cell cell, short rlength) throws IOException {
if (cell instanceof ByteBufferCell) {
ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getRowByteBuffer(),
((ByteBufferCell) cell).getRowPosition(), rlength);
@@ -1611,12 +1612,12 @@ public final class CellUtil {
/**
* Writes the family from the given cell to the output stream
- * @param out The dataoutputstream to which the data has to be written
+ * @param out The outputstream to which the data has to be written
* @param cell The cell whose contents has to be written
* @param flength the family length
* @throws IOException
*/
- public static void writeFamily(DataOutputStream out, Cell cell, byte flength) throws IOException {
+ public static void writeFamily(OutputStream out, Cell cell, byte flength) throws IOException {
if (cell instanceof ByteBufferCell) {
ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getFamilyByteBuffer(),
((ByteBufferCell) cell).getFamilyPosition(), flength);
@@ -1627,12 +1628,12 @@ public final class CellUtil {
/**
* Writes the qualifier from the given cell to the output stream
- * @param out The dataoutputstream to which the data has to be written
+ * @param out The outputstream to which the data has to be written
* @param cell The cell whose contents has to be written
* @param qlength the qualifier length
* @throws IOException
*/
- public static void writeQualifier(DataOutputStream out, Cell cell, int qlength)
+ public static void writeQualifier(OutputStream out, Cell cell, int qlength)
throws IOException {
if (cell instanceof ByteBufferCell) {
ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getQualifierByteBuffer(),
@@ -1662,12 +1663,12 @@ public final class CellUtil {
/**
* Writes the value from the given cell to the output stream
- * @param out The dataoutputstream to which the data has to be written
+ * @param out The outputstream to which the data has to be written
* @param cell The cell whose contents has to be written
* @param vlength the value length
* @throws IOException
*/
- public static void writeValue(DataOutputStream out, Cell cell, int vlength) throws IOException {
+ public static void writeValue(OutputStream out, Cell cell, int vlength) throws IOException {
if (cell instanceof ByteBufferCell) {
ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getValueByteBuffer(),
((ByteBufferCell) cell).getValuePosition(), vlength);
@@ -1678,12 +1679,12 @@ public final class CellUtil {
/**
* Writes the tag from the given cell to the output stream
- * @param out The dataoutputstream to which the data has to be written
+ * @param out The outputstream to which the data has to be written
* @param cell The cell whose contents has to be written
* @param tagsLength the tag length
* @throws IOException
*/
- public static void writeTags(DataOutputStream out, Cell cell, int tagsLength) throws IOException {
+ public static void writeTags(OutputStream out, Cell cell, int tagsLength) throws IOException {
if (cell instanceof ByteBufferCell) {
ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getTagsByteBuffer(),
((ByteBufferCell) cell).getTagsPosition(), tagsLength);
@@ -2230,7 +2231,8 @@ public final class CellUtil {
* @param tagCompressionContext the TagCompressionContext
* @throws IOException can throw IOException if the compression encounters issue
*/
- public static void compressTags(DataOutputStream out, Cell cell,
+ @InterfaceAudience.Private
+ public static void compressTags(OutputStream out, Cell cell,
TagCompressionContext tagCompressionContext) throws IOException {
if (cell instanceof ByteBufferCell) {
tagCompressionContext.compressTags(out, ((ByteBufferCell) cell).getTagsByteBuffer(),
@@ -2242,6 +2244,40 @@ public final class CellUtil {
}
@InterfaceAudience.Private
+ public static void compressRow(OutputStream out, Cell cell, Dictionary dict) throws IOException {
+ if (cell instanceof ByteBufferCell) {
+ Dictionary.write(out, ((ByteBufferCell) cell).getRowByteBuffer(),
+ ((ByteBufferCell) cell).getRowPosition(), cell.getRowLength(), dict);
+ } else {
+ Dictionary.write(out, cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), dict);
+ }
+ }
+
+ @InterfaceAudience.Private
+ public static void compressFamily(OutputStream out, Cell cell, Dictionary dict)
+ throws IOException {
+ if (cell instanceof ByteBufferCell) {
+ Dictionary.write(out, ((ByteBufferCell) cell).getFamilyByteBuffer(),
+ ((ByteBufferCell) cell).getFamilyPosition(), cell.getFamilyLength(), dict);
+ } else {
+ Dictionary.write(out, cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+ dict);
+ }
+ }
+
+ @InterfaceAudience.Private
+ public static void compressQualifier(OutputStream out, Cell cell, Dictionary dict)
+ throws IOException {
+ if (cell instanceof ByteBufferCell) {
+ Dictionary.write(out, ((ByteBufferCell) cell).getQualifierByteBuffer(),
+ ((ByteBufferCell) cell).getQualifierPosition(), cell.getQualifierLength(), dict);
+ } else {
+ Dictionary.write(out, cell.getQualifierArray(), cell.getQualifierOffset(),
+ cell.getQualifierLength(), dict);
+ }
+ }
+
+ @InterfaceAudience.Private
/**
* These cells are used in reseeks/seeks to improve the read performance.
* They are not real cells that are returned back to the clients
http://git-wip-us.apache.org/repos/asf/hbase/blob/7c43a23c/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
index 278dfc4..fea2f0c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
@@ -70,7 +70,7 @@ public class TagCompressionContext {
while (pos < endOffset) {
int tagLen = Bytes.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
pos += Tag.TAG_LENGTH_SIZE;
- write(in, pos, tagLen, out);
+ Dictionary.write(out, in, pos, tagLen, tagDict);
pos += tagLen;
}
}
@@ -94,7 +94,7 @@ public class TagCompressionContext {
while (pos < endOffset) {
int tagLen = ByteBufferUtils.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
pos += Tag.TAG_LENGTH_SIZE;
- write(in, pos, tagLen, out);
+ Dictionary.write(out, in, pos, tagLen, tagDict);;
pos += tagLen;
}
}
@@ -185,32 +185,4 @@ public class TagCompressionContext {
dest.put(tagBuf);
}
}
-
- private void write(byte[] data, int offset, int length, OutputStream out) throws IOException {
- short dictIdx = Dictionary.NOT_IN_DICTIONARY;
- if (tagDict != null) {
- dictIdx = tagDict.findEntry(data, offset, length);
- }
- if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
- out.write(Dictionary.NOT_IN_DICTIONARY);
- StreamUtils.writeRawVInt32(out, length);
- out.write(data, offset, length);
- } else {
- StreamUtils.writeShort(out, dictIdx);
- }
- }
-
- private void write(ByteBuffer data, int offset, int length, OutputStream out) throws IOException {
- short dictIdx = Dictionary.NOT_IN_DICTIONARY;
- if (tagDict != null) {
- dictIdx = tagDict.findEntry(data, offset, length);
- }
- if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
- out.write(Dictionary.NOT_IN_DICTIONARY);
- StreamUtils.writeRawVInt32(out, length);
- ByteBufferUtils.copyBufferToStream(out, data, offset, length);
- } else {
- StreamUtils.writeShort(out, dictIdx);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7c43a23c/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
index 54677da..e6384e1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
@@ -18,9 +18,12 @@
package org.apache.hadoop.hbase.io.util;
+import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
/**
* Dictionary interface
@@ -80,4 +83,52 @@ public interface Dictionary {
* Flushes the dictionary, empties all values.
*/
void clear();
+
+ /**
+ * Helper methods to write the dictionary data to the OutputStream
+ * @param out the outputstream to which data needs to be written
+ * @param data the data to be written in byte[]
+ * @param offset the offset
+ * @param length length to be written
+ * @param dict the dictionary whose contents are to written
+ * @throws IOException
+ */
+ public static void write(OutputStream out, byte[] data, int offset, int length, Dictionary dict)
+ throws IOException {
+ short dictIdx = Dictionary.NOT_IN_DICTIONARY;
+ if (dict != null) {
+ dictIdx = dict.findEntry(data, offset, length);
+ }
+ if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+ out.write(Dictionary.NOT_IN_DICTIONARY);
+ StreamUtils.writeRawVInt32(out, length);
+ out.write(data, offset, length);
+ } else {
+ StreamUtils.writeShort(out, dictIdx);
+ }
+ }
+
+ /**
+ * Helper methods to write the dictionary data to the OutputStream
+ * @param out the outputstream to which data needs to be written
+ * @param data the data to be written in ByteBuffer
+ * @param offset the offset
+ * @param length length to be written
+ * @param dict the dictionary whose contents are to written
+ * @throws IOException
+ */
+ public static void write(OutputStream out, ByteBuffer data, int offset, int length,
+ Dictionary dict) throws IOException {
+ short dictIdx = Dictionary.NOT_IN_DICTIONARY;
+ if (dict != null) {
+ dictIdx = dict.findEntry(data, offset, length);
+ }
+ if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+ out.write(Dictionary.NOT_IN_DICTIONARY);
+ StreamUtils.writeRawVInt32(out, length);
+ ByteBufferUtils.copyBufferToStream(out, data, offset, length);
+ } else {
+ StreamUtils.writeShort(out, dictIdx);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7c43a23c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
index 603496f..027ff11 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
@@ -28,9 +28,11 @@ import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
+import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
import org.apache.hadoop.hbase.io.crypto.Decryptor;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.crypto.Encryptor;
@@ -195,29 +197,32 @@ public class SecureWALCellCodec extends WALCellCodec {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputStream cout = encryptor.createEncryptionStream(baos);
-
+ ByteBufferWriterOutputStream bos = new ByteBufferWriterOutputStream(cout);
int tlen = cell.getTagsLength();
// Write the KeyValue infrastructure as VInts.
- StreamUtils.writeRawVInt32(cout, KeyValueUtil.keyLength(cell));
- StreamUtils.writeRawVInt32(cout, cell.getValueLength());
+ StreamUtils.writeRawVInt32(bos, KeyValueUtil.keyLength(cell));
+ StreamUtils.writeRawVInt32(bos, cell.getValueLength());
// To support tags
- StreamUtils.writeRawVInt32(cout, tlen);
+ StreamUtils.writeRawVInt32(bos, tlen);
// Write row, qualifier, and family
- StreamUtils.writeRawVInt32(cout, cell.getRowLength());
- cout.write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
- StreamUtils.writeRawVInt32(cout, cell.getFamilyLength());
- cout.write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
- StreamUtils.writeRawVInt32(cout, cell.getQualifierLength());
- cout.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+ short rowLength = cell.getRowLength();
+ StreamUtils.writeRawVInt32(bos, rowLength);
+ CellUtil.writeRow(bos, cell, rowLength);
+ byte familyLength = cell.getFamilyLength();
+ StreamUtils.writeRawVInt32(bos, familyLength);
+ CellUtil.writeFamily(bos, cell, familyLength);
+ int qualifierLength = cell.getQualifierLength();
+ StreamUtils.writeRawVInt32(bos, qualifierLength);
+ CellUtil.writeQualifier(bos, cell, qualifierLength);
// Write the rest ie. ts, type, value and tags parts
- StreamUtils.writeLong(cout, cell.getTimestamp());
- cout.write(cell.getTypeByte());
- cout.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ StreamUtils.writeLong(bos, cell.getTimestamp());
+ bos.write(cell.getTypeByte());
+ CellUtil.writeValue(bos, cell, cell.getValueLength());
if (tlen > 0) {
- cout.write(cell.getTagsArray(), cell.getTagsOffset(), tlen);
+ CellUtil.writeTags(bos, cell, tlen);
}
- cout.close();
+ bos.close();
StreamUtils.writeRawVInt32(out, baos.size());
baos.writeTo(out);
http://git-wip-us.apache.org/repos/asf/hbase/blob/7c43a23c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index 1a18087..baa940b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -24,6 +24,7 @@ import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
@@ -207,45 +208,24 @@ public class WALCellCodec implements Codec {
// To support tags
int tagsLength = cell.getTagsLength();
StreamUtils.writeRawVInt32(out, tagsLength);
-
- // Write row, qualifier, and family; use dictionary
- // compression as they're likely to have duplicates.
- write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), compression.rowDict);
- write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
- compression.familyDict);
- write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
- compression.qualifierDict);
-
+ CellUtil.compressRow(out, cell, compression.rowDict);
+ CellUtil.compressFamily(out, cell, compression.familyDict);
+ CellUtil.compressQualifier(out, cell, compression.qualifierDict);
// Write timestamp, type and value as uncompressed.
StreamUtils.writeLong(out, cell.getTimestamp());
out.write(cell.getTypeByte());
- out.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ CellUtil.writeValue(out, cell, cell.getValueLength());
if (tagsLength > 0) {
if (compression.tagCompressionContext != null) {
// Write tags using Dictionary compression
- compression.tagCompressionContext.compressTags(out, cell.getTagsArray(),
- cell.getTagsOffset(), tagsLength);
+ CellUtil.compressTags(out, cell, compression.tagCompressionContext);
} else {
// Tag compression is disabled within the WAL compression. Just write the tags bytes as
// it is.
- out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
+ CellUtil.writeTags(out, cell, tagsLength);
}
}
}
-
- private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException {
- short dictIdx = Dictionary.NOT_IN_DICTIONARY;
- if (dict != null) {
- dictIdx = dict.findEntry(data, offset, length);
- }
- if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
- out.write(Dictionary.NOT_IN_DICTIONARY);
- StreamUtils.writeRawVInt32(out, length);
- out.write(data, offset, length);
- } else {
- StreamUtils.writeShort(out, dictIdx);
- }
- }
}
static class CompressedKvDecoder extends BaseDecoder {
@@ -364,9 +344,9 @@ public class WALCellCodec implements Codec {
@Override
public Encoder getEncoder(OutputStream os) {
+ os = (os instanceof ByteBufferWriter) ? os
+ : new ByteBufferWriterOutputStream(os);
if (compression == null) {
- os = (os instanceof ByteBufferWriter) ? os
- : new ByteBufferWriterOutputStream(os);
return new EnsureKvEncoder(os);
}
return new CompressedKvEncoder(os, compression);
http://git-wip-us.apache.org/repos/asf/hbase/blob/7c43a23c/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
index 9f3bd94..6fd65f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
@@ -41,7 +41,6 @@ public class TestRpcServer {
@Test
public void testAllocateByteBuffToReadInto() throws Exception {
- System.out.println(Long.MAX_VALUE);
int maxBuffersInPool = 10;
ByteBufferPool pool = new ByteBufferPool(6 * 1024, maxBuffersInPool);
initPoolWithAllBuffers(pool, maxBuffersInPool);
http://git-wip-us.apache.org/repos/asf/hbase/blob/7c43a23c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
index e834ac8..ba5bfa3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
@@ -23,12 +23,14 @@ import static org.junit.Assert.assertEquals;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.OffheapKeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.ArrayBackedTag;
@@ -46,24 +48,35 @@ public class TestWALCellCodecWithCompression {
@Test
public void testEncodeDecodeKVsWithTags() throws Exception {
- doTest(false);
+ doTest(false, false);
}
@Test
public void testEncodeDecodeKVsWithTagsWithTagsCompression() throws Exception {
- doTest(true);
+ doTest(true, false);
}
- private void doTest(boolean compressTags) throws Exception {
+ @Test
+ public void testEncodeDecodeOffKVsWithTagsWithTagsCompression() throws Exception {
+ doTest(true, true);
+ }
+
+ private void doTest(boolean compressTags, boolean offheapKV) throws Exception {
Configuration conf = new Configuration(false);
conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags);
WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false,
compressTags));
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
Encoder encoder = codec.getEncoder(bos);
- encoder.write(createKV(1));
- encoder.write(createKV(0));
- encoder.write(createKV(2));
+ if (offheapKV) {
+ encoder.write(createOffheapKV(1));
+ encoder.write(createOffheapKV(0));
+ encoder.write(createOffheapKV(2));
+ } else {
+ encoder.write(createKV(1));
+ encoder.write(createKV(0));
+ encoder.write(createKV(2));
+ }
InputStream is = new ByteArrayInputStream(bos.toByteArray());
Decoder decoder = codec.getDecoder(is);
@@ -95,4 +108,19 @@ public class TestWALCellCodecWithCompression {
}
return new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags);
}
+
+ private OffheapKeyValue createOffheapKV(int noOfTags) {
+ byte[] row = Bytes.toBytes("myRow");
+ byte[] cf = Bytes.toBytes("myCF");
+ byte[] q = Bytes.toBytes("myQualifier");
+ byte[] value = Bytes.toBytes("myValue");
+ List<Tag> tags = new ArrayList<Tag>(noOfTags);
+ for (int i = 1; i <= noOfTags; i++) {
+ tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i)));
+ }
+ KeyValue kv = new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags);
+ ByteBuffer dbb = ByteBuffer.allocateDirect(kv.getBuffer().length);
+ dbb.put(kv.getBuffer());
+ return new OffheapKeyValue(dbb, 0, kv.getBuffer().length);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7c43a23c/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
index 3e060ab..0562fd9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.wal;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.NavigableMap;
import java.util.TreeMap;
@@ -38,6 +40,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.OffheapKeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
@@ -90,7 +93,8 @@ public class TestWALReaderOnSecureWAL {
FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
}
- private Path writeWAL(final WALFactory wals, final String tblName) throws IOException {
+ @SuppressWarnings("deprecation")
+ private Path writeWAL(final WALFactory wals, final String tblName, boolean offheap) throws IOException {
Configuration conf = TEST_UTIL.getConfiguration();
String clsName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, SecureWALCellCodec.class,
@@ -116,7 +120,15 @@ public class TestWALReaderOnSecureWAL {
wals.getWAL(regioninfo.getEncodedNameAsBytes(), regioninfo.getTable().getNamespace());
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
- kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
+ KeyValue kv = new KeyValue(row, family, Bytes.toBytes(i), value);
+ if (offheap) {
+ ByteBuffer bb = ByteBuffer.allocateDirect(kv.getBuffer().length);
+ bb.put(kv.getBuffer());
+ OffheapKeyValue offheapKV = new OffheapKeyValue(bb, 0, kv.getLength());
+ kvs.add(offheapKV);
+ } else {
+ kvs.add(kv);
+ }
wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), kvs, true);
}
@@ -132,7 +144,16 @@ public class TestWALReaderOnSecureWAL {
}
@Test()
- public void testWALReaderOnSecureWAL() throws Exception {
+ public void testWALReaderOnSecureWALWithKeyValues() throws Exception {
+ testSecureWALInternal(false);
+ }
+
+ @Test()
+ public void testWALReaderOnSecureWALWithOffheapKeyValues() throws Exception {
+ testSecureWALInternal(true);
+ }
+
+ private void testSecureWALInternal(boolean offheap) throws IOException, FileNotFoundException {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
WAL.Reader.class);
@@ -143,7 +164,7 @@ public class TestWALReaderOnSecureWAL {
conf.setBoolean(WAL_ENCRYPTION, true);
FileSystem fs = TEST_UTIL.getTestFileSystem();
final WALFactory wals = new WALFactory(conf, null, currentTest.getMethodName());
- Path walPath = writeWAL(wals, currentTest.getMethodName());
+ Path walPath = writeWAL(wals, currentTest.getMethodName(), offheap);
// Insure edits are not plaintext
long length = fs.getFileStatus(walPath).getLen();
@@ -188,7 +209,7 @@ public class TestWALReaderOnSecureWAL {
conf.setBoolean(WAL_ENCRYPTION, false);
FileSystem fs = TEST_UTIL.getTestFileSystem();
final WALFactory wals = new WALFactory(conf, null, currentTest.getMethodName());
- Path walPath = writeWAL(wals, currentTest.getMethodName());
+ Path walPath = writeWAL(wals, currentTest.getMethodName(), false);
// Ensure edits are plaintext
long length = fs.getFileStatus(walPath).getLen();