You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2015/07/17 09:59:25 UTC
[4/4] hbase git commit: HBASE-12213 HFileBlock backed by Array of
ByteBuffers (Ram)
HBASE-12213 HFileBlock backed by Array of ByteBuffers (Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/834f87b2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/834f87b2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/834f87b2
Branch: refs/heads/master
Commit: 834f87b23de533783ba5f5b858327a6164f17f55
Parents: a249989
Author: ramkrishna <ra...@gmail.com>
Authored: Fri Jul 17 13:27:29 2015 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Fri Jul 17 13:27:29 2015 +0530
----------------------------------------------------------------------
.../java/org/apache/hadoop/hbase/KeyValue.java | 10 +-
.../hadoop/hbase/io/ByteBuffInputStream.java | 100 ++
.../hadoop/hbase/io/ByteBufferInputStream.java | 101 --
.../io/encoding/BufferedDataBlockEncoder.java | 4 +-
.../io/encoding/CopyKeyDataBlockEncoder.java | 12 +-
.../hbase/io/encoding/DataBlockEncoder.java | 3 +-
.../hbase/io/encoding/DiffKeyDeltaEncoder.java | 15 +-
.../hbase/io/encoding/FastDiffDeltaEncoder.java | 18 +-
.../io/encoding/HFileBlockDecodingContext.java | 23 +-
.../HFileBlockDefaultDecodingContext.java | 8 +-
.../io/encoding/PrefixKeyDeltaEncoder.java | 18 +-
.../apache/hadoop/hbase/io/hfile/BlockType.java | 5 +-
.../org/apache/hadoop/hbase/nio/ByteBuff.java | 438 +++++++
.../apache/hadoop/hbase/nio/MultiByteBuff.java | 1100 ++++++++++++++++++
.../hadoop/hbase/nio/MultiByteBuffer.java | 1047 -----------------
.../apache/hadoop/hbase/nio/SingleByteBuff.java | 312 +++++
.../hadoop/hbase/util/ByteBufferArray.java | 64 +
.../hadoop/hbase/util/ByteBufferUtils.java | 118 +-
.../java/org/apache/hadoop/hbase/util/Hash.java | 2 +
.../apache/hadoop/hbase/util/UnsafeAccess.java | 68 ++
.../hbase/io/TestByteBufferInputStream.java | 82 --
.../hbase/io/TestMultiByteBuffInputStream.java | 83 ++
.../hadoop/hbase/nio/TestMultiByteBuff.java | 324 ++++++
.../hadoop/hbase/nio/TestMultiByteBuffer.java | 316 -----
.../hbase/codec/prefixtree/PrefixTreeCodec.java | 10 +-
.../codec/prefixtree/PrefixTreeSeeker.java | 1 +
.../hbase/io/hfile/CacheableDeserializer.java | 6 +-
.../hbase/io/hfile/CompoundBloomFilter.java | 8 +-
.../org/apache/hadoop/hbase/io/hfile/HFile.java | 4 +-
.../hadoop/hbase/io/hfile/HFileBlock.java | 106 +-
.../hadoop/hbase/io/hfile/HFileBlockIndex.java | 41 +-
.../hadoop/hbase/io/hfile/HFileReaderImpl.java | 122 +-
.../hbase/io/hfile/MemcachedBlockCache.java | 5 +-
.../hbase/io/hfile/bucket/BucketCache.java | 11 +-
.../io/hfile/bucket/ByteBufferIOEngine.java | 14 +
.../hbase/io/hfile/bucket/FileIOEngine.java | 18 +
.../hadoop/hbase/io/hfile/bucket/IOEngine.java | 21 +
.../hbase/mapreduce/LoadIncrementalHFiles.java | 2 -
.../hadoop/hbase/regionserver/StoreFile.java | 4 +-
.../hadoop/hbase/regionserver/StoreScanner.java | 1 -
.../apache/hadoop/hbase/util/BloomFilter.java | 6 +-
.../hadoop/hbase/util/BloomFilterChunk.java | 31 -
.../hadoop/hbase/util/BloomFilterUtil.java | 16 +-
.../hbase/util/hbck/TableLockChecker.java | 1 -
.../hadoop/hbase/client/TestFromClientSide.java | 2 +
.../io/encoding/TestDataBlockEncoders.java | 4 +-
.../hadoop/hbase/io/hfile/CacheTestUtils.java | 13 +-
.../hadoop/hbase/io/hfile/TestCacheConfig.java | 6 +-
.../hadoop/hbase/io/hfile/TestChecksum.java | 6 +-
.../apache/hadoop/hbase/io/hfile/TestHFile.java | 11 +-
.../hadoop/hbase/io/hfile/TestHFileBlock.java | 33 +-
.../io/hfile/TestHFileBlockCompatibility.java | 7 +-
.../hbase/io/hfile/TestHFileBlockIndex.java | 12 +-
.../hbase/io/hfile/TestHFileWriterV2.java | 6 +-
.../hbase/io/hfile/TestHFileWriterV3.java | 6 +-
.../io/hfile/bucket/TestByteBufferIOEngine.java | 45 +
.../hadoop/hbase/util/TestBloomFilterChunk.java | 68 +-
.../hadoop/hbase/util/TestByteBuffUtils.java | 78 ++
.../hadoop/hbase/util/TestByteBufferUtils.java | 2 +-
59 files changed, 3104 insertions(+), 1894 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 8c73984..368bf41 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -2613,6 +2613,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* Hence create a Keyvalue(aka Cell) that would help in comparing as two cells
*/
public static class KeyOnlyKeyValue extends KeyValue {
+ private short rowLen = -1;
public KeyOnlyKeyValue() {
}
@@ -2624,6 +2625,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
this.bytes = b;
this.length = length;
this.offset = offset;
+ this.rowLen = Bytes.toShort(this.bytes, this.offset);
}
@Override
@@ -2642,6 +2644,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
this.bytes = key;
this.offset = offset;
this.length = length;
+ this.rowLen = Bytes.toShort(this.bytes, this.offset);
}
@Override
@@ -2699,7 +2702,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
@Override
public short getRowLength() {
- return Bytes.toShort(this.bytes, getKeyOffset());
+ return rowLen;
}
@Override
@@ -2769,5 +2772,10 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
public boolean equals(Object other) {
return super.equals(other);
}
+
+ @Override
+ public long heapSize() {
+ return super.heapSize() + Bytes.SIZEOF_SHORT;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffInputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffInputStream.java
new file mode 100644
index 0000000..4f6b3c2
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffInputStream.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.InputStream;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+
+/**
+ * Not thread safe!
+ * <p>
+ * Please note that the reads will cause position movement on wrapped ByteBuff.
+ */
+@InterfaceAudience.Private
+public class ByteBuffInputStream extends InputStream {
+
+ private ByteBuff buf;
+
+ public ByteBuffInputStream(ByteBuff buf) {
+ this.buf = buf;
+ }
+
+ /**
+ * Reads the next byte of data from this input stream. The value byte is returned as an
+ * <code>int</code> in the range <code>0</code> to <code>255</code>. If no byte is available
+ * because the end of the stream has been reached, the value <code>-1</code> is returned.
+ * @return the next byte of data, or <code>-1</code> if the end of the stream has been reached.
+ */
+ public int read() {
+ if (this.buf.hasRemaining()) {
+ return (this.buf.get() & 0xff);
+ }
+ return -1;
+ }
+
+ /**
+ * Reads up to next <code>len</code> bytes of data from buffer into passed array(starting from
+ * given offset).
+ * @param b the array into which the data is read.
+ * @param off the start offset in the destination array <code>b</code>
+ * @param len the maximum number of bytes to read.
+ * @return the total number of bytes actually read into the buffer, or <code>-1</code> if not even
+ * 1 byte can be read because the end of the stream has been reached.
+ */
+ public int read (byte b[], int off, int len) {
+ int avail = available();
+ if (avail <= 0) {
+ return -1;
+ }
+ if (len <= 0) {
+ return 0;
+ }
+
+ if (len > avail) {
+ len = avail;
+ }
+ this.buf.get(b, off, len);
+ return len;
+ }
+
+ /**
+ * Skips <code>n</code> bytes of input from this input stream. Fewer bytes might be skipped if the
+ * end of the input stream is reached. The actual number <code>k</code> of bytes to be skipped is
+ * equal to the smaller of <code>n</code> and remaining bytes in the stream.
+ * @param n the number of bytes to be skipped.
+ * @return the actual number of bytes skipped.
+ */
+ public long skip(long n) {
+ long k = Math.min(n, available());
+ if (k <= 0) {
+ return 0;
+ }
+ this.buf.skip((int) k);
+ return k;
+ }
+
+ /**
+ * @return the number of remaining bytes that can be read (or skipped
+ * over) from this input stream.
+ */
+ public int available() {
+ return this.buf.remaining();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
deleted file mode 100644
index 1530ccd..0000000
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io;
-
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Not thread safe!
- * <p>
- * Please note that the reads will cause position movement on wrapped ByteBuffer.
- */
-@InterfaceAudience.Private
-public class ByteBufferInputStream extends InputStream {
-
- private ByteBuffer buf;
-
- public ByteBufferInputStream(ByteBuffer buf) {
- this.buf = buf;
- }
-
- /**
- * Reads the next byte of data from this input stream. The value byte is returned as an
- * <code>int</code> in the range <code>0</code> to <code>255</code>. If no byte is available
- * because the end of the stream has been reached, the value <code>-1</code> is returned.
- * @return the next byte of data, or <code>-1</code> if the end of the stream has been reached.
- */
- public int read() {
- if (this.buf.hasRemaining()) {
- return (this.buf.get() & 0xff);
- }
- return -1;
- }
-
- /**
- * Reads up to next <code>len</code> bytes of data from buffer into passed array(starting from
- * given offset).
- * @param b the array into which the data is read.
- * @param off the start offset in the destination array <code>b</code>
- * @param len the maximum number of bytes to read.
- * @return the total number of bytes actually read into the buffer, or <code>-1</code> if not even
- * 1 byte can be read because the end of the stream has been reached.
- */
- public int read(byte b[], int off, int len) {
- int avail = available();
- if (avail <= 0) {
- return -1;
- }
-
- if (len > avail) {
- len = avail;
- }
- if (len <= 0) {
- return 0;
- }
-
- this.buf.get(b, off, len);
- return len;
- }
-
- /**
- * Skips <code>n</code> bytes of input from this input stream. Fewer bytes might be skipped if the
- * end of the input stream is reached. The actual number <code>k</code> of bytes to be skipped is
- * equal to the smaller of <code>n</code> and remaining bytes in the stream.
- * @param n the number of bytes to be skipped.
- * @return the actual number of bytes skipped.
- */
- public long skip(long n) {
- long k = Math.min(n, available());
- if (k < 0) {
- k = 0;
- }
- this.buf.position((int) (this.buf.position() + k));
- return k;
- }
-
- /**
- * @return the number of remaining bytes that can be read (or skipped
- * over) from this input stream.
- */
- public int available() {
- return this.buf.remaining();
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 03875dc..966c59b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -601,7 +601,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
kvBuffer.putInt(current.keyLength);
kvBuffer.putInt(current.valueLength);
kvBuffer.put(current.keyBuffer, 0, current.keyLength);
- ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.valueOffset,
+ ByteBufferUtils.copyFromBufferToBuffer(currentBuffer, kvBuffer, current.valueOffset,
current.valueLength);
if (current.tagsLength > 0) {
// Put short as unsigned
@@ -610,7 +610,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
if (current.tagsOffset != -1) {
// the offset of the tags bytes in the underlying buffer is marked. So the temp
// buffer,tagsBuffer was not been used.
- ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.tagsOffset,
+ ByteBufferUtils.copyFromBufferToBuffer(currentBuffer, kvBuffer, current.tagsOffset,
current.tagsLength);
} else {
// When tagsOffset is marked as -1, tag compression was present and so the tags were
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
index 4eea272..662be29 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
@@ -66,13 +67,12 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
}
@Override
- public Cell getFirstKeyCellInBlock(ByteBuffer block) {
- int keyLength = block.getInt(Bytes.SIZEOF_INT);
- ByteBuffer dup = block.duplicate();
+ public Cell getFirstKeyCellInBlock(ByteBuff block) {
+ int keyLength = block.getIntStrictlyForward(Bytes.SIZEOF_INT);
int pos = 3 * Bytes.SIZEOF_INT;
- dup.position(pos);
- dup.limit(pos + keyLength);
- return new KeyValue.KeyOnlyKeyValue(dup.array(), dup.arrayOffset() + pos, keyLength);
+ ByteBuffer key = block.asSubByteBuffer(pos + keyLength).duplicate();
+ // TODO : to be changed here for BBCell
+ return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + pos, keyLength);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
index b0467b8..ce71308 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.nio.ByteBuff;
/**
* Encoding of KeyValue. It aims to be fast and efficient using assumptions:
@@ -90,7 +91,7 @@ public interface DataBlockEncoder {
* @param block encoded block we want index, the position will not change
* @return First key in block as a cell.
*/
- Cell getFirstKeyCellInBlock(ByteBuffer block);
+ Cell getFirstKeyCellInBlock(ByteBuff block);
/**
* Create a HFileBlock seeker which find KeyValues within a block.
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
index f2d4751..90b8e6e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
@@ -305,15 +306,16 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
}
@Override
- public Cell getFirstKeyCellInBlock(ByteBuffer block) {
+ public Cell getFirstKeyCellInBlock(ByteBuff block) {
block.mark();
block.position(Bytes.SIZEOF_INT);
byte familyLength = block.get();
- ByteBufferUtils.skip(block, familyLength);
+ block.skip(familyLength);
byte flag = block.get();
- int keyLength = ByteBufferUtils.readCompressedInt(block);
- ByteBufferUtils.readCompressedInt(block); // valueLength
- ByteBufferUtils.readCompressedInt(block); // commonLength
+ int keyLength = ByteBuff.readCompressedInt(block);
+ // TODO : See if we can avoid these reads as the read values are not getting used
+ ByteBuff.readCompressedInt(block); // valueLength
+ ByteBuff.readCompressedInt(block); // commonLength
ByteBuffer result = ByteBuffer.allocate(keyLength);
// copy row
@@ -341,7 +343,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
// copy the timestamp and type
int timestampFitInBytes =
((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
- long timestamp = ByteBufferUtils.readLong(block, timestampFitInBytes);
+ long timestamp = ByteBuff.readLong(block, timestampFitInBytes);
if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
timestamp = -timestamp;
}
@@ -350,6 +352,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
block.get(result.array(), pos, Bytes.SIZEOF_BYTE);
block.reset();
+ // The result is already a BB. So always we will create a KeyOnlyKv.
return new KeyValue.KeyOnlyKeyValue(result.array(), 0, keyLength);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
index f750e09..fa4adbd 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
@@ -354,18 +355,17 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
}
@Override
- public Cell getFirstKeyCellInBlock(ByteBuffer block) {
+ public Cell getFirstKeyCellInBlock(ByteBuff block) {
block.mark();
block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
- int keyLength = ByteBufferUtils.readCompressedInt(block);
- ByteBufferUtils.readCompressedInt(block); // valueLength
- ByteBufferUtils.readCompressedInt(block); // commonLength
- int pos = block.position();
+ int keyLength = ByteBuff.readCompressedInt(block);
+ // TODO : See if we can avoid these reads as the read values are not getting used
+ ByteBuff.readCompressedInt(block); // valueLength
+ ByteBuff.readCompressedInt(block); // commonLength
+ ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
block.reset();
- ByteBuffer dup = block.duplicate();
- dup.position(pos);
- dup.limit(pos + keyLength);
- return new KeyValue.KeyOnlyKeyValue(dup.array(), dup.arrayOffset() + pos, keyLength);
+ // TODO : Change to BBCell.
+ return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(), keyLength);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
index 37001cc..ffdb694 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
@@ -17,10 +17,10 @@
package org.apache.hadoop.hbase.io.encoding;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.nio.ByteBuff;
/**
* A decoding context that is created by a reader's encoder, and is shared
@@ -32,22 +32,27 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
public interface HFileBlockDecodingContext {
/**
- * Perform all actions that need to be done before the encoder's real decoding process.
- * Decompression needs to be done if {@link HFileContext#getCompression()} returns a valid compression
+ * Perform all actions that need to be done before the encoder's real decoding
+ * process. Decompression needs to be done if
+ * {@link HFileContext#getCompression()} returns a valid compression
* algorithm.
*
- * @param onDiskSizeWithoutHeader numBytes after block and encoding headers
- * @param uncompressedSizeWithoutHeader numBytes without header required to store the block after
+ * @param onDiskSizeWithoutHeader
+ * numBytes after block and encoding headers
+ * @param uncompressedSizeWithoutHeader
+ * numBytes without header required to store the block after
* decompressing (not decoding)
- * @param blockBufferWithoutHeader ByteBuffer pointed after the header but before the data
- * @param onDiskBlock on disk data to be decoded
+ * @param blockBufferWithoutHeader
+ * ByteBuffer pointed after the header but before the data
+ * @param onDiskBlock
+ * on disk data to be decoded
* @throws IOException
*/
void prepareDecoding(
int onDiskSizeWithoutHeader,
int uncompressedSizeWithoutHeader,
- ByteBuffer blockBufferWithoutHeader,
- ByteBuffer onDiskBlock
+ ByteBuff blockBufferWithoutHeader,
+ ByteBuff onDiskBlock
) throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
index 78bb0d6..30382d9 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
@@ -19,17 +19,17 @@ package org.apache.hadoop.hbase.io.encoding;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Decryptor;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -51,8 +51,8 @@ public class HFileBlockDefaultDecodingContext implements
@Override
public void prepareDecoding(int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
- ByteBuffer blockBufferWithoutHeader, ByteBuffer onDiskBlock) throws IOException {
- InputStream in = new DataInputStream(new ByteBufferInputStream(onDiskBlock));
+ ByteBuff blockBufferWithoutHeader, ByteBuff onDiskBlock) throws IOException {
+ InputStream in = new DataInputStream(new ByteBuffInputStream(onDiskBlock));
Encryption.Context cryptoContext = fileContext.getEncryptionContext();
if (cryptoContext != Encryption.Context.NONE) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
index 15608cc..6e89de4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
@@ -172,22 +173,21 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
}
@Override
- public Cell getFirstKeyCellInBlock(ByteBuffer block) {
+ public Cell getFirstKeyCellInBlock(ByteBuff block) {
block.mark();
block.position(Bytes.SIZEOF_INT);
- int keyLength = ByteBufferUtils.readCompressedInt(block);
- ByteBufferUtils.readCompressedInt(block);
- int commonLength = ByteBufferUtils.readCompressedInt(block);
+ int keyLength = ByteBuff.readCompressedInt(block);
+ // TODO : See if we can avoid these reads as the read values are not getting used
+ ByteBuff.readCompressedInt(block);
+ int commonLength = ByteBuff.readCompressedInt(block);
if (commonLength != 0) {
throw new AssertionError("Nonzero common length in the first key in "
+ "block: " + commonLength);
}
- int pos = block.position();
+ ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
block.reset();
- ByteBuffer dup = block.duplicate();
- dup.position(pos);
- dup.limit(pos + keyLength);
- return new KeyValue.KeyOnlyKeyValue(dup.array(), dup.arrayOffset() + pos, keyLength);
+ // TODO : Change to BBCell
+ return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(), keyLength);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
index 0db584e..4228f57 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
@@ -26,6 +26,7 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -131,7 +132,7 @@ public enum BlockType {
out.write(magic);
}
- public void write(ByteBuffer buf) {
+ public void write(ByteBuff buf) {
buf.put(magic);
}
@@ -161,7 +162,7 @@ public enum BlockType {
return parse(buf, 0, buf.length);
}
- public static BlockType read(ByteBuffer buf) throws IOException {
+ public static BlockType read(ByteBuff buf) throws IOException {
byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), MAGIC_LENGTH)];
buf.get(magicBuf);
BlockType blockType = parse(magicBuf, 0, magicBuf.length);
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
new file mode 100644
index 0000000..14e77a7
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.nio;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * An abstract class that abstracts out as to how the byte buffers are used,
+ * either single or multiple. We have this interface because the java's ByteBuffers
+ * cannot be sub-classed. This class provides APIs similar to the ones provided
+ * in java's nio ByteBuffers and allows you to do positional reads/writes and relative
+ * reads and writes on the underlying BB. In addition to it, we have some additional APIs which
+ * helps us in the read path.
+ */
+@InterfaceAudience.Private
+public abstract class ByteBuff {
+ /**
+ * @return this ByteBuff's current position
+ */
+ public abstract int position();
+
+ /**
+ * Sets this ByteBuff's position to the given value.
+ * @param position
+ * @return this object
+ */
+ public abstract ByteBuff position(int position);
+
+ /**
+ * Jumps the current position of this ByteBuff by specified length.
+ * @param len the length to be skipped
+ */
+ public abstract ByteBuff skip(int len);
+
+ /**
+ * Jumps back the current position of this ByteBuff by specified length.
+ * @param len the length to move back
+ */
+ public abstract ByteBuff moveBack(int len);
+
+ /**
+ * @return the total capacity of this ByteBuff.
+ */
+ public abstract int capacity();
+
+ /**
+ * Returns the limit of this ByteBuff
+ * @return limit of the ByteBuff
+ */
+ public abstract int limit();
+
+ /**
+ * Marks the limit of this ByteBuff.
+ * @param limit
+ * @return This ByteBuff
+ */
+ public abstract ByteBuff limit(int limit);
+
+ /**
+ * Rewinds this ByteBuff and the position is set to 0
+ * @return this object
+ */
+ public abstract ByteBuff rewind();
+
+ /**
+ * Marks the current position of the ByteBuff
+ * @return this object
+ */
+ public abstract ByteBuff mark();
+
+ /**
+ * Returns bytes from current position till length specified, as a single ByteBuffer. When all
+ * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item
+ * as such will be returned. So users are warned not to change the position or limit of this
+ * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required
+ * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy
+ * the bytes to a newly created ByteBuffer of required size and return that.
+ *
+ * @param length number of bytes required.
+ * @return bytes from current position till length specified, as a single ByteButter.
+ */
+ public abstract ByteBuffer asSubByteBuffer(int length);
+
+ /**
+ * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these
+ * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as
+ * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are
+ * warned not to change the position or limit of this returned ByteBuffer. When the required bytes
+ * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created
+ * ByteBuffer of required size and return that.
+ *
+ * @param offset the offset in this ByteBuff from where the subBuffer should be created
+ * @param length the length of the subBuffer
+ * @param pair a pair that will have the bytes from the current position till length specified,
+ * as a single ByteBuffer and offset in that Buffer where the bytes starts.
+ * Since this API gets called in a loop we are passing a pair to it which could be created
+ * outside the loop and the method would set the values on the pair that is passed in by
+ * the caller. Thus it avoids more object creations that would happen if the pair that is
+ * returned is created by this method every time.
+ */
+ public abstract void asSubByteBuffer(int offset, int length, Pair<ByteBuffer, Integer> pair);
+
+ /**
+ * Returns the number of elements between the current position and the
+ * limit.
+ * @return the remaining elements in this ByteBuff
+ */
+ public abstract int remaining();
+
+ /**
+ * Returns true if there are elements between the current position and the limt
+ * @return true if there are elements, false otherwise
+ */
+ public abstract boolean hasRemaining();
+
+ /**
+ * Similar to {@link ByteBuffer}.reset(), ensures that this ByteBuff
+ * is reset back to last marked position.
+ * @return This ByteBuff
+ */
+ public abstract ByteBuff reset();
+
+ /**
+ * Returns an ByteBuff which is a sliced version of this ByteBuff. The position, limit and mark
+ * of the new ByteBuff will be independent than that of the original ByteBuff.
+ * The content of the new ByteBuff will start at this ByteBuff's current position
+ * @return a sliced ByteBuff
+ */
+ public abstract ByteBuff slice();
+
+ /**
+ * Returns an ByteBuff which is a duplicate version of this ByteBuff. The
+ * position, limit and mark of the new ByteBuff will be independent than that
+ * of the original ByteBuff. The content of the new ByteBuff will start at
+ * this ByteBuff's current position The position, limit and mark of the new
+ * ByteBuff would be identical to this ByteBuff in terms of values.
+ *
+ * @return a sliced ByteBuff
+ */
+ public abstract ByteBuff duplicate();
+
+ /**
+ * A relative method that returns byte at the current position. Increments the
+ * current position by the size of a byte.
+ * @return the byte at the current position
+ */
+ public abstract byte get();
+
+ /**
+ * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers
+ * @param index
+ * @return the byte at the given index
+ */
+ public abstract byte get(int index);
+
+ /**
+ * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers.
+ * The difference for this API from {@link #get(int)} the index specified should be after
+ * the current position. If not throws IndexOutOfBoundsException
+ * @param index
+ * @return the byte value at the given index.
+ */
+ public abstract byte getByteStrictlyForward(int index);
+
+ /**
+ * Writes a byte to this ByteBuff at the current position and increments the position
+ * @param b
+ * @return this object
+ */
+ public abstract ByteBuff put(byte b);
+
+ /**
+ * Writes a byte to this ByteBuff at the given index
+ * @param index
+ * @param b
+ * @return this object
+ */
+ public abstract ByteBuff put(int index, byte b);
+
+ /**
+ * Copies the specified number of bytes from this ByteBuff's current position to
+ * the byte[]'s offset. Also advances the position of the ByteBuff by the given length.
+ * @param dst
+ * @param offset within the current array
+ * @param length upto which the bytes to be copied
+ */
+ public abstract void get(byte[] dst, int offset, int length);
+
+ /**
+ * Copies the content from this ByteBuff's current position to the byte array and fills it. Also
+ * advances the position of the ByteBuff by the length of the byte[].
+ * @param dst
+ */
+ public abstract void get(byte[] dst);
+
+ /**
+ * Copies from the given byte[] to this ByteBuff
+ * @param src
+ * @param offset the position in the byte array from which the copy should be done
+ * @param length the length upto which the copy should happen
+ * @return this ByteBuff
+ */
+ public abstract ByteBuff put(byte[] src, int offset, int length);
+
+ /**
+ * Copies from the given byte[] to this ByteBuff
+ * @param src
+ * @return this ByteBuff
+ */
+ public abstract ByteBuff put(byte[] src);
+
+ /**
+ * @return true or false if the underlying BB support hasArray
+ */
+ public abstract boolean hasArray();
+
+ /**
+ * @return the byte[] if the underlying BB has single BB and hasArray true
+ */
+ public abstract byte[] array();
+
+ /**
+ * @return the arrayOffset of the byte[] incase of a single BB backed ByteBuff
+ */
+ public abstract int arrayOffset();
+
+ /**
+ * Returns the short value at the current position. Also advances the position by the size
+ * of short
+ *
+ * @return the short value at the current position
+ */
+ public abstract short getShort();
+
+ /**
+ * Fetches the short value at the given index. Does not change position of the
+ * underlying ByteBuffers. The caller is sure that the index will be after
+ * the current position of this ByteBuff. So even if the current short does not fit in the
+ * current item we can safely move to the next item and fetch the remaining bytes forming
+ * the short
+ *
+ * @param index
+ * @return the short value at the given index
+ */
+ public abstract short getShort(int index);
+
+ /**
+ * Fetches the short at the given index. Does not change position of the underlying ByteBuffers.
+ * The difference for this API from {@link #getShort(int)} the index specified should be
+ * after the current position. If not throws IndexOutOfBoundsException
+ * @param index
+ * @return the short value at the given index.
+ */
+ public abstract short getShortStrictlyForward(int index);
+
+ /**
+ * Returns the int value at the current position. Also advances the position by the size of int
+ *
+ * @return the int value at the current position
+ */
+ public abstract int getInt();
+
+ /**
+ * Writes an int to this ByteBuff at its current position. Also advances the position
+ * by size of int
+ * @param value Int value to write
+ * @return this object
+ */
+ public abstract ByteBuff putInt(int value);
+
+ /**
+ * Fetches the int at the given index. Does not change position of the underlying ByteBuffers.
+ * Even if the current int does not fit in the
+ * current item we can safely move to the next item and fetch the remaining bytes forming
+ * the int
+ *
+ * @param index
+ * @return the int value at the given index
+ */
+ public abstract int getInt(int index);
+
+ /**
+ * Fetches the int at the given index. Does not change position of the underlying ByteBuffers.
+ * The difference for this API from {@link #getInt(int)} the index specified should be after
+ * the current position. If not throws IndexOutOfBoundsException
+ * @param index
+ * @return the int value at the given index.
+ */
+ // TODO: any better name here?? getIntFromSubsequentPosition? or getIntAfterCurrentPosition?
+ // TODO : Make this relative wrt current position? Follow on JIRA
+ public abstract int getIntStrictlyForward(int index);
+ /**
+ * Returns the long value at the current position. Also advances the position by the size of long
+ *
+ * @return the long value at the current position
+ */
+ public abstract long getLong();
+
+ /**
+ * Writes a long to this ByteBuff at its current position.
+ * Also advances the position by size of long
+ * @param value Long value to write
+ * @return this object
+ */
+ public abstract ByteBuff putLong(long value);
+
+ /**
+ * Fetches the long at the given index. Does not change position of the
+ * underlying ByteBuffers. The caller is sure that the index will be after
+ * the current position of this ByteBuff. So even if the current long does not fit in the
+ * current item we can safely move to the next item and fetch the remaining bytes forming
+ * the long
+ *
+ * @param index
+ * @return the long value at the given index
+ */
+ public abstract long getLong(int index);
+
+ /**
+ * Fetches the long at the given index. Does not change position of the underlying ByteBuffers.
+ * The difference for this API from {@link #getLong(int)} the index specified should be after
+ * the current position. If not throws IndexOutOfBoundsException
+ * @param index
+ * @return the long value at the given index.
+ */
+ public abstract long getLongStrictlyForward(int index);
+
+ /**
+ * Copy the content from this ByteBuff to a byte[] based on the given offset and
+ * length
+ *
+ * @param offset
+ * the position from where the copy should start
+ * @param length
+ * the length upto which the copy has to be done
+ * @return byte[] with the copied contents from this ByteBuff.
+ */
+ public abstract byte[] toBytes(int offset, int length);
+
+ /**
+ * Copies the content from this ByteBuff to a ByteBuffer
+ * Note : This will advance the position marker of {@code out} but not change the position maker
+ * for this ByteBuff
+ * @param out the ByteBuffer to which the copy has to happen
+ * @param sourceOffset the offset in the ByteBuff from which the elements has
+ * to be copied
+ * @param length the length in this ByteBuff upto which the elements has to be copied
+ */
+ public abstract void get(ByteBuffer out, int sourceOffset, int length);
+
+ /**
+ * Copies the contents from the src ByteBuff to this ByteBuff. This will be
+ * absolute positional copying and
+ * won't affect the position of any of the buffers.
+ * @param offset the position in this ByteBuff to which the copy should happen
+ * @param src the src ByteBuff
+ * @param srcOffset the offset in the src ByteBuff from where the elements should be read
+ * @param length the length up to which the copy should happen
+ */
+ public abstract ByteBuff put(int offset, ByteBuff src, int srcOffset, int length);
+
+ // static helper methods
+ /**
+ * Read integer from ByteBuff coded in 7 bits and increment position.
+ * @return Read integer.
+ */
+ public static int readCompressedInt(ByteBuff buf) {
+ byte b = buf.get();
+ if ((b & ByteBufferUtils.NEXT_BIT_MASK) != 0) {
+ return (b & ByteBufferUtils.VALUE_MASK)
+ + (readCompressedInt(buf) << ByteBufferUtils.NEXT_BIT_SHIFT);
+ }
+ return b & ByteBufferUtils.VALUE_MASK;
+ }
+
+ /**
+ * Compares two ByteBuffs
+ *
+ * @param buf1 the first ByteBuff
+ * @param o1 the offset in the first ByteBuff from where the compare has to happen
+ * @param len1 the length in the first ByteBuff upto which the compare has to happen
+ * @param buf2 the second ByteBuff
+ * @param o2 the offset in the second ByteBuff from where the compare has to happen
+ * @param len2 the length in the second ByteBuff upto which the compare has to happen
+ * @return Positive if buf1 is bigger than buf2, 0 if they are equal, and negative if buf1 is
+ * smaller than buf2.
+ */
+ public static int compareTo(ByteBuff buf1, int o1, int len1, ByteBuff buf2,
+ int o2, int len2) {
+ if (buf1.hasArray() && buf2.hasArray()) {
+ return Bytes.compareTo(buf1.array(), buf1.arrayOffset() + o1, len1, buf2.array(),
+ buf2.arrayOffset() + o2, len2);
+ }
+ int end1 = o1 + len1;
+ int end2 = o2 + len2;
+ for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) {
+ int a = buf1.get(i) & 0xFF;
+ int b = buf2.get(j) & 0xFF;
+ if (a != b) {
+ return a - b;
+ }
+ }
+ return len1 - len2;
+ }
+
+ /**
+ * Read long which was written to fitInBytes bytes and increment position.
+ * @param fitInBytes In how many bytes given long is stored.
+ * @return The value of parsed long.
+ */
+ public static long readLong(ByteBuff in, final int fitInBytes) {
+ long tmpLength = 0;
+ for (int i = 0; i < fitInBytes; ++i) {
+ tmpLength |= (in.get() & 0xffl) << (8l * i);
+ }
+ return tmpLength;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
new file mode 100644
index 0000000..984ade5
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
@@ -0,0 +1,1100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.nio;
+
+import java.nio.BufferOverflowException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.InvalidMarkException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Provides a unified view of all the underlying ByteBuffers and will look as if a bigger
+ * sequential buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int,
+ * short, long etc and doing operations like mark, reset, slice etc. This has to be used when
+ * data is split across multiple byte buffers and we don't want copy them to single buffer
+ * for reading from it.
+ */
+@InterfaceAudience.Private
+public class MultiByteBuff extends ByteBuff {
+
+ private final ByteBuffer[] items;
+ // Pointer to the current item in the MBB
+ private ByteBuffer curItem = null;
+ // Index of the current item in the MBB
+ private int curItemIndex = 0;
+
+ private int limit = 0;
+ private int limitedItemIndex;
+ private int markedItemIndex = -1;
+ private final int[] itemBeginPos;
+
+ public MultiByteBuff(ByteBuffer... items) {
+ assert items != null;
+ assert items.length > 0;
+ this.items = items;
+ this.curItem = this.items[this.curItemIndex];
+ // See below optimization in getInt(int) where we check whether the given index land in current
+ // item. For this we need to check whether the passed index is less than the next item begin
+ // offset. To handle this effectively for the last item buffer, we add an extra item into this
+ // array.
+ itemBeginPos = new int[items.length + 1];
+ int offset = 0;
+ for (int i = 0; i < items.length; i++) {
+ ByteBuffer item = items[i];
+ item.rewind();
+ itemBeginPos[i] = offset;
+ int l = item.limit() - item.position();
+ offset += l;
+ }
+ this.limit = offset;
+ this.itemBeginPos[items.length] = offset + 1;
+ this.limitedItemIndex = this.items.length - 1;
+ }
+
+ private MultiByteBuff(ByteBuffer[] items, int[] itemBeginPos, int limit, int limitedIndex,
+ int curItemIndex, int markedIndex) {
+ this.items = items;
+ this.curItemIndex = curItemIndex;
+ this.curItem = this.items[this.curItemIndex];
+ this.itemBeginPos = itemBeginPos;
+ this.limit = limit;
+ this.limitedItemIndex = limitedIndex;
+ this.markedItemIndex = markedIndex;
+ }
+
+ /**
+ * @throws UnsupportedOperationException MBB does not support
+ * array based operations
+ */
+ @Override
+ public byte[] array() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @throws UnsupportedOperationException MBB does not
+ * support array based operations
+ */
+ @Override
+ public int arrayOffset() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @return false. MBB does not support array based operations
+ */
+ @Override
+ public boolean hasArray() {
+ return false;
+ }
+
+ /**
+ * @return the total capacity of this MultiByteBuffer.
+ */
+ @Override
+ public int capacity() {
+ int c = 0;
+ for (ByteBuffer item : this.items) {
+ c += item.capacity();
+ }
+ return c;
+ }
+
+ /**
+ * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers
+ * @param index
+ * @return the byte at the given index
+ */
+ @Override
+ public byte get(int index) {
+ int itemIndex = getItemIndex(index);
+ return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
+ }
+
+ @Override
+ public byte getByteStrictlyForward(int index) {
+ // Mostly the index specified will land within this current item. Short circuit for that
+ if(index < (this.itemBeginPos[this.curItemIndex] + this.curItem.position())) {
+ throw new IndexOutOfBoundsException("The index " + index
+ + " should not be less than current position " + this.position());
+ }
+ int itemIndex = getItemIndexFromCurItemIndex(index);
+ return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
+ }
+
+ /*
+ * Returns in which sub ByteBuffer, the given element index will be available.
+ */
+ private int getItemIndex(int elemIndex) {
+ int index = 1;
+ while (elemIndex >= this.itemBeginPos[index]) {
+ index++;
+ if (index == this.itemBeginPos.length) {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+ return index - 1;
+ }
+
+ /*
+ * Returns in which sub ByteBuffer, the given element index will be available. In this case we are
+ * sure that the item will be after MBB's current position
+ */
+ private int getItemIndexFromCurItemIndex(int elemIndex) {
+ int index = this.curItemIndex;
+ while (elemIndex >= this.itemBeginPos[index]) {
+ index++;
+ if (index == this.itemBeginPos.length) {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+ return index - 1;
+ }
+
+ /**
+ * Fetches the int at the given index. Does not change position of the underlying ByteBuffers
+ * @param index
+ * @return the int value at the given index
+ */
+ public int getInt(int index) {
+ // Mostly the index specified will land within this current item. Short circuit for that
+ int itemIndex;
+ if (this.itemBeginPos[this.curItemIndex] <= index
+ && this.itemBeginPos[this.curItemIndex + 1] > index) {
+ itemIndex = this.curItemIndex;
+ } else {
+ itemIndex = getItemIndex(index);
+ }
+ return getInt(index, itemIndex);
+ }
+
+ @Override
+ public int getIntStrictlyForward(int index) {
+ // Mostly the index specified will land within this current item. Short circuit for that
+ if(index < (this.itemBeginPos[this.curItemIndex] + this.curItem.position())) {
+ throw new IndexOutOfBoundsException("The index " + index
+ + " should not be less than current position " + this.position());
+ }
+ int itemIndex;
+ if (this.itemBeginPos[this.curItemIndex + 1] > index) {
+ itemIndex = this.curItemIndex;
+ } else {
+ itemIndex = getItemIndexFromCurItemIndex(index);
+ }
+ return getInt(index, itemIndex);
+ }
+
+ /**
+ * Fetches the short at the given index. Does not change position of the underlying ByteBuffers
+ * @param index
+ * @return the short value at the given index
+ */
+ public short getShort(int index) {
+ // Mostly the index specified will land within this current item. Short circuit for that
+ int itemIndex;
+ if (this.itemBeginPos[this.curItemIndex] <= index
+ && this.itemBeginPos[this.curItemIndex + 1] > index) {
+ itemIndex = this.curItemIndex;
+ } else {
+ itemIndex = getItemIndex(index);
+ }
+ ByteBuffer item = items[itemIndex];
+ int offsetInItem = index - this.itemBeginPos[itemIndex];
+ if (item.limit() - offsetInItem >= Bytes.SIZEOF_SHORT) {
+ return ByteBufferUtils.toShort(item, offsetInItem);
+ }
+ if (items.length - 1 == itemIndex) {
+ // means cur item is the last one and we wont be able to read a int. Throw exception
+ throw new BufferUnderflowException();
+ }
+ ByteBuffer nextItem = items[itemIndex + 1];
+ // Get available one byte from this item and remaining one from next
+ short n = 0;
+ n ^= item.get(offsetInItem) & 0xFF;
+ n <<= 8;
+ n ^= nextItem.get(0) & 0xFF;
+ return n;
+ }
+
+ @Override
+ public short getShortStrictlyForward(int index) {
+ // Mostly the index specified will land within this current item. Short circuit for that
+ if(index < (this.itemBeginPos[this.curItemIndex] + this.curItem.position())) {
+ throw new IndexOutOfBoundsException("The index " + index
+ + " should not be less than current position " + this.position());
+ }
+ int itemIndex;
+ if (this.itemBeginPos[this.curItemIndex + 1] > index) {
+ itemIndex = this.curItemIndex;
+ } else {
+ itemIndex = getItemIndexFromCurItemIndex(index);
+ }
+ return getShort(index, itemIndex);
+ }
+
+ private int getInt(int index, int itemIndex) {
+ ByteBuffer item = items[itemIndex];
+ int offsetInItem = index - this.itemBeginPos[itemIndex];
+ int remainingLen = item.limit() - offsetInItem;
+ if (remainingLen >= Bytes.SIZEOF_INT) {
+ return ByteBufferUtils.toInt(item, offsetInItem);
+ }
+ if (items.length - 1 == itemIndex) {
+ // means cur item is the last one and we wont be able to read a int. Throw exception
+ throw new BufferUnderflowException();
+ }
+ ByteBuffer nextItem = items[itemIndex + 1];
+ // Get available bytes from this item and remaining from next
+ int l = 0;
+ for (int i = offsetInItem; i < item.capacity(); i++) {
+ l <<= 8;
+ l ^= item.get(i) & 0xFF;
+ }
+ for (int i = 0; i < Bytes.SIZEOF_INT - remainingLen; i++) {
+ l <<= 8;
+ l ^= nextItem.get(i) & 0xFF;
+ }
+ return l;
+ }
+
+ private short getShort(int index, int itemIndex) {
+ ByteBuffer item = items[itemIndex];
+ int offsetInItem = index - this.itemBeginPos[itemIndex];
+ int remainingLen = item.limit() - offsetInItem;
+ if (remainingLen >= Bytes.SIZEOF_SHORT) {
+ return ByteBufferUtils.toShort(item, offsetInItem);
+ }
+ if (items.length - 1 == itemIndex) {
+ // means cur item is the last one and we wont be able to read a int. Throw exception
+ throw new BufferUnderflowException();
+ }
+ ByteBuffer nextItem = items[itemIndex + 1];
+ // Get available bytes from this item and remaining from next
+ short l = 0;
+ for (int i = offsetInItem; i < item.capacity(); i++) {
+ l <<= 8;
+ l ^= item.get(i) & 0xFF;
+ }
+ for (int i = 0; i < Bytes.SIZEOF_SHORT - remainingLen; i++) {
+ l <<= 8;
+ l ^= nextItem.get(i) & 0xFF;
+ }
+ return l;
+ }
+
+ private long getLong(int index, int itemIndex) {
+ ByteBuffer item = items[itemIndex];
+ int offsetInItem = index - this.itemBeginPos[itemIndex];
+ int remainingLen = item.limit() - offsetInItem;
+ if (remainingLen >= Bytes.SIZEOF_LONG) {
+ return ByteBufferUtils.toLong(item, offsetInItem);
+ }
+ if (items.length - 1 == itemIndex) {
+ // means cur item is the last one and we wont be able to read a long. Throw exception
+ throw new BufferUnderflowException();
+ }
+ ByteBuffer nextItem = items[itemIndex + 1];
+ // Get available bytes from this item and remaining from next
+ long l = 0;
+ for (int i = offsetInItem; i < item.capacity(); i++) {
+ l <<= 8;
+ l ^= item.get(i) & 0xFF;
+ }
+ for (int i = 0; i < Bytes.SIZEOF_LONG - remainingLen; i++) {
+ l <<= 8;
+ l ^= nextItem.get(i) & 0xFF;
+ }
+ return l;
+ }
+
+ /**
+ * Fetches the long at the given index. Does not change position of the underlying ByteBuffers
+ * @param index
+ * @return the long value at the given index
+ */
+ public long getLong(int index) {
+ // Mostly the index specified will land within this current item. Short circuit for that
+ int itemIndex;
+ if (this.itemBeginPos[this.curItemIndex] <= index
+ && this.itemBeginPos[this.curItemIndex + 1] > index) {
+ itemIndex = this.curItemIndex;
+ } else {
+ itemIndex = getItemIndex(index);
+ }
+ ByteBuffer item = items[itemIndex];
+ int offsetInItem = index - this.itemBeginPos[itemIndex];
+ int remainingLen = item.limit() - offsetInItem;
+ if (remainingLen >= Bytes.SIZEOF_LONG) {
+ return ByteBufferUtils.toLong(item, offsetInItem);
+ }
+ if (items.length - 1 == itemIndex) {
+ // means cur item is the last one and we wont be able to read a long. Throw exception
+ throw new BufferUnderflowException();
+ }
+ ByteBuffer nextItem = items[itemIndex + 1];
+ // Get available bytes from this item and remaining from next
+ long l = 0;
+ for (int i = offsetInItem; i < item.capacity(); i++) {
+ l <<= 8;
+ l ^= item.get(i) & 0xFF;
+ }
+ for (int i = 0; i < Bytes.SIZEOF_LONG - remainingLen; i++) {
+ l <<= 8;
+ l ^= nextItem.get(i) & 0xFF;
+ }
+ return l;
+ }
+
+ @Override
+ public long getLongStrictlyForward(int index) {
+ // Mostly the index specified will land within this current item. Short circuit for that
+ if(index < (this.itemBeginPos[this.curItemIndex] + this.curItem.position())) {
+ throw new IndexOutOfBoundsException("The index " + index
+ + " should not be less than current position " + this.position());
+ }
+ int itemIndex;
+ if (this.itemBeginPos[this.curItemIndex + 1] > index) {
+ itemIndex = this.curItemIndex;
+ } else {
+ itemIndex = getItemIndexFromCurItemIndex(index);
+ }
+ return getLong(index, itemIndex);
+ }
+
+ /**
+ * @return this MBB's current position
+ */
+ @Override
+ public int position() {
+ return itemBeginPos[this.curItemIndex] + this.curItem.position();
+ }
+
+ /**
+ * Sets this MBB's position to the given value.
+ * @param position
+ * @return this object
+ */
+ @Override
+ public MultiByteBuff position(int position) {
+ // Short circuit for positioning within the cur item. Mostly that is the case.
+ if (this.itemBeginPos[this.curItemIndex] <= position
+ && this.itemBeginPos[this.curItemIndex + 1] > position) {
+ this.curItem.position(position - this.itemBeginPos[this.curItemIndex]);
+ return this;
+ }
+ int itemIndex = getItemIndex(position);
+ // All items from 0 - curItem-1 set position at end.
+ for (int i = 0; i < itemIndex; i++) {
+ this.items[i].position(this.items[i].limit());
+ }
+ // All items after curItem set position at begin
+ for (int i = itemIndex + 1; i < this.items.length; i++) {
+ this.items[i].position(0);
+ }
+ this.curItem = this.items[itemIndex];
+ this.curItem.position(position - this.itemBeginPos[itemIndex]);
+ this.curItemIndex = itemIndex;
+ return this;
+ }
+
+ /**
+ * Rewinds this MBB and the position is set to 0
+ * @return this object
+ */
+ @Override
+ public MultiByteBuff rewind() {
+ for (int i = 0; i < this.items.length; i++) {
+ this.items[i].rewind();
+ }
+ this.curItemIndex = 0;
+ this.curItem = this.items[this.curItemIndex];
+ this.markedItemIndex = -1;
+ return this;
+ }
+
+ /**
+ * Marks the current position of the MBB
+ * @return this object
+ */
+ @Override
+ public MultiByteBuff mark() {
+ this.markedItemIndex = this.curItemIndex;
+ this.curItem.mark();
+ return this;
+ }
+
+ /**
+ * Similar to {@link ByteBuffer}.reset(), ensures that this MBB
+ * is reset back to last marked position.
+ * @return This MBB
+ */
+ @Override
+ public MultiByteBuff reset() {
+ // when the buffer is moved to the next one.. the reset should happen on the previous marked
+ // item and the new one should be taken as the base
+ if (this.markedItemIndex < 0) throw new InvalidMarkException();
+ ByteBuffer markedItem = this.items[this.markedItemIndex];
+ markedItem.reset();
+ this.curItem = markedItem;
+ // All items after the marked position upto the current item should be reset to 0
+ for (int i = this.curItemIndex; i > this.markedItemIndex; i--) {
+ this.items[i].position(0);
+ }
+ this.curItemIndex = this.markedItemIndex;
+ return this;
+ }
+
+ /**
+ * Returns the number of elements between the current position and the
+ * limit.
+ * @return the remaining elements in this MBB
+ */
+ @Override
+ public int remaining() {
+ int remain = 0;
+ for (int i = curItemIndex; i < items.length; i++) {
+ remain += items[i].remaining();
+ }
+ return remain;
+ }
+
+ /**
+ * Returns true if there are elements between the current position and the limt
+ * @return true if there are elements, false otherwise
+ */
+ @Override
+ public final boolean hasRemaining() {
+ return this.curItem.hasRemaining() || this.curItemIndex < this.items.length - 1;
+ }
+
+ /**
+ * A relative method that returns byte at the current position. Increments the
+ * current position by the size of a byte.
+ * @return the byte at the current position
+ */
+ @Override
+ public byte get() {
+ if (this.curItem.remaining() == 0) {
+ if (items.length - 1 == this.curItemIndex) {
+ // means cur item is the last one and we wont be able to read a long. Throw exception
+ throw new BufferUnderflowException();
+ }
+ this.curItemIndex++;
+ this.curItem = this.items[this.curItemIndex];
+ }
+ return this.curItem.get();
+ }
+
+ /**
+ * Returns the short value at the current position. Also advances the position by the size
+ * of short
+ *
+ * @return the short value at the current position
+ */
+ @Override
+ public short getShort() {
+ int remaining = this.curItem.remaining();
+ if (remaining >= Bytes.SIZEOF_SHORT) {
+ return this.curItem.getShort();
+ }
+ if (remaining == 0) {
+ if (items.length - 1 == this.curItemIndex) {
+ // means cur item is the last one and we wont be able to read a long. Throw exception
+ throw new BufferUnderflowException();
+ }
+ this.curItemIndex++;
+ this.curItem = this.items[this.curItemIndex];
+ return this.curItem.getShort();
+ }
+ short n = 0;
+ n ^= get() & 0xFF;
+ n <<= 8;
+ n ^= get() & 0xFF;
+ return n;
+ }
+
+ /**
+ * Returns the int value at the current position. Also advances the position by the size of int
+ *
+ * @return the int value at the current position
+ */
+ @Override
+ public int getInt() {
+ int remaining = this.curItem.remaining();
+ if (remaining >= Bytes.SIZEOF_INT) {
+ return this.curItem.getInt();
+ }
+ if (remaining == 0) {
+ if (items.length - 1 == this.curItemIndex) {
+ // means cur item is the last one and we wont be able to read a long. Throw exception
+ throw new BufferUnderflowException();
+ }
+ this.curItemIndex++;
+ this.curItem = this.items[this.curItemIndex];
+ return this.curItem.getInt();
+ }
+ // Get available bytes from this item and remaining from next
+ int n = 0;
+ for (int i = 0; i < Bytes.SIZEOF_INT; i++) {
+ n <<= 8;
+ n ^= get() & 0xFF;
+ }
+ return n;
+ }
+
+
+ /**
+ * Returns the long value at the current position. Also advances the position by the size of long
+ *
+ * @return the long value at the current position
+ */
+ @Override
+ public long getLong() {
+ int remaining = this.curItem.remaining();
+ if (remaining >= Bytes.SIZEOF_LONG) {
+ return this.curItem.getLong();
+ }
+ if (remaining == 0) {
+ if (items.length - 1 == this.curItemIndex) {
+ // means cur item is the last one and we wont be able to read a long. Throw exception
+ throw new BufferUnderflowException();
+ }
+ this.curItemIndex++;
+ this.curItem = this.items[this.curItemIndex];
+ return this.curItem.getLong();
+ }
+ // Get available bytes from this item and remaining from next
+ long l = 0;
+ for (int i = 0; i < Bytes.SIZEOF_LONG; i++) {
+ l <<= 8;
+ l ^= get() & 0xFF;
+ }
+ return l;
+ }
+
+ /**
+ * Copies the content from this MBB's current position to the byte array and fills it. Also
+ * advances the position of the MBB by the length of the byte[].
+ * @param dst
+ */
+ @Override
+ public void get(byte[] dst) {
+ get(dst, 0, dst.length);
+ }
+
+ /**
+ * Copies the specified number of bytes from this MBB's current position to the byte[]'s offset.
+ * Also advances the position of the MBB by the given length.
+ * @param dst
+ * @param offset within the current array
+ * @param length upto which the bytes to be copied
+ */
+ @Override
+ public void get(byte[] dst, int offset, int length) {
+ while (length > 0) {
+ int toRead = Math.min(length, this.curItem.remaining());
+ ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset,
+ toRead);
+ this.curItem.position(this.curItem.position() + toRead);
+ length -= toRead;
+ if (length == 0)
+ break;
+ this.curItemIndex++;
+ this.curItem = this.items[this.curItemIndex];
+ offset += toRead;
+ }
+ }
+
+ /**
+ * Marks the limit of this MBB.
+ * @param limit
+ * @return This MBB
+ */
+ @Override
+ public MultiByteBuff limit(int limit) {
+ this.limit = limit;
+ // Normally the limit will try to limit within the last BB item
+ int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex];
+ if (limit >= limitedIndexBegin && limit < this.itemBeginPos[this.limitedItemIndex + 1]) {
+ this.items[this.limitedItemIndex].limit(limit - limitedIndexBegin);
+ return this;
+ }
+ int itemIndex = getItemIndex(limit);
+ int beginOffset = this.itemBeginPos[itemIndex];
+ int offsetInItem = limit - beginOffset;
+ ByteBuffer item = items[itemIndex];
+ item.limit(offsetInItem);
+ for (int i = this.limitedItemIndex; i < itemIndex; i++) {
+ this.items[i].limit(this.items[i].capacity());
+ }
+ this.limitedItemIndex = itemIndex;
+ for (int i = itemIndex + 1; i < this.items.length; i++) {
+ this.items[i].limit(this.items[i].position());
+ }
+ return this;
+ }
+
+ /**
+ * Returns the limit of this MBB
+ * @return limit of the MBB
+ */
+ @Override
+ public int limit() {
+ return this.limit;
+ }
+
+ /**
+ * Returns an MBB which is a sliced version of this MBB. The position, limit and mark
+ * of the new MBB will be independent than that of the original MBB.
+ * The content of the new MBB will start at this MBB's current position
+ * @return a sliced MBB
+ */
+ @Override
+ public MultiByteBuff slice() {
+ ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1];
+ for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) {
+ copy[j] = this.items[i].slice();
+ }
+ return new MultiByteBuff(copy);
+ }
+
+ /**
+ * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark
+ * of the new MBB will be independent than that of the original MBB.
+ * The content of the new MBB will start at this MBB's current position
+ * The position, limit and mark of the new MBB would be identical to this MBB in terms of
+ * values.
+ * @return a sliced MBB
+ */
+ @Override
+ public MultiByteBuff duplicate() {
+ ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length];
+ for (int i = 0; i < this.items.length; i++) {
+ itemsCopy[i] = items[i].duplicate();
+ }
+ return new MultiByteBuff(itemsCopy, this.itemBeginPos, this.limit, this.limitedItemIndex,
+ this.curItemIndex, this.markedItemIndex);
+ }
+
+ /**
+ * Writes a byte to this MBB at the current position and increments the position
+ * @param b
+ * @return this object
+ */
+ @Override
+ public MultiByteBuff put(byte b) {
+ if (this.curItem.remaining() == 0) {
+ if (this.curItemIndex == this.items.length - 1) {
+ throw new BufferOverflowException();
+ }
+ this.curItemIndex++;
+ this.curItem = this.items[this.curItemIndex];
+ }
+ this.curItem.put(b);
+ return this;
+ }
+
+ /**
+ * Writes a byte to this MBB at the given index
+ * @param index
+ * @param b
+ * @return this object
+ */
+ @Override
+ public MultiByteBuff put(int index, byte b) {
+ int itemIndex = getItemIndex(limit);
+ ByteBuffer item = items[itemIndex];
+ item.put(index - itemBeginPos[itemIndex], b);
+ return this;
+ }
+
+ /**
+ * Copies from a src MBB to this MBB.
+ * @param offset the position in this MBB to which the copy should happen
+ * @param src the src MBB
+ * @param srcOffset the offset in the src MBB from where the elements should be read
+ * @param length the length upto which the copy should happen
+ */
+ @Override
+ public MultiByteBuff put(int offset, ByteBuff src, int srcOffset, int length) {
+ int destItemIndex = getItemIndex(offset);
+ int srcItemIndex = getItemIndex(srcOffset);
+ ByteBuffer destItem = this.items[destItemIndex];
+ offset = offset - this.itemBeginPos[destItemIndex];
+
+ ByteBuffer srcItem = getItemByteBuffer(src, srcItemIndex);
+ srcOffset = srcOffset - this.itemBeginPos[srcItemIndex];
+ int toRead, toWrite, toMove;
+ while (length > 0) {
+ toWrite = destItem.limit() - offset;
+ toRead = srcItem.limit() - srcOffset;
+ toMove = Math.min(length, Math.min(toRead, toWrite));
+ ByteBufferUtils.copyFromBufferToBuffer(srcItem, destItem, srcOffset, offset, toMove);
+ length -= toMove;
+ if (length == 0) break;
+ if (toRead < toWrite) {
+ srcItem = getItemByteBuffer(src, ++srcItemIndex);
+ srcOffset = 0;
+ offset += toMove;
+ } else if (toRead > toWrite) {
+ destItem = this.items[++destItemIndex];
+ offset = 0;
+ srcOffset += toMove;
+ } else {
+ // toRead = toWrite case
+ srcItem = getItemByteBuffer(src, ++srcItemIndex);
+ srcOffset = 0;
+ destItem = this.items[++destItemIndex];
+ offset = 0;
+ }
+ }
+ return this;
+ }
+
+ private static ByteBuffer getItemByteBuffer(ByteBuff buf, int index) {
+ return (buf instanceof SingleByteBuff) ? ((SingleByteBuff) buf).getEnclosingByteBuffer()
+ : ((MultiByteBuff) buf).items[index];
+ }
+
+ /**
+ * Writes an int to this MBB at its current position. Also advances the position by size of int
+ * @param val Int value to write
+ * @return this object
+ */
+ @Override
+ public MultiByteBuff putInt(int val) {
+ if (this.curItem.remaining() >= Bytes.SIZEOF_INT) {
+ this.curItem.putInt(val);
+ return this;
+ }
+ if (this.curItemIndex == this.items.length - 1) {
+ throw new BufferOverflowException();
+ }
+ // During read, we will read as byte by byte for this case. So just write in Big endian
+ put(int3(val));
+ put(int2(val));
+ put(int1(val));
+ put(int0(val));
+ return this;
+ }
+
+ private static byte int3(int x) {
+ return (byte) (x >> 24);
+ }
+
+ private static byte int2(int x) {
+ return (byte) (x >> 16);
+ }
+
+ private static byte int1(int x) {
+ return (byte) (x >> 8);
+ }
+
+ private static byte int0(int x) {
+ return (byte) (x);
+ }
+
+ /**
+ * Copies from the given byte[] to this MBB
+ * @param src
+ * @return this MBB
+ */
+ @Override
+ public final MultiByteBuff put(byte[] src) {
+ return put(src, 0, src.length);
+ }
+
+ /**
+ * Copies from the given byte[] to this MBB
+ * @param src
+ * @param offset the position in the byte array from which the copy should be done
+ * @param length the length upto which the copy should happen
+ * @return this MBB
+ */
+ @Override
+ public MultiByteBuff put(byte[] src, int offset, int length) {
+ if (this.curItem.remaining() >= length) {
+ ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length);
+ return this;
+ }
+ int end = offset + length;
+ for (int i = offset; i < end; i++) {
+ this.put(src[i]);
+ }
+ return this;
+ }
+
+
+ /**
+ * Writes a long to this MBB at its current position. Also advances the position by size of long
+ * @param val Long value to write
+ * @return this object
+ */
+ @Override
+ public MultiByteBuff putLong(long val) {
+ if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) {
+ this.curItem.putLong(val);
+ return this;
+ }
+ if (this.curItemIndex == this.items.length - 1) {
+ throw new BufferOverflowException();
+ }
+ // During read, we will read as byte by byte for this case. So just write in Big endian
+ put(long7(val));
+ put(long6(val));
+ put(long5(val));
+ put(long4(val));
+ put(long3(val));
+ put(long2(val));
+ put(long1(val));
+ put(long0(val));
+ return this;
+ }
+
+ private static byte long7(long x) {
+ return (byte) (x >> 56);
+ }
+
+ private static byte long6(long x) {
+ return (byte) (x >> 48);
+ }
+
+ private static byte long5(long x) {
+ return (byte) (x >> 40);
+ }
+
+ private static byte long4(long x) {
+ return (byte) (x >> 32);
+ }
+
+ private static byte long3(long x) {
+ return (byte) (x >> 24);
+ }
+
+ private static byte long2(long x) {
+ return (byte) (x >> 16);
+ }
+
+ private static byte long1(long x) {
+ return (byte) (x >> 8);
+ }
+
+ private static byte long0(long x) {
+ return (byte) (x);
+ }
+
+ /**
+ * Jumps the current position of this MBB by specified length.
+ * @param length
+ */
+ @Override
+ public MultiByteBuff skip(int length) {
+ // Get available bytes from this item and remaining from next
+ int jump = 0;
+ while (true) {
+ jump = this.curItem.remaining();
+ if (jump >= length) {
+ this.curItem.position(this.curItem.position() + length);
+ break;
+ }
+ this.curItem.position(this.curItem.position() + jump);
+ length -= jump;
+ this.curItemIndex++;
+ this.curItem = this.items[this.curItemIndex];
+ }
+ return this;
+ }
+
+ /**
+ * Jumps back the current position of this MBB by specified length.
+ * @param length
+ */
+ @Override
+ public MultiByteBuff moveBack(int length) {
+ while (length != 0) {
+ if (length > curItem.position()) {
+ length -= curItem.position();
+ this.curItem.position(0);
+ this.curItemIndex--;
+ this.curItem = this.items[curItemIndex];
+ } else {
+ this.curItem.position(curItem.position() - length);
+ break;
+ }
+ }
+ return this;
+ }
+
+ /**
+ * Returns bytes from current position till length specified, as a single ByteBuffer. When all
+ * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item
+ * as such will be returned. So users are warned not to change the position or limit of this
+ * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required
+ * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy
+ * the bytes to a newly created ByteBuffer of required size and return that.
+ *
+ * @param length number of bytes required.
+ * @return bytes from current position till length specified, as a single ByteButter.
+ */
+ @Override
+ public ByteBuffer asSubByteBuffer(int length) {
+ if (this.curItem.remaining() >= length) {
+ return this.curItem;
+ }
+ int offset = 0;
+ byte[] dupB = new byte[length];
+ int locCurItemIndex = curItemIndex;
+ ByteBuffer locCurItem = curItem;
+ while (length > 0) {
+ int toRead = Math.min(length, locCurItem.remaining());
+ ByteBufferUtils
+ .copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset, toRead);
+ length -= toRead;
+ if (length == 0)
+ break;
+ locCurItemIndex++;
+ locCurItem = this.items[locCurItemIndex];
+ offset += toRead;
+ }
+ return ByteBuffer.wrap(dupB);
+ }
+
+ /**
+ * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these
+ * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as
+ * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are
+ * warned not to change the position or limit of this returned ByteBuffer. When the required bytes
+ * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created
+ * ByteBuffer of required size and return that.
+ *
+ * @param offset the offset in this MBB from where the subBuffer should be created
+ * @param length the length of the subBuffer
+ * @param pair a pair that will have the bytes from the current position till length specified, as
+ * a single ByteBuffer and offset in that Buffer where the bytes starts. The method would
+ * set the values on the pair that is passed in by the caller
+ */
+ @Override
+ public void asSubByteBuffer(int offset, int length, Pair<ByteBuffer, Integer> pair) {
+ if (this.itemBeginPos[this.curItemIndex] <= offset) {
+ int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex];
+ if (this.curItem.limit() - relOffsetInCurItem >= length) {
+ pair.setFirst(this.curItem);
+ pair.setSecond(relOffsetInCurItem);
+ return;
+ }
+ }
+ int itemIndex = getItemIndex(offset);
+ ByteBuffer item = this.items[itemIndex];
+ offset = offset - this.itemBeginPos[itemIndex];
+ if (item.limit() - offset >= length) {
+ pair.setFirst(item);
+ pair.setSecond(offset);
+ return;
+ }
+ byte[] dst = new byte[length];
+ int destOffset = 0;
+ while (length > 0) {
+ int toRead = Math.min(length, item.limit() - offset);
+ ByteBufferUtils.copyFromBufferToArray(dst, item, offset, destOffset, toRead);
+ length -= toRead;
+ if (length == 0) break;
+ itemIndex++;
+ item = this.items[itemIndex];
+ destOffset += toRead;
+ offset = 0;
+ }
+ pair.setFirst(ByteBuffer.wrap(dst));
+ pair.setSecond(0);
+ return;
+ }
+
+ /**
+ * Copies the content from an this MBB to a ByteBuffer
+ * @param out the ByteBuffer to which the copy has to happen
+ * @param sourceOffset the offset in the MBB from which the elements has
+ * to be copied
+ * @param length the length in the MBB upto which the elements has to be copied
+ */
+ @Override
+ public void get(ByteBuffer out, int sourceOffset,
+ int length) {
+ // Not used from real read path actually. So not going with
+ // optimization
+ for (int i = 0; i < length; ++i) {
+ out.put(this.get(sourceOffset + i));
+ }
+ }
+
+ /**
+ * Copy the content from this MBB to a byte[] based on the given offset and
+ * length
+ *
+ * @param offset
+ * the position from where the copy should start
+ * @param length
+ * the length upto which the copy has to be done
+ * @return byte[] with the copied contents from this MBB.
+ */
+ @Override
+ public byte[] toBytes(int offset, int length) {
+ byte[] output = new byte[length];
+ int itemIndex = getItemIndex(offset);
+ ByteBuffer item = this.items[itemIndex];
+ int toRead = item.limit() - offset;
+ int destinationOffset = 0;
+ while (length > 0) {
+ toRead = Math.min(length, toRead);
+ ByteBufferUtils.copyFromBufferToArray(output, item, offset, destinationOffset, toRead);
+ length -= toRead;
+ if (length == 0)
+ break;
+ destinationOffset += toRead;
+ offset = 0;
+ item = items[++itemIndex];
+ toRead = item.remaining();
+ }
+ return output;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof MultiByteBuff)) return false;
+ if (this == obj) return true;
+ MultiByteBuff that = (MultiByteBuff) obj;
+ if (this.capacity() != that.capacity()) return false;
+ if (ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(),
+ that.limit()) == 0) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 0;
+ for (ByteBuffer b : this.items) {
+ hash += b.hashCode();
+ }
+ return hash;
+ }
+}