Posted to commits@hbase.apache.org by an...@apache.org on 2014/05/22 08:31:20 UTC
[2/2] git commit: HBASE-10835 DBE encode path improvements.(Anoop)
HBASE-10835 DBE encode path improvements.(Anoop)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/53513dcb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/53513dcb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/53513dcb
Branch: refs/heads/master
Commit: 53513dcb452e104bbfd71819054bf4d68808f731
Parents: cb1428d
Author: anoopsjohn <an...@gmail.com>
Authored: Thu May 22 11:59:52 2014 +0530
Committer: anoopsjohn <an...@gmail.com>
Committed: Thu May 22 11:59:52 2014 +0530
----------------------------------------------------------------------
.../io/encoding/BufferedDataBlockEncoder.java | 117 +++++----
.../io/encoding/CopyKeyDataBlockEncoder.java | 33 ++-
.../hbase/io/encoding/DataBlockEncoder.java | 44 ++--
.../hbase/io/encoding/DiffKeyDeltaEncoder.java | 242 ++++++++-----------
.../hbase/io/encoding/EncodedDataBlock.java | 47 +++-
.../hadoop/hbase/io/encoding/EncodingState.java | 34 +++
.../hbase/io/encoding/FastDiffDeltaEncoder.java | 225 ++++++++---------
.../HFileBlockDefaultEncodingContext.java | 68 ++----
.../io/encoding/HFileBlockEncodingContext.java | 41 ++--
.../io/encoding/PrefixKeyDeltaEncoder.java | 59 ++---
.../hbase/codec/prefixtree/PrefixTreeCodec.java | 96 ++++----
.../codec/prefixtree/encode/EncoderFactory.java | 2 +-
.../prefixtree/encode/EncoderPoolImpl.java | 47 ++++
.../hadoop/hbase/io/hfile/HFileBlock.java | 78 +++---
.../hbase/io/hfile/HFileDataBlockEncoder.java | 43 +++-
.../io/hfile/HFileDataBlockEncoderImpl.java | 58 ++---
.../hadoop/hbase/io/hfile/HFileWriterV2.java | 83 +++----
.../hadoop/hbase/io/hfile/HFileWriterV3.java | 88 ++-----
.../hbase/io/hfile/NoOpDataBlockEncoder.java | 47 +++-
.../io/encoding/TestDataBlockEncoders.java | 175 +++++++-------
.../io/encoding/TestPrefixTreeEncoding.java | 78 +++---
.../encoding/TestSeekToBlockWithEncoders.java | 68 ++----
.../hadoop/hbase/io/hfile/TestHFileBlock.java | 119 +++------
.../io/hfile/TestHFileBlockCompatibility.java | 126 +++++-----
.../io/hfile/TestHFileDataBlockEncoder.java | 74 +++---
25 files changed, 1019 insertions(+), 1073 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index d79aff2..af78c1c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -474,10 +474,18 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
abstract protected void decodeNext();
}
- protected final void afterEncodingKeyValue(ByteBuffer in,
- DataOutputStream out, HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
+ /**
+ * @param kv the KeyValue that was just encoded
+ * @param out the stream to write the tag and MVCC trailer to
+ * @param encodingCtx the encoding context of the current block
+ * @return unencoded size added
+ * @throws IOException if writing to the stream fails
+ */
+ protected final int afterEncodingKeyValue(KeyValue kv, DataOutputStream out,
+ HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
+ int size = 0;
if (encodingCtx.getHFileContext().isIncludesTags()) {
- short tagsLength = in.getShort();
+ short tagsLength = kv.getTagsLength();
ByteBufferUtils.putCompressedInt(out, tagsLength);
// There are some tags to be written
if (tagsLength > 0) {
@@ -485,23 +493,23 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
// When tag compression is enabled, tagCompressionContext will have a not null value. Write
// the tags using Dictionary compression in such a case
if (tagCompressionContext != null) {
- tagCompressionContext.compressTags(out, in, tagsLength);
+ tagCompressionContext
+ .compressTags(out, kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
} else {
- ByteBufferUtils.moveBufferToStream(out, in, tagsLength);
+ out.write(kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
}
}
+ size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
}
if (encodingCtx.getHFileContext().isIncludesMvcc()) {
// Copy memstore timestamp from the byte buffer to the output stream.
- long memstoreTS = -1;
- try {
- memstoreTS = ByteBufferUtils.readVLong(in);
- WritableUtils.writeVLong(out, memstoreTS);
- } catch (IOException ex) {
- throw new RuntimeException("Unable to copy memstore timestamp " +
- memstoreTS + " after encoding a key/value");
- }
+ long memstoreTS = kv.getMvccVersion();
+ WritableUtils.writeVLong(out, memstoreTS);
+ // TODO use a writeVLong which returns the number of bytes written, so that the
+ // value need not be parsed twice.
+ size += WritableUtils.getVIntSize(memstoreTS);
}
+ return size;
}
protected final void afterDecodingKeyValue(DataInputStream source,
@@ -545,23 +553,30 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
return new HFileBlockDefaultDecodingContext(meta);
}
- /**
- * Compress KeyValues and write them to output buffer.
- * @param out Where to write compressed data.
- * @param in Source of KeyValue for compression.
- * @param encodingCtx use the Encoding ctx associated with the current block
- * @throws IOException If there is an error writing to output stream.
- */
- public abstract void internalEncodeKeyValues(DataOutputStream out,
- ByteBuffer in, HFileBlockDefaultEncodingContext encodingCtx) throws IOException;
-
protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
throws IOException;
+ /**
+ * Asserts that there is at least the given amount of unfilled space
+ * remaining in the given buffer.
+ * @param out typically, the buffer we are writing to
+ * @param length the required space in the buffer
+ * @throws EncoderBufferTooSmallException If there are not enough bytes.
+ */
+ protected static void ensureSpace(ByteBuffer out, int length)
+ throws EncoderBufferTooSmallException {
+ if (out.position() + length > out.limit()) {
+ throw new EncoderBufferTooSmallException(
+ "Buffer position=" + out.position() +
+ ", buffer limit=" + out.limit() +
+ ", length to be written=" + length);
+ }
+ }
+
@Override
- public void encodeKeyValues(ByteBuffer in,
- HFileBlockEncodingContext blkEncodingCtx) throws IOException {
+ public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
+ throws IOException {
if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
throw new IOException (this.getClass().getName() + " only accepts "
+ HFileBlockDefaultEncodingContext.class.getName() + " as the " +
@@ -570,8 +585,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
HFileBlockDefaultEncodingContext encodingCtx =
(HFileBlockDefaultEncodingContext) blkEncodingCtx;
- encodingCtx.prepareEncoding();
- DataOutputStream dataOut = encodingCtx.getOutputStreamForEncoder();
+ encodingCtx.prepareEncoding(out);
if (encodingCtx.getHFileContext().isIncludesTags()
&& encodingCtx.getHFileContext().isCompressTags()) {
if (encodingCtx.getTagCompressionContext() != null) {
@@ -588,29 +602,40 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
}
}
- internalEncodeKeyValues(dataOut, in, encodingCtx);
+ ByteBufferUtils.putInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
+ blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
+ }
+
+ private static class BufferedDataBlockEncodingState extends EncodingState {
+ int unencodedDataSizeWritten = 0;
+ }
+
+ @Override
+ public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+ throws IOException {
+ BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
+ .getEncodingState();
+ int encodedKvSize = internalEncode(kv, (HFileBlockDefaultEncodingContext) encodingCtx, out);
+ state.unencodedDataSizeWritten += encodedKvSize;
+ return encodedKvSize;
+ }
+
+ public abstract int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingCtx,
+ DataOutputStream out) throws IOException;
+
+ @Override
+ public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
+ byte[] uncompressedBytesWithHeader) throws IOException {
+ BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
+ .getEncodingState();
+ // Write the unencodedDataSizeWritten (with header size)
+ Bytes.putInt(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE
+ + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten
+ );
if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
encodingCtx.postEncoding(BlockType.ENCODED_DATA);
} else {
encodingCtx.postEncoding(BlockType.DATA);
}
}
-
- /**
- * Asserts that there is at least the given amount of unfilled space
- * remaining in the given buffer.
- * @param out typically, the buffer we are writing to
- * @param length the required space in the buffer
- * @throws EncoderBufferTooSmallException If there are no enough bytes.
- */
- protected static void ensureSpace(ByteBuffer out, int length)
- throws EncoderBufferTooSmallException {
- if (out.position() + length > out.limit()) {
- throw new EncoderBufferTooSmallException(
- "Buffer position=" + out.position() +
- ", buffer limit=" + out.limit() +
- ", length to be written=" + length);
- }
- }
-
}
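A note on the shape of the change above: the single-shot encodeKeyValues(ByteBuffer,
HFileBlockEncodingContext) is replaced by a three-phase streaming protocol:
startBlockEncoding(), one encode() per KeyValue, then endBlockEncoding(), which patches
the unencoded-size placeholder that startBlockEncoding() wrote. A minimal caller-side
sketch (a hypothetical helper mirroring the flow EncodedDataBlock.encodeData() uses
further down; not code from this commit):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
    import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;

    public class BlockEncodingSketch {
      /** Encodes one block's worth of KeyValues with the new streaming API. */
      static byte[] encodeBlock(DataBlockEncoder encoder, HFileBlockEncodingContext ctx,
          List<KeyValue> kvs) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);   // placeholder block header
        DataOutputStream out = new DataOutputStream(baos);
        encoder.startBlockEncoding(ctx, out);             // encoding id + dummy length int
        for (KeyValue kv : kvs) {
          encoder.encode(kv, ctx, out);                   // returns unencoded size written
        }
        byte[] uncompressedBytesWithHeader = baos.toByteArray();
        // Patches the dummy length with the accumulated unencoded size and sets
        // the block type on the context.
        encoder.endBlockEncoding(ctx, out, uncompressedBytesWithHeader);
        return uncompressedBytesWithHeader;
      }
    }

The per-block state (previous KeyValue, running unencoded size) now lives in the
EncodingState hung off the context, so nothing is staged in a ByteBuffer between cells.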
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
index 1dc8413..c1c9956 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
@@ -22,9 +22,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
/**
* Just copy data, do not do any kind of compression. Use for comparison and
@@ -32,14 +34,33 @@ import org.apache.hadoop.hbase.util.Bytes;
*/
@InterfaceAudience.Private
public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
+
@Override
- public void internalEncodeKeyValues(DataOutputStream out,
- ByteBuffer in, HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
- in.rewind();
- ByteBufferUtils.putInt(out, in.limit());
- ByteBufferUtils.moveBufferToStream(out, in, in.limit());
- }
+ public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
+ DataOutputStream out) throws IOException {
+ int klength = kv.getKeyLength();
+ int vlength = kv.getValueLength();
+ out.writeInt(klength);
+ out.writeInt(vlength);
+ out.write(kv.getBuffer(), kv.getKeyOffset(), klength);
+ out.write(kv.getValueArray(), kv.getValueOffset(), vlength);
+ int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
+ // Write the additional tag into the stream
+ if (encodingContext.getHFileContext().isIncludesTags()) {
+ short tagsLength = kv.getTagsLength();
+ out.writeShort(tagsLength);
+ if (tagsLength > 0) {
+ out.write(kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
+ }
+ size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
+ }
+ if (encodingContext.getHFileContext().isIncludesMvcc()) {
+ WritableUtils.writeVLong(out, kv.getMvccVersion());
+ size += WritableUtils.getVIntSize(kv.getMvccVersion());
+ }
+ return size;
+ }
@Override
public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
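The unencoded layout CopyKeyDataBlockEncoder writes per cell is just the flat KeyValue
wire format plus the optional trailers. A standalone illustration (an assumption-level
sketch: plain byte arrays stand in for the KeyValue accessors):

    import java.io.DataOutputStream;
    import java.io.IOException;

    final class CopyKeyLayoutSketch {
      /** Mirrors internalEncode(): keyLen, valueLen, key, value, then optional tags. */
      static int writeCell(DataOutputStream out, byte[] key, byte[] value,
          byte[] tags, boolean includesTags) throws IOException {
        out.writeInt(key.length);
        out.writeInt(value.length);
        out.write(key);
        out.write(value);
        int size = key.length + value.length + 8;  // KEYVALUE_INFRASTRUCTURE_SIZE: two ints
        if (includesTags) {
          out.writeShort(tags.length);             // short length prefix
          out.write(tags);
          size += tags.length + 2;                 // TAGS_LENGTH_SIZE: one short
        }
        return size;                               // the unencoded size the encoder reports
      }
    }

When MVCC is enabled, a vlong MVCC version follows the tags, exactly as in the diff above.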
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
index ddb2359..99f6a7f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
@@ -17,11 +17,13 @@
package org.apache.hadoop.hbase.io.encoding;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -34,28 +36,42 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
* <li>knowledge of Key Value format</li>
* </ul>
* It is designed to work fast enough to be feasible as in memory compression.
- *
- * After encoding, it also optionally compresses the encoded data if a
- * compression algorithm is specified in HFileBlockEncodingContext argument of
- * {@link #encodeKeyValues(ByteBuffer, HFileBlockEncodingContext)}.
*/
@InterfaceAudience.Private
public interface DataBlockEncoder {
/**
- * Encodes KeyValues. It will first encode key value pairs, and then
- * optionally do the compression for the encoded data.
- *
- * @param in
- * Source of KeyValue for compression.
+ * Starts encoding for a block of KeyValues. Call
+ * {@link #endBlockEncoding(HFileBlockEncodingContext, DataOutputStream, byte[])} to finish
+ * encoding of a block.
+ * @param encodingCtx the encoding context of the block being written
+ * @param out the stream the block is encoded into
+ * @throws IOException if writing fails
+ */
+ void startBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+ throws IOException;
+
+ /**
+ * Encodes a KeyValue.
+ * @param kv the KeyValue to encode
+ * @param encodingCtx the encoding context of the current block
+ * @param out the stream the KeyValue is encoded into
+ * @return unencoded kv size written
+ * @throws IOException
+ */
+ int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+ throws IOException;
+
+ /**
+ * Ends encoding for a block of KeyValues. Gives the encoder a chance to do any finishing
+ * work on the encoded block. It must be called at the end of block encoding.
* @param encodingCtx
- * the encoding context which will contain encoded uncompressed bytes
- * as well as compressed encoded bytes if compression is enabled, and
- * also it will reuse resources across multiple calls.
+ * @param out the stream the block was encoded into
+ * @param uncompressedBytesWithHeader the encoded block bytes, including the header
* @throws IOException
- * If there is an error writing to output stream.
*/
- void encodeKeyValues(ByteBuffer in, HFileBlockEncodingContext encodingCtx) throws IOException;
+ void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
+ byte[] uncompressedBytesWithHeader) throws IOException;
/**
* Decode.
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
index f72878b..fc4c314 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
@@ -75,130 +75,6 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
}
}
- private void compressSingleKeyValue(DiffCompressionState previousState,
- DiffCompressionState currentState, DataOutputStream out,
- ByteBuffer in) throws IOException {
- byte flag = 0;
- int kvPos = in.position();
- int keyLength = in.getInt();
- int valueLength = in.getInt();
-
- long timestamp;
- long diffTimestamp = 0;
- int diffTimestampFitsInBytes = 0;
-
- int commonPrefix;
-
- int timestampFitsInBytes;
-
- if (previousState.isFirst()) {
- currentState.readKey(in, keyLength, valueLength);
- currentState.prevOffset = kvPos;
- timestamp = currentState.timestamp;
- if (timestamp < 0) {
- flag |= FLAG_TIMESTAMP_SIGN;
- timestamp = -timestamp;
- }
- timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
-
- flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
- commonPrefix = 0;
-
- // put column family
- in.mark();
- ByteBufferUtils.skip(in, currentState.rowLength
- + KeyValue.ROW_LENGTH_SIZE);
- ByteBufferUtils.moveBufferToStream(out, in, currentState.familyLength
- + KeyValue.FAMILY_LENGTH_SIZE);
- in.reset();
- } else {
- // find a common prefix and skip it
- commonPrefix =
- ByteBufferUtils.findCommonPrefix(in, in.position(),
- previousState.prevOffset + KeyValue.ROW_OFFSET, keyLength
- - KeyValue.TIMESTAMP_TYPE_SIZE);
- // don't compress timestamp and type using prefix
-
- currentState.readKey(in, keyLength, valueLength,
- commonPrefix, previousState);
- currentState.prevOffset = kvPos;
- timestamp = currentState.timestamp;
- boolean negativeTimestamp = timestamp < 0;
- if (negativeTimestamp) {
- timestamp = -timestamp;
- }
- timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
-
- if (keyLength == previousState.keyLength) {
- flag |= FLAG_SAME_KEY_LENGTH;
- }
- if (valueLength == previousState.valueLength) {
- flag |= FLAG_SAME_VALUE_LENGTH;
- }
- if (currentState.type == previousState.type) {
- flag |= FLAG_SAME_TYPE;
- }
-
- // encode timestamp
- diffTimestamp = previousState.timestamp - currentState.timestamp;
- boolean minusDiffTimestamp = diffTimestamp < 0;
- if (minusDiffTimestamp) {
- diffTimestamp = -diffTimestamp;
- }
- diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
- if (diffTimestampFitsInBytes < timestampFitsInBytes) {
- flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
- flag |= FLAG_TIMESTAMP_IS_DIFF;
- if (minusDiffTimestamp) {
- flag |= FLAG_TIMESTAMP_SIGN;
- }
- } else {
- flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
- if (negativeTimestamp) {
- flag |= FLAG_TIMESTAMP_SIGN;
- }
- }
- }
-
- out.write(flag);
-
- if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
- ByteBufferUtils.putCompressedInt(out, keyLength);
- }
- if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
- ByteBufferUtils.putCompressedInt(out, valueLength);
- }
-
- ByteBufferUtils.putCompressedInt(out, commonPrefix);
- ByteBufferUtils.skip(in, commonPrefix);
-
- if (previousState.isFirst() ||
- commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
- int restRowLength =
- currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
- ByteBufferUtils.moveBufferToStream(out, in, restRowLength);
- ByteBufferUtils.skip(in, currentState.familyLength +
- KeyValue.FAMILY_LENGTH_SIZE);
- ByteBufferUtils.moveBufferToStream(out, in, currentState.qualifierLength);
- } else {
- ByteBufferUtils.moveBufferToStream(out, in,
- keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE);
- }
-
- if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
- ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
- } else {
- ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
- }
-
- if ((flag & FLAG_SAME_TYPE) == 0) {
- out.write(currentState.type);
- }
- ByteBufferUtils.skip(in, KeyValue.TIMESTAMP_TYPE_SIZE);
-
- ByteBufferUtils.moveBufferToStream(out, in, valueLength);
- }
-
private void uncompressSingleKeyValue(DataInputStream source,
ByteBuffer buffer,
DiffCompressionState state)
@@ -316,24 +192,110 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
}
@Override
- public void internalEncodeKeyValues(DataOutputStream out,
- ByteBuffer in, HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
- in.rewind();
- ByteBufferUtils.putInt(out, in.limit());
- DiffCompressionState previousState = new DiffCompressionState();
- DiffCompressionState currentState = new DiffCompressionState();
- while (in.hasRemaining()) {
- compressSingleKeyValue(previousState, currentState,
- out, in);
- afterEncodingKeyValue(in, out, encodingCtx);
-
- // swap previousState <-> currentState
- DiffCompressionState tmp = previousState;
- previousState = currentState;
- currentState = tmp;
- }
+ public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
+ DataOutputStream out) throws IOException {
+ EncodingState state = encodingContext.getEncodingState();
+ int size = compressSingleKeyValue(out, kv, state.prevKv);
+ size += afterEncodingKeyValue(kv, out, encodingContext);
+ state.prevKv = kv;
+ return size;
}
+ private int compressSingleKeyValue(DataOutputStream out, KeyValue kv, KeyValue prevKv)
+ throws IOException {
+ byte flag = 0;
+ int kLength = kv.getKeyLength();
+ int vLength = kv.getValueLength();
+
+ long timestamp;
+ long diffTimestamp = 0;
+ int diffTimestampFitsInBytes = 0;
+ int timestampFitsInBytes;
+ int commonPrefix;
+ byte[] curKvBuf = kv.getBuffer();
+
+ if (prevKv == null) {
+ timestamp = kv.getTimestamp();
+ if (timestamp < 0) {
+ flag |= FLAG_TIMESTAMP_SIGN;
+ timestamp = -timestamp;
+ }
+ timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
+ flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
+ commonPrefix = 0;
+ // put column family
+ byte familyLength = kv.getFamilyLength();
+ out.write(familyLength);
+ out.write(kv.getFamilyArray(), kv.getFamilyOffset(), familyLength);
+ } else {
+ // Finding common prefix
+ int preKeyLength = prevKv.getKeyLength();
+ commonPrefix = ByteBufferUtils.findCommonPrefix(curKvBuf, kv.getKeyOffset(), kLength
+ - KeyValue.TIMESTAMP_TYPE_SIZE, prevKv.getBuffer(), prevKv.getKeyOffset(), preKeyLength
+ - KeyValue.TIMESTAMP_TYPE_SIZE);
+ if (kLength == preKeyLength) {
+ flag |= FLAG_SAME_KEY_LENGTH;
+ }
+ if (vLength == prevKv.getValueLength()) {
+ flag |= FLAG_SAME_VALUE_LENGTH;
+ }
+ if (kv.getTypeByte() == prevKv.getTypeByte()) {
+ flag |= FLAG_SAME_TYPE;
+ }
+ // don't compress timestamp and type using prefix encode timestamp
+ timestamp = kv.getTimestamp();
+ diffTimestamp = prevKv.getTimestamp() - timestamp;
+ boolean negativeTimestamp = timestamp < 0;
+ if (negativeTimestamp) {
+ timestamp = -timestamp;
+ }
+ timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
+ boolean minusDiffTimestamp = diffTimestamp < 0;
+ if (minusDiffTimestamp) {
+ diffTimestamp = -diffTimestamp;
+ }
+ diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
+ if (diffTimestampFitsInBytes < timestampFitsInBytes) {
+ flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
+ flag |= FLAG_TIMESTAMP_IS_DIFF;
+ if (minusDiffTimestamp) {
+ flag |= FLAG_TIMESTAMP_SIGN;
+ }
+ } else {
+ flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
+ if (negativeTimestamp) {
+ flag |= FLAG_TIMESTAMP_SIGN;
+ }
+ }
+ }
+ out.write(flag);
+ if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+ ByteBufferUtils.putCompressedInt(out, kLength);
+ }
+ if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+ ByteBufferUtils.putCompressedInt(out, vLength);
+ }
+ ByteBufferUtils.putCompressedInt(out, commonPrefix);
+ if (prevKv == null || commonPrefix < kv.getRowLength() + KeyValue.ROW_LENGTH_SIZE) {
+ int restRowLength = kv.getRowLength() + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
+ out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, restRowLength);
+ out.write(curKvBuf, kv.getQualifierOffset(), kv.getQualifierLength());
+ } else {
+ out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, kLength - commonPrefix
+ - KeyValue.TIMESTAMP_TYPE_SIZE);
+ }
+ if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
+ ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
+ } else {
+ ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
+ }
+
+ if ((flag & FLAG_SAME_TYPE) == 0) {
+ out.write(kv.getTypeByte());
+ }
+ out.write(kv.getValueArray(), kv.getValueOffset(), vLength);
+ return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
+ }
@Override
public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
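The timestamp handling in the new compressSingleKeyValue() picks whichever of the raw
timestamp and the delta against the previous cell fits in fewer bytes. A self-contained
sketch of that decision (longFitsIn() is re-implemented here only to keep the example
standalone; HBase's version lives in ByteBufferUtils):

    final class TimestampChoiceSketch {
      /** Minimum number of bytes needed to hold the non-negative value, 1..8. */
      static int longFitsIn(long value) {
        int bytes = 1;
        while ((value >>>= 8) != 0) {
          bytes++;
        }
        return bytes;
      }

      /** True when FLAG_TIMESTAMP_IS_DIFF would be set for this pair of cells. */
      static boolean storeAsDiff(long prevTimestamp, long timestamp) {
        long ts = timestamp < 0 ? -timestamp : timestamp;
        long diff = prevTimestamp - timestamp;
        if (diff < 0) {
          diff = -diff;
        }
        return longFitsIn(diff) < longFitsIn(ts);
      }

      public static void main(String[] args) {
        // Cells written 1 ms apart: the 1-byte delta beats the 6-byte raw timestamp.
        System.out.println(storeAsDiff(1400000000001L, 1400000000000L)); // true
      }
    }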
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
index 9e0497e..ce7356c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hbase.io.encoding;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -36,13 +37,16 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.Compressor;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* Encapsulates a data block compressed using a particular encoding algorithm.
* Useful for testing and benchmarking.
+ * This is used only in testing.
*/
@InterfaceAudience.Private
+@VisibleForTesting
public class EncodedDataBlock {
private byte[] rawKVs;
private ByteBuffer rawBuffer;
@@ -215,16 +219,53 @@ public class EncodedDataBlock {
* @return encoded data block with header and checksum
*/
public byte[] encodeData() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
- this.dataBlockEncoder.encodeKeyValues(
- getUncompressedBuffer(), encodingCtx);
+ baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
+ DataOutputStream out = new DataOutputStream(baos);
+ this.dataBlockEncoder.startBlockEncoding(encodingCtx, out);
+ ByteBuffer in = getUncompressedBuffer();
+ in.rewind();
+ int klength, vlength;
+ short tagsLength = 0;
+ long memstoreTS = 0L;
+ KeyValue kv = null;
+ while (in.hasRemaining()) {
+ int kvOffset = in.position();
+ klength = in.getInt();
+ vlength = in.getInt();
+ ByteBufferUtils.skip(in, klength + vlength);
+ if (this.meta.isIncludesTags()) {
+ tagsLength = in.getShort();
+ ByteBufferUtils.skip(in, tagsLength);
+ }
+ if (this.meta.isIncludesMvcc()) {
+ memstoreTS = ByteBufferUtils.readVLong(in);
+ }
+ kv = new KeyValue(in.array(), kvOffset, (int) KeyValue.getKeyValueDataStructureSize(
+ klength, vlength, tagsLength));
+ kv.setMvccVersion(memstoreTS);
+ this.dataBlockEncoder.encode(kv, encodingCtx, out);
+ }
+ BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
+ baos.writeTo(stream);
+ this.dataBlockEncoder.endBlockEncoding(encodingCtx, out, stream.buf);
} catch (IOException e) {
throw new RuntimeException(String.format(
"Bug in encoding part of algorithm %s. " +
"Probably it requested more bytes than are available.",
toString()), e);
}
- return encodingCtx.getUncompressedBytesWithHeader();
+ return baos.toByteArray();
+ }
+
+ private static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
+ private byte[] buf;
+
+ @Override
+ public void write(byte[] b, int off, int len) {
+ this.buf = b;
+ }
}
@Override
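The BufferGrabbingByteArrayOutputStream above is a small trick worth spelling out:
ByteArrayOutputStream.writeTo(out) passes its internal array straight to out.write(),
so overriding write(byte[], int, int) to keep the reference, deliberately without
copying, hands endBlockEncoding() the very buffer that baos owns. A standalone
demonstration (hypothetical names):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    class BufferGrabber extends ByteArrayOutputStream {
      private byte[] grabbed;

      @Override
      public void write(byte[] b, int off, int len) {
        this.grabbed = b; // keep the reference, not a copy
      }

      static byte[] internalBufferOf(ByteArrayOutputStream baos) throws IOException {
        BufferGrabber grabber = new BufferGrabber();
        baos.writeTo(grabber); // invokes grabber.write(buf, 0, count) with baos's own array
        return grabber.grabbed;
      }
    }

Because endBlockEncoding() patches that shared array in place, the baos.toByteArray()
call that encodeData() returns already contains the corrected size field.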
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java
new file mode 100644
index 0000000..b16f099
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * Keeps track of the encoding state.
+ */
+@InterfaceAudience.Private
+public class EncodingState {
+
+ /**
+ * The previous KeyValue the encoder encoded.
+ */
+ protected KeyValue prevKv = null;
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
index 0346b20..4325f96 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.io.encoding;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -102,118 +101,14 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
}
- private void compressSingleKeyValue(
- FastDiffCompressionState previousState,
- FastDiffCompressionState currentState,
- OutputStream out, ByteBuffer in) throws IOException {
- currentState.prevOffset = in.position();
- int keyLength = in.getInt();
- int valueOffset =
- currentState.prevOffset + keyLength + KeyValue.ROW_OFFSET;
- int valueLength = in.getInt();
- byte flag = 0;
-
- if (previousState.isFirst()) {
- // copy the key, there is no common prefix with none
- out.write(flag);
- ByteBufferUtils.putCompressedInt(out, keyLength);
- ByteBufferUtils.putCompressedInt(out, valueLength);
- ByteBufferUtils.putCompressedInt(out, 0);
-
- currentState.readKey(in, keyLength, valueLength);
-
- ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength);
- } else {
- // find a common prefix and skip it
- int commonPrefix = ByteBufferUtils.findCommonPrefix(in, in.position(),
- previousState.prevOffset + KeyValue.ROW_OFFSET,
- Math.min(keyLength, previousState.keyLength) -
- KeyValue.TIMESTAMP_TYPE_SIZE);
-
- currentState.readKey(in, keyLength, valueLength,
- commonPrefix, previousState);
-
- if (keyLength == previousState.keyLength) {
- flag |= FLAG_SAME_KEY_LENGTH;
- }
- if (valueLength == previousState.valueLength) {
- flag |= FLAG_SAME_VALUE_LENGTH;
- }
- if (currentState.type == previousState.type) {
- flag |= FLAG_SAME_TYPE;
- }
-
- int commonTimestampPrefix = findCommonTimestampPrefix(
- currentState, previousState);
- flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
-
- // Check if current and previous values are the same. Compare value
- // length first as an optimization.
- if (valueLength == previousState.valueLength) {
- int previousValueOffset = previousState.prevOffset
- + previousState.keyLength + KeyValue.ROW_OFFSET;
- if (ByteBufferUtils.arePartsEqual(in,
- previousValueOffset, previousState.valueLength,
- valueOffset, valueLength)) {
- flag |= FLAG_SAME_VALUE;
- }
- }
-
- out.write(flag);
- if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
- ByteBufferUtils.putCompressedInt(out, keyLength);
- }
- if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
- ByteBufferUtils.putCompressedInt(out, valueLength);
- }
- ByteBufferUtils.putCompressedInt(out, commonPrefix);
-
- ByteBufferUtils.skip(in, commonPrefix);
- if (commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
- // Previous and current rows are different. Copy the differing part of
- // the row, skip the column family, and copy the qualifier.
- ByteBufferUtils.moveBufferToStream(out, in,
- currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix);
- ByteBufferUtils.skip(in, currentState.familyLength +
- KeyValue.FAMILY_LENGTH_SIZE);
- ByteBufferUtils.moveBufferToStream(out, in,
- currentState.qualifierLength);
- } else {
- // The common part includes the whole row. As the column family is the
- // same across the whole file, it will automatically be included in the
- // common prefix, so we need not special-case it here.
- int restKeyLength = keyLength - commonPrefix -
- KeyValue.TIMESTAMP_TYPE_SIZE;
- ByteBufferUtils.moveBufferToStream(out, in, restKeyLength);
- }
- ByteBufferUtils.skip(in, commonTimestampPrefix);
- ByteBufferUtils.moveBufferToStream(out, in,
- KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
-
- // Write the type if it is not the same as before.
- if ((flag & FLAG_SAME_TYPE) == 0) {
- out.write(currentState.type);
- }
-
- // Write the value if it is not the same as before.
- if ((flag & FLAG_SAME_VALUE) == 0) {
- ByteBufferUtils.copyBufferToStream(out, in, valueOffset, valueLength);
- }
-
- // Skip key type and value in the input buffer.
- ByteBufferUtils.skip(in, KeyValue.TYPE_SIZE + currentState.valueLength);
+ private int findCommonTimestampPrefix(byte[] curKvBuf, int curKvTsOff, byte[] preKvBuf,
+ int preKvTsOff) {
+ int commonPrefix = 0;
+ while (commonPrefix < (KeyValue.TIMESTAMP_SIZE - 1)
+ && curKvBuf[curKvTsOff + commonPrefix] == preKvBuf[preKvTsOff + commonPrefix]) {
+ commonPrefix++;
}
- }
-
- private int findCommonTimestampPrefix(FastDiffCompressionState left,
- FastDiffCompressionState right) {
- int prefixTimestamp = 0;
- while (prefixTimestamp < (KeyValue.TIMESTAMP_SIZE - 1) &&
- left.timestamp[prefixTimestamp]
- == right.timestamp[prefixTimestamp]) {
- prefixTimestamp++;
- }
- return prefixTimestamp; // has to be at most 7 bytes
+ return commonPrefix; // has to be at most 7 bytes
}
private void uncompressSingleKeyValue(DataInputStream source,
@@ -342,22 +237,98 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
}
@Override
- public void internalEncodeKeyValues(DataOutputStream out, ByteBuffer in,
- HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
- in.rewind();
- ByteBufferUtils.putInt(out, in.limit());
- FastDiffCompressionState previousState = new FastDiffCompressionState();
- FastDiffCompressionState currentState = new FastDiffCompressionState();
- while (in.hasRemaining()) {
- compressSingleKeyValue(previousState, currentState,
- out, in);
- afterEncodingKeyValue(in, out, encodingCtx);
-
- // swap previousState <-> currentState
- FastDiffCompressionState tmp = previousState;
- previousState = currentState;
- currentState = tmp;
+ public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
+ DataOutputStream out) throws IOException {
+ EncodingState state = encodingContext.getEncodingState();
+ int size = compressSingleKeyValue(out, kv, state.prevKv);
+ size += afterEncodingKeyValue(kv, out, encodingContext);
+ state.prevKv = kv;
+ return size;
+ }
+
+ private int compressSingleKeyValue(DataOutputStream out, KeyValue kv, KeyValue prevKv)
+ throws IOException {
+ byte flag = 0;
+ int kLength = kv.getKeyLength();
+ int vLength = kv.getValueLength();
+ byte[] curKvBuf = kv.getBuffer();
+
+ if (prevKv == null) {
+ // copy the key, there is no common prefix with none
+ out.write(flag);
+ ByteBufferUtils.putCompressedInt(out, kLength);
+ ByteBufferUtils.putCompressedInt(out, vLength);
+ ByteBufferUtils.putCompressedInt(out, 0);
+ out.write(curKvBuf, kv.getKeyOffset(), kLength + vLength);
+ } else {
+ byte[] preKvBuf = prevKv.getBuffer();
+ int preKeyLength = prevKv.getKeyLength();
+ int preValLength = prevKv.getValueLength();
+ // find a common prefix and skip it
+ int commonPrefix = ByteBufferUtils.findCommonPrefix(curKvBuf, kv.getKeyOffset(), kLength
+ - KeyValue.TIMESTAMP_TYPE_SIZE, preKvBuf, prevKv.getKeyOffset(), preKeyLength
+ - KeyValue.TIMESTAMP_TYPE_SIZE);
+
+ if (kLength == prevKv.getKeyLength()) {
+ flag |= FLAG_SAME_KEY_LENGTH;
+ }
+ if (vLength == prevKv.getValueLength()) {
+ flag |= FLAG_SAME_VALUE_LENGTH;
+ }
+ if (kv.getTypeByte() == prevKv.getTypeByte()) {
+ flag |= FLAG_SAME_TYPE;
+ }
+
+ int commonTimestampPrefix = findCommonTimestampPrefix(curKvBuf, kv.getKeyOffset() + kLength
+ - KeyValue.TIMESTAMP_TYPE_SIZE, preKvBuf, prevKv.getKeyOffset() + preKeyLength
+ - KeyValue.TIMESTAMP_TYPE_SIZE);
+
+ flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
+
+ // Check if current and previous values are the same. Compare value
+ // length first as an optimization.
+ if (vLength == preValLength
+ && Bytes.equals(kv.getValueArray(), kv.getValueOffset(), vLength,
+ prevKv.getValueArray(), prevKv.getValueOffset(), preValLength)) {
+ flag |= FLAG_SAME_VALUE;
+ }
+
+ out.write(flag);
+ if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+ ByteBufferUtils.putCompressedInt(out, kLength);
+ }
+ if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+ ByteBufferUtils.putCompressedInt(out, vLength);
+ }
+ ByteBufferUtils.putCompressedInt(out, commonPrefix);
+
+ if (commonPrefix < kv.getRowLength() + KeyValue.ROW_LENGTH_SIZE) {
+ // Previous and current rows are different. Copy the differing part of
+ // the row, skip the column family, and copy the qualifier.
+ out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, kv.getRowLength()
+ + KeyValue.ROW_LENGTH_SIZE - commonPrefix);
+ out.write(curKvBuf, kv.getQualifierOffset(), kv.getQualifierLength());
+ } else {
+ // The common part includes the whole row. As the column family is the
+ // same across the whole file, it will automatically be included in the
+ // common prefix, so we need not special-case it here.
+ int restKeyLength = kLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
+ out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, restKeyLength);
+ }
+ out.write(curKvBuf, kv.getKeyOffset() + kLength - KeyValue.TIMESTAMP_TYPE_SIZE
+ + commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
+
+ // Write the type if it is not the same as before.
+ if ((flag & FLAG_SAME_TYPE) == 0) {
+ out.write(kv.getTypeByte());
+ }
+
+ // Write the value if it is not the same as before.
+ if ((flag & FLAG_SAME_VALUE) == 0) {
+ out.write(kv.getValueArray(), kv.getValueOffset(), vLength);
+ }
}
+ return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
}
@Override
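findCommonTimestampPrefix() above compares the two 8-byte big-endian timestamps byte by
byte, capped at 7 because the final byte is always written. A worked standalone example:

    public class TimestampPrefixSketch {
      public static void main(String[] args) {
        byte[] prev = java.nio.ByteBuffer.allocate(8).putLong(1000000L).array();
        byte[] cur = java.nio.ByteBuffer.allocate(8).putLong(1000001L).array();
        int commonPrefix = 0;
        while (commonPrefix < 7 && cur[commonPrefix] == prev[commonPrefix]) {
          commonPrefix++;
        }
        // Timestamps 1 ms apart share their first 7 bytes, so only one timestamp
        // byte is written and the prefix length travels in the flag byte.
        System.out.println(commonPrefix); // prints 7
      }
    }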
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
index 8386377..0dc1c8a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
@@ -49,13 +49,9 @@ import com.google.common.base.Preconditions;
public class HFileBlockDefaultEncodingContext implements
HFileBlockEncodingContext {
private byte[] onDiskBytesWithHeader;
- private byte[] uncompressedBytesWithHeader;
private BlockType blockType;
private final DataBlockEncoding encodingAlgo;
- private ByteArrayOutputStream encodedStream = new ByteArrayOutputStream();
- private DataOutputStream dataOut = new DataOutputStream(encodedStream);
-
private byte[] dummyHeader;
// Compression state
@@ -77,6 +73,8 @@ public class HFileBlockDefaultEncodingContext implements
/** Initialization vector */
private byte[] iv;
+ private EncodingState encoderState;
+
/**
* @param encoding encoding used
* @param headerBytes dummy header bytes
@@ -113,52 +111,35 @@ public class HFileBlockDefaultEncodingContext implements
"Please pass HConstants.HFILEBLOCK_DUMMY_HEADER instead of null for param headerBytes");
}
- @Override
- public void setDummyHeader(byte[] headerBytes) {
- dummyHeader = headerBytes;
- }
-
/**
* prepare to start a new encoding.
* @throws IOException
*/
- public void prepareEncoding() throws IOException {
- encodedStream.reset();
- dataOut.write(dummyHeader);
- if (encodingAlgo != null
- && encodingAlgo != DataBlockEncoding.NONE) {
- encodingAlgo.writeIdInBytes(dataOut);
+ public void prepareEncoding(DataOutputStream out) throws IOException {
+ if (encodingAlgo != null && encodingAlgo != DataBlockEncoding.NONE) {
+ encodingAlgo.writeIdInBytes(out);
}
}
@Override
public void postEncoding(BlockType blockType)
throws IOException {
- dataOut.flush();
- compressAfterEncodingWithBlockType(encodedStream.toByteArray(), blockType);
this.blockType = blockType;
}
- /**
- * @param uncompressedBytesWithHeader
- * @param blockType
- * @throws IOException
- */
- public void compressAfterEncodingWithBlockType(byte[] uncompressedBytesWithHeader,
- BlockType blockType) throws IOException {
- compressAfterEncoding(uncompressedBytesWithHeader, blockType, dummyHeader);
+ @Override
+ public byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader) throws IOException {
+ compressAfterEncoding(uncompressedBytesWithHeader, dummyHeader);
+ return onDiskBytesWithHeader;
}
/**
* @param uncompressedBytesWithHeader
- * @param blockType
* @param headerBytes
* @throws IOException
*/
- protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader,
- BlockType blockType, byte[] headerBytes) throws IOException {
- this.uncompressedBytesWithHeader = uncompressedBytesWithHeader;
-
+ protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader, byte[] headerBytes)
+ throws IOException {
Encryption.Context cryptoContext = fileContext.getEncryptionContext();
if (cryptoContext != Encryption.Context.NONE) {
@@ -238,20 +219,7 @@ public class HFileBlockDefaultEncodingContext implements
} else {
onDiskBytesWithHeader = uncompressedBytesWithHeader;
}
-
}
-
- this.blockType = blockType;
- }
-
- @Override
- public byte[] getOnDiskBytesWithHeader() {
- return onDiskBytesWithHeader;
- }
-
- @Override
- public byte[] getUncompressedBytesWithHeader() {
- return uncompressedBytesWithHeader;
}
@Override
@@ -271,10 +239,6 @@ public class HFileBlockDefaultEncodingContext implements
}
}
- public DataOutputStream getOutputStreamForEncoder() {
- return this.dataOut;
- }
-
@Override
public DataBlockEncoding getDataBlockEncoding() {
return this.encodingAlgo;
@@ -292,4 +256,14 @@ public class HFileBlockDefaultEncodingContext implements
public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
this.tagCompressionContext = tagCompressionContext;
}
+
+ @Override
+ public EncodingState getEncodingState() {
+ return this.encoderState;
+ }
+
+ @Override
+ public void setEncodingState(EncodingState state) {
+ this.encoderState = state;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java
index 2fa9112..7649021 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java
@@ -17,7 +17,6 @@
package org.apache.hadoop.hbase.io.encoding;
import java.io.IOException;
-import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.BlockType;
@@ -34,33 +33,11 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
public interface HFileBlockEncodingContext {
/**
- * @return OutputStream to which encoded data is written
- */
- OutputStream getOutputStreamForEncoder();
-
- /**
- * @return encoded and compressed bytes with header which are ready to write
- * out to disk
- */
- byte[] getOnDiskBytesWithHeader();
-
- /**
- * @return encoded but not heavily compressed bytes with header which can be
- * cached in block cache
- */
- byte[] getUncompressedBytesWithHeader();
-
- /**
* @return the block type after encoding
*/
BlockType getBlockType();
/**
- * sets the dummy header bytes
- */
- void setDummyHeader(byte[] headerBytes);
-
- /**
* @return the {@link DataBlockEncoding} encoding used
*/
DataBlockEncoding getDataBlockEncoding();
@@ -83,4 +60,22 @@ public interface HFileBlockEncodingContext {
* @return HFile context information
*/
HFileContext getHFileContext();
+
+ /**
+ * Sets the encoding state.
+ * @param state the encoding state to associate with this context
+ */
+ void setEncodingState(EncodingState state);
+
+ /**
+ * @return the encoding state
+ */
+ EncodingState getEncodingState();
+
+ /**
+ * @param uncompressedBytesWithHeader encoded bytes with header
+ * @return Bytes with header which are ready to write out to disk. This is compressed and
+ * encrypted bytes applying the set compression algorithm and encryption.
+ */
+ byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader) throws IOException;
}
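With getOnDiskBytesWithHeader() and getUncompressedBytesWithHeader() removed, the
context no longer caches byte arrays on the caller's behalf; the writer owns the
encoded block and asks the context for the on-disk transformation in one call.
Roughly, on the caller's side (a sketch with hypothetical local names):

    // After endBlockEncoding() has patched uncompressedBytesWithHeader:
    byte[] onDiskBytesWithHeader = encodingCtx.compressAndEncrypt(uncompressedBytesWithHeader);
    // onDiskBytesWithHeader goes to the file system; uncompressedBytesWithHeader
    // remains available for the block cache.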
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
index f57ff4f..c699f6f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
@@ -44,50 +44,33 @@ import org.apache.hadoop.hbase.util.Bytes;
@InterfaceAudience.Private
public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
- private int addKV(int prevKeyOffset, DataOutputStream out,
- ByteBuffer in, int prevKeyLength) throws IOException {
- int keyLength = in.getInt();
- int valueLength = in.getInt();
-
- if (prevKeyOffset == -1) {
+ @Override
+ public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
+ DataOutputStream out) throws IOException {
+ byte[] kvBuf = kv.getBuffer();
+ int klength = kv.getKeyLength();
+ int vlength = kv.getValueLength();
+ EncodingState state = encodingContext.getEncodingState();
+ if (state.prevKv == null) {
// copy the key, there is no common prefix with none
- ByteBufferUtils.putCompressedInt(out, keyLength);
- ByteBufferUtils.putCompressedInt(out, valueLength);
+ ByteBufferUtils.putCompressedInt(out, klength);
+ ByteBufferUtils.putCompressedInt(out, vlength);
ByteBufferUtils.putCompressedInt(out, 0);
- ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength);
+ out.write(kvBuf, kv.getKeyOffset(), klength + vlength);
} else {
// find a common prefix and skip it
- int common = ByteBufferUtils.findCommonPrefix(
- in, prevKeyOffset + KeyValue.ROW_OFFSET,
- in.position(),
- Math.min(prevKeyLength, keyLength));
-
- ByteBufferUtils.putCompressedInt(out, keyLength - common);
- ByteBufferUtils.putCompressedInt(out, valueLength);
+ int common = ByteBufferUtils.findCommonPrefix(state.prevKv.getBuffer(),
+ state.prevKv.getKeyOffset(), state.prevKv.getKeyLength(), kvBuf, kv.getKeyOffset(),
+ kv.getKeyLength());
+ ByteBufferUtils.putCompressedInt(out, klength - common);
+ ByteBufferUtils.putCompressedInt(out, vlength);
ByteBufferUtils.putCompressedInt(out, common);
-
- ByteBufferUtils.skip(in, common);
- ByteBufferUtils.moveBufferToStream(out, in, keyLength - common
- + valueLength);
- }
-
- return keyLength;
- }
-
- @Override
- public void internalEncodeKeyValues(DataOutputStream writeHere, ByteBuffer in,
- HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
- in.rewind();
- ByteBufferUtils.putInt(writeHere, in.limit());
- int prevOffset = -1;
- int offset = 0;
- int keyLength = 0;
- while (in.hasRemaining()) {
- offset = in.position();
- keyLength = addKV(prevOffset, writeHere, in, keyLength);
- afterEncodingKeyValue(in, writeHere, encodingCtx);
- prevOffset = offset;
+ out.write(kvBuf, kv.getKeyOffset() + common, klength - common + vlength);
}
+ int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
+ size += afterEncodingKeyValue(kv, out, encodingContext);
+ state.prevKv = kv;
+ return size;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java
----------------------------------------------------------------------
diff --git a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java
index ef576eb..2a0c459 100644
--- a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java
+++ b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.codec.prefixtree.encode.PrefixTreeEncoder;
import org.apache.hadoop.hbase.codec.prefixtree.scanner.CellSearcher;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.EncodingState;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.io.WritableUtils;
/**
* This class is created via reflection in DataBlockEncoding enum. Update the enum if class name or
@@ -63,50 +65,6 @@ public class PrefixTreeCodec implements DataBlockEncoder{
public PrefixTreeCodec() {
}
- /**
- * Copied from BufferedDataBlockEncoder. Almost definitely can be improved, but i'm not familiar
- * enough with the concept of the HFileBlockEncodingContext.
- */
- @Override
- public void encodeKeyValues(ByteBuffer in,
- HFileBlockEncodingContext blkEncodingCtx) throws IOException {
- if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
- throw new IOException(this.getClass().getName() + " only accepts "
- + HFileBlockDefaultEncodingContext.class.getName() + " as the " + "encoding context.");
- }
-
- HFileBlockDefaultEncodingContext encodingCtx
- = (HFileBlockDefaultEncodingContext) blkEncodingCtx;
- encodingCtx.prepareEncoding();
- DataOutputStream dataOut = encodingCtx.getOutputStreamForEncoder();
- internalEncodeKeyValues(dataOut, in, encodingCtx.getHFileContext().isIncludesMvcc(),
- encodingCtx.getHFileContext().isIncludesTags());
-
- //do i need to check this, or will it always be DataBlockEncoding.PREFIX_TREE?
- if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
- encodingCtx.postEncoding(BlockType.ENCODED_DATA);
- } else {
- encodingCtx.postEncoding(BlockType.DATA);
- }
- }
-
- private void internalEncodeKeyValues(DataOutputStream encodedOutputStream,
- ByteBuffer rawKeyValues, boolean includesMvccVersion, boolean includesTag) throws IOException {
- rawKeyValues.rewind();
- PrefixTreeEncoder builder = EncoderFactory.checkOut(encodedOutputStream, includesMvccVersion);
-
- try {
- KeyValue kv;
- while ((kv = KeyValueUtil.nextShallowCopy(rawKeyValues, includesMvccVersion, includesTag)) != null) {
- builder.write(kv);
- }
- builder.flush();
- } finally {
- EncoderFactory.checkIn(builder);
- }
- }
-
-
@Override
public ByteBuffer decodeKeyValues(DataInputStream source, HFileBlockDecodingContext decodingCtx)
throws IOException {
@@ -202,4 +160,54 @@ public class PrefixTreeCodec implements DataBlockEncoder{
return new PrefixTreeSeeker(decodingCtx.getHFileContext().isIncludesMvcc());
}
+ @Override
+ public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+ throws IOException {
+ PrefixTreeEncodingState state = (PrefixTreeEncodingState) encodingCtx.getEncodingState();
+ PrefixTreeEncoder builder = state.builder;
+ builder.write(kv);
+ int size = kv.getLength();
+ if (encodingCtx.getHFileContext().isIncludesMvcc()) {
+ size += WritableUtils.getVIntSize(kv.getMvccVersion());
+ }
+ return size;
+ }
+
+ private static class PrefixTreeEncodingState extends EncodingState {
+ PrefixTreeEncoder builder = null;
+ }
+
+ @Override
+ public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
+ throws IOException {
+ if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
+ throw new IOException(this.getClass().getName() + " only accepts "
+ + HFileBlockDefaultEncodingContext.class.getName() + " as the " + "encoding context.");
+ }
+
+ HFileBlockDefaultEncodingContext encodingCtx =
+ (HFileBlockDefaultEncodingContext) blkEncodingCtx;
+ encodingCtx.prepareEncoding(out);
+
+ PrefixTreeEncoder builder = EncoderFactory.checkOut(out, encodingCtx.getHFileContext()
+ .isIncludesMvcc());
+ PrefixTreeEncodingState state = new PrefixTreeEncodingState();
+ state.builder = builder;
+ blkEncodingCtx.setEncodingState(state);
+ }
+
+ @Override
+ public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
+ byte[] uncompressedBytesWithHeader) throws IOException {
+ PrefixTreeEncodingState state = (PrefixTreeEncodingState) encodingCtx.getEncodingState();
+ PrefixTreeEncoder builder = state.builder;
+ builder.flush();
+ EncoderFactory.checkIn(builder);
+ // do I need to check this, or will it always be DataBlockEncoding.PREFIX_TREE?
+ if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
+ encodingCtx.postEncoding(BlockType.ENCODED_DATA);
+ } else {
+ encodingCtx.postEncoding(BlockType.DATA);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/EncoderFactory.java
----------------------------------------------------------------------
diff --git a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/EncoderFactory.java b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/EncoderFactory.java
index ba5340d..e26f151 100644
--- a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/EncoderFactory.java
+++ b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/EncoderFactory.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public class EncoderFactory {
- private static final EncoderPool POOL = new ThreadLocalEncoderPool();
+ private static final EncoderPool POOL = new EncoderPoolImpl();
public static PrefixTreeEncoder checkOut(OutputStream outputStream, boolean includeMvccVersion) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/EncoderPoolImpl.java
----------------------------------------------------------------------
diff --git a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/EncoderPoolImpl.java b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/EncoderPoolImpl.java
new file mode 100644
index 0000000..510404d
--- /dev/null
+++ b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/EncoderPoolImpl.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.codec.prefixtree.encode;
+
+import java.io.OutputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class EncoderPoolImpl implements EncoderPool {
+
+ private BlockingQueue<PrefixTreeEncoder> unusedEncoders =
+ new LinkedBlockingQueue<PrefixTreeEncoder>();
+
+ @Override
+ public PrefixTreeEncoder checkOut(OutputStream outputStream, boolean includeMvccVersion) {
+ PrefixTreeEncoder encoder = unusedEncoders.poll();
+ if (encoder == null) {
+ encoder = new PrefixTreeEncoder(outputStream, includeMvccVersion);
+ } else {
+ encoder.reset(outputStream, includeMvccVersion);
+ }
+ return encoder;
+ }
+
+ @Override
+ public void checkIn(PrefixTreeEncoder encoder) {
+ this.unusedEncoders.add(encoder);
+ }
+}
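Compared with the ThreadLocalEncoderPool this replaces (see the EncoderFactory change above), encoders are now shared across threads through a single unbounded LinkedBlockingQueue: poll() returns null when the queue is empty, so a fresh encoder is created on demand, and checkIn() makes it available to any thread. A usage sketch through EncoderFactory, assuming an OutputStream out and a boolean includeMvccVersion are in scope (the try/finally framing is illustrative, not from the patch):

    PrefixTreeEncoder encoder = EncoderFactory.checkOut(out, includeMvccVersion);
    try {
      // ... feed KeyValues to the encoder ...
    } finally {
      EncoderFactory.checkIn(encoder);  // return the encoder to the shared pool for reuse
    }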
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 89702c5..b39eec2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
@@ -641,6 +642,10 @@ public class HFileBlock implements Cacheable {
*/
private DataOutputStream userDataStream;
+ // Size of the actual data being written, before block encoding/compression is applied.
+ // This includes the header size also.
+ private int unencodedDataSizeWritten;
+
/**
* Bytes to be written to the file system, including the header. Compressed
* if compression is turned on. It also includes the checksum data that
@@ -731,10 +736,25 @@ public class HFileBlock implements Cacheable {
// We will compress it later in finishBlock()
userDataStream = new DataOutputStream(baosInMemory);
+ if (newBlockType == BlockType.DATA) {
+ this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
+ }
+ this.unencodedDataSizeWritten = 0;
return userDataStream;
}
/**
+ * Writes the given KeyValue to this block.
+ * @param kv
+ * @throws IOException
+ */
+ public void write(KeyValue kv) throws IOException {
+ expectState(State.WRITING);
+ this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(kv, dataBlockEncodingCtx,
+ this.userDataStream);
+ }
+
+ /**
* Returns the stream for the user to write to. The block writer takes care
* of handling compression and buffering for caching on write. Can only be
* called in the "writing" state.
@@ -750,7 +770,7 @@ public class HFileBlock implements Cacheable {
* Transitions the block writer from the "writing" state to the "block
* ready" state. Does nothing if a block is already finished.
*/
- private void ensureBlockReady() throws IOException {
+ void ensureBlockReady() throws IOException {
Preconditions.checkState(state != State.INIT,
"Unexpected state: " + state);
@@ -768,6 +788,14 @@ public class HFileBlock implements Cacheable {
* write state to "block ready".
*/
private void finishBlock() throws IOException {
+ if (blockType == BlockType.DATA) {
+ BufferGrabbingByteArrayOutputStream baosInMemoryCopy =
+ new BufferGrabbingByteArrayOutputStream();
+ baosInMemory.writeTo(baosInMemoryCopy);
+ this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
+ baosInMemoryCopy.buf, blockType);
+ blockType = dataBlockEncodingCtx.getBlockType();
+ }
userDataStream.flush();
// This does an array copy, so it is safe to cache this byte array.
uncompressedBytesWithHeader = baosInMemory.toByteArray();
@@ -777,15 +805,13 @@ public class HFileBlock implements Cacheable {
// cache-on-write. In a way, the block is ready, but not yet encoded or
// compressed.
state = State.BLOCK_READY;
- if (blockType == BlockType.DATA) {
- encodeDataBlockForDisk();
+ if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
+ onDiskBytesWithHeader = dataBlockEncodingCtx
+ .compressAndEncrypt(uncompressedBytesWithHeader);
} else {
- defaultBlockEncodingCtx.compressAfterEncodingWithBlockType(
- uncompressedBytesWithHeader, blockType);
- onDiskBytesWithHeader =
- defaultBlockEncodingCtx.getOnDiskBytesWithHeader();
+ onDiskBytesWithHeader = defaultBlockEncodingCtx
+ .compressAndEncrypt(uncompressedBytesWithHeader);
}
-
int numBytes = (int) ChecksumUtil.numBytes(
onDiskBytesWithHeader.length,
fileContext.getBytesPerChecksum());
@@ -805,24 +831,17 @@ public class HFileBlock implements Cacheable {
onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
}
- /**
- * Encodes this block if it is a data block and encoding is turned on in
- * {@link #dataBlockEncoder}.
- */
- private void encodeDataBlockForDisk() throws IOException {
- // do data block encoding, if data block encoder is set
- ByteBuffer rawKeyValues =
- ByteBuffer.wrap(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE,
- uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE).slice();
-
- // do the encoding
- dataBlockEncoder.beforeWriteToDisk(rawKeyValues, dataBlockEncodingCtx, blockType);
-
- uncompressedBytesWithHeader =
- dataBlockEncodingCtx.getUncompressedBytesWithHeader();
- onDiskBytesWithHeader =
- dataBlockEncodingCtx.getOnDiskBytesWithHeader();
- blockType = dataBlockEncodingCtx.getBlockType();
+ public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
+ private byte[] buf;
+
+ @Override
+ public void write(byte[] b, int off, int len) {
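+ // Does not copy; it grabs a reference to the caller's buffer instead.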
+ this.buf = b;
+ }
+
+ public byte[] getBuffer() {
+ return this.buf;
+ }
}
/**
@@ -873,7 +892,7 @@ public class HFileBlock implements Cacheable {
* @param out the output stream to write the
* @throws IOException
*/
- private void finishBlockAndWriteHeaderAndData(DataOutputStream out)
+ protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
throws IOException {
ensureBlockReady();
out.write(onDiskBytesWithHeader);
@@ -972,9 +991,8 @@ public class HFileBlock implements Cacheable {
* @return the number of bytes written
*/
public int blockSizeWritten() {
- if (state != State.WRITING)
- return 0;
- return userDataStream.size();
+ if (state != State.WRITING) return 0;
+ return this.unencodedDataSizeWritten;
}
/**
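Taken together, these Writer changes replace the one-shot encodeDataBlockForDisk() with a streaming path: startWriting() opens per-block encoding, each write(kv) call encodes a cell and accumulates unencodedDataSizeWritten, and finishBlock() runs endBlockEncoding before compressAndEncrypt. A minimal sketch of a caller (the writeToBlock helper and blockSizeLimit are hypothetical; only the Writer method names come from this patch):

    import java.io.IOException;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.hfile.BlockType;
    import org.apache.hadoop.hbase.io.hfile.HFileBlock;

    public class BlockWriteSketch {
      static void writeToBlock(HFileBlock.Writer writer, KeyValue kv, int blockSizeLimit)
          throws IOException {
        if (!writer.isWriting()) {
          writer.startWriting(BlockType.DATA);  // also invokes startBlockEncoding for DATA blocks
        }
        writer.write(kv);  // encodes one cell; adds its raw size to unencodedDataSizeWritten
        if (writer.blockSizeWritten() >= blockSizeLimit) {
          // Closing the block here would trigger endBlockEncoding inside finishBlock(),
          // then compressAndEncrypt on the uncompressed bytes with header.
        }
      }
    }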
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
index 6493f07..7049e4c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
@@ -16,10 +16,11 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import java.io.DataOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
@@ -36,18 +37,38 @@ public interface HFileDataBlockEncoder {
byte[] DATA_BLOCK_ENCODING = Bytes.toBytes("DATA_BLOCK_ENCODING");
/**
- * Should be called before an encoded or unencoded data block is written to
- * disk.
- * @param in KeyValues next to each other
- * @param encodingResult the encoded result
- * @param blockType block type
+ * Starts encoding for a block of KeyValues. Call
+ * {@link #endBlockEncoding(HFileBlockEncodingContext, DataOutputStream, byte[], BlockType)}
+ * to finish encoding of a block.
+ * @param encodingCtx
+ * @param out
* @throws IOException
*/
- void beforeWriteToDisk(
- ByteBuffer in,
- HFileBlockEncodingContext encodingResult,
- BlockType blockType
- ) throws IOException;
+ void startBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+ throws IOException;
+
+ /**
+ * Encodes a KeyValue.
+ * @param kv
+ * @param encodingCtx
+ * @param out
+ * @return unencoded kv size
+ * @throws IOException
+ */
+ int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+ throws IOException;
+
+ /**
+ * Ends encoding for a block of KeyValues. Gives the encoder a chance to finalize the
+ * encoded block. It must be called at the end of block encoding.
+ * @param encodingCtx
+ * @param out
+ * @param uncompressedBytesWithHeader
+ * @param blockType
+ * @throws IOException
+ */
+ void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
+ byte[] uncompressedBytesWithHeader, BlockType blockType) throws IOException;
/**
* Decides whether we should use a scanner over encoded blocks.
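The "unencoded kv size" returned by encode() is the raw serialized size of the cell, which HFileBlock.Writer accumulates as unencodedDataSizeWritten. A sketch of that accounting, mirroring the NoOpDataBlockEncoder change later in this diff (the unencodedSize helper name is illustrative):

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.io.WritableUtils;

    static int unencodedSize(KeyValue kv, boolean includesTags, boolean includesMvcc) {
      int size = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE        // the two 4-byte length ints
          + kv.getKeyLength() + kv.getValueLength();
      if (includesTags) {
        size += KeyValue.TAGS_LENGTH_SIZE + kv.getTagsLength(); // 2-byte tags length + tags
      }
      if (includesMvcc) {
        size += WritableUtils.getVIntSize(kv.getMvccVersion()); // vlong-encoded memstoreTS
      }
      return size;
    }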
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
index 4094450..edf4cc6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
@@ -16,10 +16,11 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import java.io.DataOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
@@ -89,24 +90,11 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
}
return encoding;
}
- /**
- * Precondition: a non-encoded buffer. Postcondition: on-disk encoding.
- *
- * The encoded results can be stored in {@link HFileBlockEncodingContext}.
- *
- * @throws IOException
- */
+
@Override
- public void beforeWriteToDisk(ByteBuffer in,
- HFileBlockEncodingContext encodeCtx,
- BlockType blockType) throws IOException {
- if (encoding == DataBlockEncoding.NONE) {
- // there is no need to encode the block before writing it to disk
- ((HFileBlockDefaultEncodingContext) encodeCtx).compressAfterEncodingWithBlockType(
- in.array(), blockType);
- return;
- }
- encodeBufferToHFileBlockBuffer(in, encoding, encodeCtx);
+ public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+ throws IOException {
+ return this.encoding.getEncoder().encode(kv, encodingCtx, out);
}
@Override
@@ -114,26 +102,6 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
return encoding != DataBlockEncoding.NONE;
}
- /**
- * Encode a block of key value pairs.
- *
- * @param in input data to encode
- * @param algo encoding algorithm
- * @param encodeCtx where will the output data be stored
- */
- private void encodeBufferToHFileBlockBuffer(ByteBuffer in, DataBlockEncoding algo,
- HFileBlockEncodingContext encodeCtx) {
- DataBlockEncoder encoder = algo.getEncoder();
- try {
- encoder.encodeKeyValues(in, encodeCtx);
- } catch (IOException e) {
- throw new RuntimeException(String.format(
- "Bug in data block encoder "
- + "'%s', it probably requested too much data, " +
- "exception message: %s.",
- algo.toString(), e.getMessage()), e);
- }
- }
@Override
public String toString() {
@@ -158,4 +126,18 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
}
return new HFileBlockDefaultDecodingContext(fileContext);
}
+
+ @Override
+ public void startBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+ throws IOException {
+ if (this.encoding != null && this.encoding != DataBlockEncoding.NONE) {
+ this.encoding.getEncoder().startBlockEncoding(encodingCtx, out);
+ }
+ }
+
+ @Override
+ public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
+ byte[] uncompressedBytesWithHeader, BlockType blockType) throws IOException {
+ this.encoding.getEncoder().endBlockEncoding(encodingCtx, out, uncompressedBytesWithHeader);
+ }
}
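Note that endBlockEncoding() above delegates unconditionally, while startBlockEncoding() guards against a null or NONE encoding; DataBlockEncoding.NONE.getEncoder() returns null (NONE has no encoder class), so this implementation assumes it is never wired up for unencoded blocks (that path goes through NoOpDataBlockEncoder below). A defensive variant would mirror the guard (hypothetical, not part of the patch):

    @Override
    public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
        byte[] uncompressedBytesWithHeader, BlockType blockType) throws IOException {
      if (this.encoding != null && this.encoding != DataBlockEncoding.NONE) {
        this.encoding.getEncoder().endBlockEncoding(encodingCtx, out, uncompressedBytesWithHeader);
      }
    }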
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
index 2c14306..8c1c60e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
/**
* Writes HFile format version 2.
@@ -251,41 +250,12 @@ public class HFileWriterV2 extends AbstractHFileWriter {
*/
@Override
public void append(final KeyValue kv) throws IOException {
- append(kv.getMvccVersion(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
- kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
- this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMvccVersion());
- }
-
- /**
- * Add key/value to file. Keys must be added in an order that agrees with the
- * Comparator passed on construction.
- *
- * @param key
- * Key to add. Cannot be empty nor null.
- * @param value
- * Value to add. Cannot be empty nor null.
- * @throws IOException
- */
- @Override
- public void append(final byte[] key, final byte[] value) throws IOException {
- append(0, key, 0, key.length, value, 0, value.length);
- }
-
- /**
- * Add key/value to file. Keys must be added in an order that agrees with the
- * Comparator passed on construction.
- *
- * @param key
- * @param koffset
- * @param klength
- * @param value
- * @param voffset
- * @param vlength
- * @throws IOException
- */
- protected void append(final long memstoreTS, final byte[] key, final int koffset,
- final int klength, final byte[] value, final int voffset, final int vlength)
- throws IOException {
+ byte[] key = kv.getBuffer();
+ int koffset = kv.getKeyOffset();
+ int klength = kv.getKeyLength();
+ byte[] value = kv.getValueArray();
+ int voffset = kv.getValueOffset();
+ int vlength = kv.getValueLength();
boolean dupKey = checkKey(key, koffset, klength);
checkValue(value, voffset, vlength);
if (!dupKey) {
@@ -295,20 +265,10 @@ public class HFileWriterV2 extends AbstractHFileWriter {
if (!fsBlockWriter.isWriting())
newBlock();
- // Write length of key and value and then actual key and value bytes.
- // Additionally, we may also write down the memstoreTS.
- {
- DataOutputStream out = fsBlockWriter.getUserDataStream();
- out.writeInt(klength);
- totalKeyLength += klength;
- out.writeInt(vlength);
- totalValueLength += vlength;
- out.write(key, koffset, klength);
- out.write(value, voffset, vlength);
- if (this.hFileContext.isIncludesMvcc()) {
- WritableUtils.writeVLong(out, memstoreTS);
- }
- }
+ fsBlockWriter.write(kv);
+
+ totalKeyLength += klength;
+ totalValueLength += vlength;
// Are we the first key in this block?
if (firstKeyInBlock == null) {
@@ -321,6 +281,29 @@ public class HFileWriterV2 extends AbstractHFileWriter {
lastKeyOffset = koffset;
lastKeyLength = klength;
entryCount++;
+ this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMvccVersion());
+ }
+
+ /**
+ * Add key/value to file. Keys must be added in an order that agrees with the
+ * Comparator passed on construction.
+ *
+ * @param key
+ * Key to add. Cannot be empty nor null.
+ * @param value
+ * Value to add. Cannot be empty nor null.
+ * @throws IOException
+ */
+ @Override
+ public void append(final byte[] key, final byte[] value) throws IOException {
+ int kvlen = (int) KeyValue.getKeyValueDataStructureSize(key.length, value.length, 0);
+ byte[] b = new byte[kvlen];
+ int pos = 0;
+ pos = Bytes.putInt(b, pos, key.length);
+ pos = Bytes.putInt(b, pos, value.length);
+ pos = Bytes.putBytes(b, pos, key, 0, key.length);
+ Bytes.putBytes(b, pos, value, 0, value.length);
+ append(new KeyValue(b, 0, kvlen));
}
@Override
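The flat array built by append(byte[], byte[]) above follows the standard KeyValue layout: a 4-byte key length, a 4-byte value length, then the key and value bytes. A worked example with hypothetical sizes:

    byte[] key = new byte[20];   // a full serialized key (row/family/qualifier/ts/type)
    byte[] value = new byte[5];
    int kvlen = (int) KeyValue.getKeyValueDataStructureSize(key.length, value.length, 0);
    // kvlen == 8 + 20 + 5 == 33: two 4-byte length ints, then key bytes, then value bytes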
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
index d1edea3..8c8cb2a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.io.hfile;
-import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.commons.logging.Log;
@@ -37,7 +36,6 @@ import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.WritableUtils;
/**
* {@link HFile} writer for version 3.
@@ -86,10 +84,11 @@ public class HFileWriterV3 extends HFileWriterV2 {
@Override
public void append(final KeyValue kv) throws IOException {
// Currently get the complete arrays
- append(kv.getMvccVersion(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
- kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(), kv.getTagsArray(),
- kv.getTagsOffset(), kv.getTagsLength());
- this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMvccVersion());
+ super.append(kv);
+ short tagsLength = kv.getTagsLength();
+ if (tagsLength > this.maxTagsLength) {
+ this.maxTagsLength = tagsLength;
+ }
}
/**
@@ -119,73 +118,20 @@ public class HFileWriterV3 extends HFileWriterV2 {
*/
@Override
public void append(final byte[] key, final byte[] value, byte[] tag) throws IOException {
- append(0, key, 0, key.length, value, 0, value.length, tag, 0, tag.length);
- }
-
- /**
- * Add key/value to file. Keys must be added in an order that agrees with the
- * Comparator passed on construction.
- * @param key
- * @param koffset
- * @param klength
- * @param value
- * @param voffset
- * @param vlength
- * @param tag
- * @param tagsOffset
- * @param tagLength
- * @throws IOException
- */
- private void append(final long memstoreTS, final byte[] key, final int koffset,
- final int klength, final byte[] value, final int voffset, final int vlength,
- final byte[] tag, final int tagsOffset, final int tagsLength) throws IOException {
- boolean dupKey = checkKey(key, koffset, klength);
- checkValue(value, voffset, vlength);
- if (!dupKey) {
- checkBlockBoundary();
+ int kvlen = (int) KeyValue.getKeyValueDataStructureSize(key.length, value.length, tag.length);
+ byte[] b = new byte[kvlen];
+ int pos = 0;
+ pos = Bytes.putInt(b, pos, key.length);
+ pos = Bytes.putInt(b, pos, value.length);
+ pos = Bytes.putBytes(b, pos, key, 0, key.length);
+ pos = Bytes.putBytes(b, pos, value, 0, value.length);
+ if (tag.length > 0) {
+ pos = Bytes.putShort(b, pos, (short) tag.length);
+ Bytes.putBytes(b, pos, tag, 0, tag.length);
}
-
- if (!fsBlockWriter.isWriting())
- newBlock();
-
- // Write length of key and value and then actual key and value bytes.
- // Additionally, we may also write down the memstoreTS.
- {
- DataOutputStream out = fsBlockWriter.getUserDataStream();
- out.writeInt(klength);
- totalKeyLength += klength;
- out.writeInt(vlength);
- totalValueLength += vlength;
- out.write(key, koffset, klength);
- out.write(value, voffset, vlength);
- // Write the additional tag into the stream
- if (hFileContext.isIncludesTags()) {
- out.writeShort((short) tagsLength);
- if (tagsLength > 0) {
- out.write(tag, tagsOffset, tagsLength);
- if (tagsLength > maxTagsLength) {
- maxTagsLength = tagsLength;
- }
- }
- }
- if (this.hFileContext.isIncludesMvcc()) {
- WritableUtils.writeVLong(out, memstoreTS);
- }
- }
-
- // Are we the first key in this block?
- if (firstKeyInBlock == null) {
- // Copy the key.
- firstKeyInBlock = new byte[klength];
- System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
- }
-
- lastKeyBuffer = key;
- lastKeyOffset = koffset;
- lastKeyLength = klength;
- entryCount++;
+ append(new KeyValue(b, 0, kvlen));
}
-
+
protected void finishFileInfo() throws IOException {
super.finishFileInfo();
if (hFileContext.getDataBlockEncoding() == DataBlockEncoding.PREFIX_TREE) {
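The tagged variant above appends a 2-byte tags length and the tag bytes only when tag.length > 0, and getKeyValueDataStructureSize accounts for the tags block the same way. A worked example with hypothetical sizes:

    byte[] key = new byte[20], value = new byte[5], tag = new byte[3];
    int kvlen = (int) KeyValue.getKeyValueDataStructureSize(key.length, value.length, tag.length);
    // kvlen == 8 + 20 + 5 + 2 + 3 == 38; with tag.length == 0 it would be 33 (no tags-length short)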
http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
index 3d34943..f5b61f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
@@ -16,15 +16,17 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import java.io.DataOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.io.WritableUtils;
/**
* Does not perform any kind of encoding/decoding.
@@ -40,18 +42,30 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
}
@Override
- public void beforeWriteToDisk(ByteBuffer in,
- HFileBlockEncodingContext encodeCtx, BlockType blockType)
+ public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
throws IOException {
- if (!(encodeCtx.getClass().getName().equals(
- HFileBlockDefaultEncodingContext.class.getName()))) {
- throw new IOException (this.getClass().getName() + " only accepts " +
- HFileBlockDefaultEncodingContext.class.getName() + ".");
- }
+ int klength = kv.getKeyLength();
+ int vlength = kv.getValueLength();
- HFileBlockDefaultEncodingContext defaultContext =
- (HFileBlockDefaultEncodingContext) encodeCtx;
- defaultContext.compressAfterEncodingWithBlockType(in.array(), blockType);
+ out.writeInt(klength);
+ out.writeInt(vlength);
+ out.write(kv.getBuffer(), kv.getKeyOffset(), klength);
+ out.write(kv.getValueArray(), kv.getValueOffset(), vlength);
+ int encodedKvSize = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
+ // Write the additional tags into the stream
+ if (encodingCtx.getHFileContext().isIncludesTags()) {
+ short tagsLength = kv.getTagsLength();
+ out.writeShort(tagsLength);
+ if (tagsLength > 0) {
+ out.write(kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
+ }
+ encodedKvSize += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
+ }
+ if (encodingCtx.getHFileContext().isIncludesMvcc()) {
+ WritableUtils.writeVLong(out, kv.getMvccVersion());
+ encodedKvSize += WritableUtils.getVIntSize(kv.getMvccVersion());
+ }
+ return encodedKvSize;
}
@Override
@@ -88,4 +102,15 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
return new HFileBlockDefaultDecodingContext(meta);
}
+
+ @Override
+ public void startBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+ throws IOException {
+ }
+
+ @Override
+ public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
+ byte[] uncompressedBytesWithHeader, BlockType blockType) throws IOException {
+ encodingCtx.postEncoding(BlockType.DATA);
+ }
}
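For reference, the per-cell stream layout produced by encode() above when no block encoding is configured (reconstructed from the code, fields in write order):

    [key length : int][value length : int][key bytes][value bytes]
    [tags length : short][tag bytes]   <- only when the HFileContext includes tags
    [memstoreTS : vlong]               <- only when the HFileContext includes mvcc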