You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2016/11/14 17:35:26 UTC
[2/2] hbase git commit: HBASE-15788 Use Offheap ByteBuffers from
BufferPool to read RPC requests.
HBASE-15788 Use Offheap ByteBuffers from BufferPool to read RPC requests.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c3685760
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c3685760
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c3685760
Branch: refs/heads/master
Commit: c3685760f004450667920144f926383eb307de53
Parents: 9250bf8
Author: anoopsamjohn <an...@gmail.com>
Authored: Mon Nov 14 23:05:05 2016 +0530
Committer: anoopsamjohn <an...@gmail.com>
Committed: Mon Nov 14 23:05:05 2016 +0530
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/client/Put.java | 5 +-
.../hadoop/hbase/ipc/CellBlockBuilder.java | 38 ++-
.../hadoop/hbase/ipc/TestCellBlockBuilder.java | 4 +-
.../apache/hadoop/hbase/OffheapKeyValue.java | 29 ++-
.../apache/hadoop/hbase/codec/CellCodec.java | 8 +-
.../hadoop/hbase/codec/CellCodecWithTags.java | 8 +-
.../org/apache/hadoop/hbase/codec/Codec.java | 4 +-
.../hadoop/hbase/codec/KeyValueCodec.java | 56 ++++-
.../hbase/codec/KeyValueCodecWithTags.java | 16 +-
.../hadoop/hbase/io/ByteArrayOutputStream.java | 2 +-
.../hadoop/hbase/io/ByteBufferOutputStream.java | 2 +-
.../apache/hadoop/hbase/io/ByteBufferPool.java | 4 +-
.../io/ByteBufferSupportDataOutputStream.java | 44 ----
.../hbase/io/ByteBufferSupportOutputStream.java | 51 ----
.../hadoop/hbase/io/ByteBufferWriter.java | 53 ++++
.../io/ByteBufferWriterDataOutputStream.java | 44 ++++
.../hbase/io/ByteBufferWriterOutputStream.java | 90 +++++++
.../org/apache/hadoop/hbase/nio/ByteBuff.java | 45 ++++
.../apache/hadoop/hbase/nio/MultiByteBuff.java | 34 +++
.../apache/hadoop/hbase/nio/SingleByteBuff.java | 12 +-
.../hadoop/hbase/util/ByteBufferUtils.java | 10 +-
.../hbase/io/TestTagCompressionContext.java | 4 +-
.../apache/hadoop/hbase/codec/MessageCodec.java | 8 +-
.../hadoop/hbase/io/hfile/HFileBlock.java | 4 +-
.../org/apache/hadoop/hbase/ipc/CallRunner.java | 5 +-
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 239 ++++++++++++++++---
.../hadoop/hbase/regionserver/HRegion.java | 4 +-
.../wal/AsyncProtobufLogWriter.java | 4 +-
.../hbase/regionserver/wal/WALCellCodec.java | 18 +-
.../hadoop/hbase/ipc/AbstractTestIPC.java | 4 +-
.../apache/hadoop/hbase/ipc/TestRpcServer.java | 140 +++++++++++
31 files changed, 782 insertions(+), 207 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
index dbaf3a7..61a71f7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
@@ -29,6 +29,7 @@ import java.util.TreeMap;
import java.util.UUID;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
@@ -319,9 +320,7 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
byte [] family = CellUtil.cloneFamily(kv);
List<Cell> list = getCellList(family);
//Checking that the row of the kv is the same as the put
- int res = Bytes.compareTo(this.row, 0, row.length,
- kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
- if (res != 0) {
+ if (!CellUtil.matchingRow(kv, this.row)) {
throw new WrongRowIOException("The row in " + kv.toString() +
" doesn't match the original one " + Bytes.toStringBinary(this.row));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
index fb2cafa..d00490b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
@@ -21,7 +21,9 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufOutputStream;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
@@ -35,10 +37,13 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -238,15 +243,15 @@ class CellBlockBuilder {
public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
final byte[] cellBlock) throws IOException {
// Use this method from Client side to create the CellScanner
- ByteBuffer cellBlockBuf = ByteBuffer.wrap(cellBlock);
if (compressor != null) {
- cellBlockBuf = decompress(compressor, cellBlockBuf);
+ ByteBuffer cellBlockBuf = decompress(compressor, cellBlock);
+ return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
}
// Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will
// make Cells directly over the passed BB. This method is called at client side and we don't
// want the Cells to share the same byte[] where the RPC response is being read. Caching of any
// of the Cells at user's app level will make it not possible to GC the response byte[]
- return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
+ return codec.getDecoder(new ByteArrayInputStream(cellBlock));
}
/**
@@ -258,7 +263,7 @@ class CellBlockBuilder {
* @throws IOException if cell encoding fails
*/
public CellScanner createCellScannerReusingBuffers(final Codec codec,
- final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException {
+ final CompressionCodec compressor, ByteBuff cellBlock) throws IOException {
// Use this method from HRS to create the CellScanner
// If compressed, decompress it first before passing it on else we will leak compression
// resources if the stream is not closed properly after we let it out.
@@ -268,27 +273,38 @@ class CellBlockBuilder {
return codec.getDecoder(cellBlock);
}
- private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock)
+ private ByteBuffer decompress(CompressionCodec compressor, byte[] compressedCellBlock)
throws IOException {
+ ByteBuffer cellBlock = decompress(compressor, new ByteArrayInputStream(compressedCellBlock),
+ compressedCellBlock.length * this.cellBlockDecompressionMultiplier);
+ return cellBlock;
+ }
+
+ private ByteBuff decompress(CompressionCodec compressor, ByteBuff compressedCellBlock)
+ throws IOException {
+ ByteBuffer cellBlock = decompress(compressor, new ByteBuffInputStream(compressedCellBlock),
+ compressedCellBlock.remaining() * this.cellBlockDecompressionMultiplier);
+ return new SingleByteBuff(cellBlock);
+ }
+
+ private ByteBuffer decompress(CompressionCodec compressor, InputStream cellBlockStream,
+ int osInitialSize) throws IOException {
// GZIPCodec fails w/ NPE if no configuration.
if (compressor instanceof Configurable) {
((Configurable) compressor).setConf(this.conf);
}
Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
- CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock),
- poolDecompressor);
+ CompressionInputStream cis = compressor.createInputStream(cellBlockStream, poolDecompressor);
ByteBufferOutputStream bbos;
try {
// TODO: This is ugly. The buffer will be resized on us if we guess wrong.
// TODO: Reuse buffers.
- bbos = new ByteBufferOutputStream(
- cellBlock.remaining() * this.cellBlockDecompressionMultiplier);
+ bbos = new ByteBufferOutputStream(osInitialSize);
IOUtils.copy(cis, bbos);
bbos.close();
- cellBlock = bbos.getByteBuffer();
+ return bbos.getByteBuffer();
} finally {
CodecPool.returnDecompressor(poolDecompressor);
}
- return cellBlock;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
index ccabe66..9addaa5 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.io.SizedCellScanner;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -78,7 +79,8 @@ public class TestCellBlockBuilder {
CellScanner cellScanner = sized ? getSizedCellScanner(cells)
: CellUtil.createCellScanner(Arrays.asList(cells).iterator());
ByteBuffer bb = builder.buildCellBlock(codec, compressor, cellScanner);
- cellScanner = builder.createCellScannerReusingBuffers(codec, compressor, bb);
+ cellScanner = builder.createCellScannerReusingBuffers(codec, compressor,
+ new SingleByteBuff(bb));
int i = 0;
while (cellScanner.advance()) {
i++;
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
index 2165362..06a0ed6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
@@ -36,10 +36,10 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
protected final ByteBuffer buf;
protected final int offset;
protected final int length;
+ protected final boolean hasTags;
private final short rowLen;
private final int keyLen;
private long seqId = 0;
- private final boolean hasTags;
// TODO : See if famLen can be cached or not?
private static final int FIXED_OVERHEAD = ClassSize.OBJECT + ClassSize.REFERENCE
@@ -57,6 +57,18 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
this.seqId = seqId;
}
+ public OffheapKeyValue(ByteBuffer buf, int offset, int length) {
+ assert buf.isDirect();
+ this.buf = buf;
+ this.offset = offset;
+ this.length = length;
+ rowLen = ByteBufferUtils.toShort(this.buf, this.offset + KeyValue.ROW_OFFSET);
+ keyLen = ByteBufferUtils.toInt(this.buf, this.offset);
+ int tagsLen = this.length
+ - (this.keyLen + getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
+ this.hasTags = tagsLen > 0;
+ }
+
@Override
public byte[] getRowArray() {
return CellUtil.cloneRow(this);
@@ -265,16 +277,19 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
@Override
public void setTimestamp(long ts) throws IOException {
- // This Cell implementation is not yet used in write path.
- // TODO when doing HBASE-15179
- throw new UnsupportedOperationException();
+ ByteBufferUtils.copyFromArrayToBuffer(this.buf, this.getTimestampOffset(), Bytes.toBytes(ts), 0,
+ Bytes.SIZEOF_LONG);
+ }
+
+ private int getTimestampOffset() {
+ return this.offset + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + this.keyLen
+ - KeyValue.TIMESTAMP_TYPE_SIZE;
}
@Override
public void setTimestamp(byte[] ts, int tsOffset) throws IOException {
- // This Cell implementation is not yet used in write path.
- // TODO when doing HBASE-15179
- throw new UnsupportedOperationException();
+ ByteBufferUtils.copyFromArrayToBuffer(this.buf, this.getTimestampOffset(), ts, tsOffset,
+ Bytes.SIZEOF_LONG);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
index d6b64f6..ca2e3e8 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
@@ -20,14 +20,14 @@ package org.apache.hadoop.hbase.codec;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -118,8 +118,8 @@ public class CellCodec implements Codec {
}
@Override
- public Decoder getDecoder(ByteBuffer buf) {
- return getDecoder(new ByteBufferInputStream(buf));
+ public Decoder getDecoder(ByteBuff buf) {
+ return getDecoder(new ByteBuffInputStream(buf));
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java
index 7326884..2dca10a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java
@@ -20,14 +20,14 @@ package org.apache.hadoop.hbase.codec;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -119,8 +119,8 @@ public class CellCodecWithTags implements Codec {
}
@Override
- public Decoder getDecoder(ByteBuffer buf) {
- return getDecoder(new ByteBufferInputStream(buf));
+ public Decoder getDecoder(ByteBuff buf) {
+ return getDecoder(new ByteBuffInputStream(buf));
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java
index c8a4cdc..d1463ee 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java
@@ -19,12 +19,12 @@ package org.apache.hadoop.hbase.codec;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.CellOutputStream;
+import org.apache.hadoop.hbase.nio.ByteBuff;
/**
* Encoder/Decoder for Cell.
@@ -51,6 +51,6 @@ public interface Codec {
interface Decoder extends CellScanner {};
Decoder getDecoder(InputStream is);
- Decoder getDecoder(ByteBuffer buf);
+ Decoder getDecoder(ByteBuff buf);
Encoder getEncoder(OutputStream os);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
index 2609398..00ce023 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
@@ -27,8 +27,10 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NoTagsKeyValue;
+import org.apache.hadoop.hbase.OffheapKeyValue;
import org.apache.hadoop.hbase.ShareableMemory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
@@ -76,24 +78,28 @@ public class KeyValueCodec implements Codec {
}
}
- public static class ByteBufferedKeyValueDecoder implements Codec.Decoder {
+ public static class ByteBuffKeyValueDecoder implements Codec.Decoder {
- protected final ByteBuffer buf;
+ protected final ByteBuff buf;
protected Cell current = null;
- public ByteBufferedKeyValueDecoder(ByteBuffer buf) {
+ public ByteBuffKeyValueDecoder(ByteBuff buf) {
this.buf = buf;
}
@Override
public boolean advance() throws IOException {
- if (this.buf.remaining() <= 0) {
+ if (!this.buf.hasRemaining()) {
return false;
}
- int len = ByteBufferUtils.toInt(buf);
- assert buf.hasArray();
- this.current = createCell(buf.array(), buf.arrayOffset() + buf.position(), len);
- buf.position(buf.position() + len);
+ int len = buf.getInt();
+ ByteBuffer bb = buf.asSubByteBuffer(len);
+ if (bb.isDirect()) {
+ this.current = createCell(bb, bb.position(), len);
+ } else {
+ this.current = createCell(bb.array(), bb.arrayOffset() + bb.position(), len);
+ }
+ buf.skip(len);
return true;
}
@@ -106,6 +112,11 @@ public class KeyValueCodec implements Codec {
return new ShareableMemoryNoTagsKeyValue(buf, offset, len);
}
+ protected Cell createCell(ByteBuffer bb, int pos, int len) {
+ // We know there is not going to be any tags.
+ return new ShareableMemoryOffheapKeyValue(bb, pos, len, false, 0);
+ }
+
static class ShareableMemoryKeyValue extends KeyValue implements ShareableMemory {
public ShareableMemoryKeyValue(byte[] bytes, int offset, int length) {
super(bytes, offset, length);
@@ -133,6 +144,31 @@ public class KeyValueCodec implements Codec {
return kv;
}
}
+
+ static class ShareableMemoryOffheapKeyValue extends OffheapKeyValue implements ShareableMemory {
+ public ShareableMemoryOffheapKeyValue(ByteBuffer buf, int offset, int length) {
+ super(buf, offset, length);
+ }
+
+ public ShareableMemoryOffheapKeyValue(ByteBuffer buf, int offset, int length, boolean hasTags,
+ long seqId) {
+ super(buf, offset, length, hasTags, seqId);
+ }
+
+ @Override
+ public Cell cloneToCell() {
+ byte[] copy = new byte[this.length];
+ ByteBufferUtils.copyFromBufferToArray(copy, this.buf, this.offset, 0, this.length);
+ KeyValue kv;
+ if (this.hasTags) {
+ kv = new KeyValue(copy, 0, copy.length);
+ } else {
+ kv = new NoTagsKeyValue(copy, 0, copy.length);
+ }
+ kv.setSequenceId(this.getSequenceId());
+ return kv;
+ }
+ }
}
/**
@@ -144,8 +180,8 @@ public class KeyValueCodec implements Codec {
}
@Override
- public Decoder getDecoder(ByteBuffer buf) {
- return new ByteBufferedKeyValueDecoder(buf);
+ public Decoder getDecoder(ByteBuff buf) {
+ return new ByteBuffKeyValueDecoder(buf);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
index 63c02e8..84c4840 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
/**
@@ -78,16 +79,21 @@ public class KeyValueCodecWithTags implements Codec {
}
}
- public static class ByteBufferedKeyValueDecoder
- extends KeyValueCodec.ByteBufferedKeyValueDecoder {
+ public static class ByteBuffKeyValueDecoder extends KeyValueCodec.ByteBuffKeyValueDecoder {
- public ByteBufferedKeyValueDecoder(ByteBuffer buf) {
+ public ByteBuffKeyValueDecoder(ByteBuff buf) {
super(buf);
}
+ @Override
protected Cell createCell(byte[] buf, int offset, int len) {
return new ShareableMemoryKeyValue(buf, offset, len);
}
+
+ @Override
+ protected Cell createCell(ByteBuffer bb, int pos, int len) {
+ return new ShareableMemoryOffheapKeyValue(bb, pos, len);
+ }
}
/**
@@ -104,7 +110,7 @@ public class KeyValueCodecWithTags implements Codec {
}
@Override
- public Decoder getDecoder(ByteBuffer buf) {
- return new ByteBufferedKeyValueDecoder(buf);
+ public Decoder getDecoder(ByteBuff buf) {
+ return new ByteBuffKeyValueDecoder(buf);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
index 93121df..22eb156 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes;
* supports writing ByteBuffer directly to it.
*/
@InterfaceAudience.Private
-public class ByteArrayOutputStream extends OutputStream implements ByteBufferSupportOutputStream {
+public class ByteArrayOutputStream extends OutputStream implements ByteBufferWriter {
// Borrowed from openJDK:
// http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
index f77092d..f6f7def 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.util.Bytes;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ByteBufferOutputStream extends OutputStream
- implements ByteBufferSupportOutputStream {
+ implements ByteBufferWriter {
// Borrowed from openJDK:
// http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
index e528f02..115671d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
@@ -140,7 +140,7 @@ public class ByteBufferPool {
buffers.offer(buf);
}
- int getBufferSize() {
+ public int getBufferSize() {
return this.bufferSize;
}
@@ -148,7 +148,7 @@ public class ByteBufferPool {
* @return Number of free buffers
*/
@VisibleForTesting
- int getQueueSize() {
+ public int getQueueSize() {
return buffers.size();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportDataOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportDataOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportDataOutputStream.java
deleted file mode 100644
index 3a52e63..0000000
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportDataOutputStream.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.ByteBufferUtils;
-
-/**
- * Our extension of DataOutputStream which implements ByteBufferSupportOutputStream
- */
-@InterfaceAudience.Private
-public class ByteBufferSupportDataOutputStream extends DataOutputStream
- implements ByteBufferSupportOutputStream {
-
- public ByteBufferSupportDataOutputStream(OutputStream out) {
- super(out);
- }
-
- @Override
- public void write(ByteBuffer b, int off, int len) throws IOException {
- ByteBufferUtils.copyBufferToStream(out, b, off, len);
- written += len;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStream.java
deleted file mode 100644
index ccb5c81..0000000
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStream.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Interface adds support for writing {@link ByteBuffer} into OutputStream.
- */
-@InterfaceAudience.Private
-public interface ByteBufferSupportOutputStream {
-
- /**
- * Writes <code>len</code> bytes from the specified ByteBuffer starting at offset <code>off</code>
- * to this output stream.
- *
- * @param b the data.
- * @param off the start offset in the data.
- * @param len the number of bytes to write.
- * @exception IOException
- * if an I/O error occurs. In particular, an <code>IOException</code> is thrown if
- * the output stream is closed.
- */
- void write(ByteBuffer b, int off, int len) throws IOException;
-
- /**
- * Writes an <code>int</code> to the underlying output stream as four
- * bytes, high byte first.
- * @param i the <code>int</code> to write
- * @throws IOException if an I/O error occurs.
- */
- void writeInt(int i) throws IOException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java
new file mode 100644
index 0000000..012080c
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This interface marks a class to support writing ByteBuffers into it.
+ * @see ByteArrayOutputStream
+ * @see ByteBufferOutputStream
+ */
+@InterfaceAudience.Private
+public interface ByteBufferWriter {
+
+ /**
+ * Writes <code>len</code> bytes from the specified ByteBuffer starting at offset <code>off</code>
+ *
+ * @param b the data.
+ * @param off the start offset in the data.
+ * @param len the number of bytes to write.
+ * @exception IOException if an I/O error occurs.
+ */
+ void write(ByteBuffer b, int off, int len) throws IOException;
+
+ /**
+ * Writes an <code>int</code> to the underlying output stream as four bytes, high byte first.
+ * @param i the <code>int</code> to write
+ * @throws IOException if an I/O error occurs.
+ */
+ // This is pure performance oriented API been added here. It has nothing to do with
+ // ByteBuffer and so not fully belong to here. This allows an int to be written at one go instead
+ // of 4 (4 bytes one by one).
+ // TODO remove it from here?
+ void writeInt(int i) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterDataOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterDataOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterDataOutputStream.java
new file mode 100644
index 0000000..7ddb8a9
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterDataOutputStream.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+
+/**
+ * Our extension of DataOutputStream which implements ByteBufferWriter
+ */
+@InterfaceAudience.Private
+public class ByteBufferWriterDataOutputStream extends DataOutputStream
+ implements ByteBufferWriter {
+
+ public ByteBufferWriterDataOutputStream(OutputStream out) {
+ super(out);
+ }
+
+ @Override
+ public void write(ByteBuffer b, int off, int len) throws IOException {
+ ByteBufferUtils.copyBufferToStream(out, b, off, len);
+ written += len;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterOutputStream.java
new file mode 100644
index 0000000..56c6956
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterOutputStream.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+
+/**
+ * When deal with OutputStream which is not ByteBufferWriter type, wrap it with this class. We will
+ * have to write offheap ByteBuffer (DBB) data into the OS. This class is having a temp byte array
+ * to which we can copy the DBB data for writing to the OS.
+ * <br>
+ * This is used while writing Cell data to WAL. In case of AsyncWAL, the OS created there is
+ * ByteBufferWriter. But in case of FSHLog, the OS passed by DFS client, is not of type
+ * ByteBufferWriter. We will need this temp solution until DFS client supports writing ByteBuffer
+ * directly to the OS it creates.
+ * <br>
+ * Note: This class is not thread safe.
+ */
+@InterfaceAudience.Private
+public class ByteBufferWriterOutputStream extends OutputStream
+ implements ByteBufferWriter {
+
+ private static final int TEMP_BUF_LENGTH = 4 * 1024;
+ private final OutputStream os;
+ private byte[] tempBuf = null;
+
+ public ByteBufferWriterOutputStream(OutputStream os) {
+ this.os = os;
+ }
+
+ @Override
+ public void write(ByteBuffer b, int off, int len) throws IOException {
+ byte[] buf = null;
+ if (len > TEMP_BUF_LENGTH) {
+ buf = new byte[len];
+ } else {
+ if (this.tempBuf == null) {
+ this.tempBuf = new byte[TEMP_BUF_LENGTH];
+ }
+ buf = this.tempBuf;
+ }
+ ByteBufferUtils.copyFromBufferToArray(buf, b, off, 0, len);
+ this.os.write(buf, 0, len);
+ }
+
+ @Override
+ public void writeInt(int i) throws IOException {
+ StreamUtils.writeInt(this.os, i);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ this.os.write(b);
+ }
+
+ public void write(byte b[], int off, int len) throws IOException {
+ this.os.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ this.os.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.os.close();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
index 183a031..60202a0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hbase.nio;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -34,7 +36,10 @@ import org.apache.hadoop.io.WritableUtils;
* helps us in the read path.
*/
@InterfaceAudience.Private
+// TODO to have another name. This can easily get confused with netty's ByteBuf
public abstract class ByteBuff {
+ private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB.
+
/**
* @return this ByteBuff's current position
*/
@@ -356,6 +361,14 @@ public abstract class ByteBuff {
public abstract long getLongAfterPosition(int offset);
/**
+ * Copy the content from this ByteBuff to a byte[].
+ * @return byte[] with the copied contents from this ByteBuff.
+ */
+ public byte[] toBytes() {
+ return toBytes(0, this.limit());
+ }
+
+ /**
* Copy the content from this ByteBuff to a byte[] based on the given offset and
* length
*
@@ -389,7 +402,39 @@ public abstract class ByteBuff {
*/
public abstract ByteBuff put(int offset, ByteBuff src, int srcOffset, int length);
+ /**
+ * Reads bytes from the given channel into this ByteBuff
+ * @param channel
+ * @return The number of bytes read from the channel
+ * @throws IOException
+ */
+ public abstract int read(ReadableByteChannel channel) throws IOException;
+
// static helper methods
+ public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
+ if (buf.remaining() <= NIO_BUFFER_LIMIT) {
+ return channel.read(buf);
+ }
+ int originalLimit = buf.limit();
+ int initialRemaining = buf.remaining();
+ int ret = 0;
+
+ while (buf.remaining() > 0) {
+ try {
+ int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
+ buf.limit(buf.position() + ioSize);
+ ret = channel.read(buf);
+ if (ret < ioSize) {
+ break;
+ }
+ } finally {
+ buf.limit(originalLimit);
+ }
+ }
+ int nBytes = initialRemaining - buf.remaining();
+ return (nBytes > 0) ? nBytes : ret;
+ }
+
/**
* Read integer from ByteBuff coded in 7 bits and increment position.
* @return Read integer.
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
index 107bb3f..948321d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
@@ -17,16 +17,20 @@
*/
package org.apache.hadoop.hbase.nio;
+import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.InvalidMarkException;
+import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ObjectIntPair;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Provides a unified view of all the underlying ByteBuffers and will look as if a bigger
* sequential buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int,
@@ -1071,6 +1075,28 @@ public class MultiByteBuff extends ByteBuff {
}
@Override
+ public int read(ReadableByteChannel channel) throws IOException {
+ int total = 0;
+ while (true) {
+ // Read max possible into the current BB
+ int len = channelRead(channel, this.curItem);
+ if (len > 0)
+ total += len;
+ if (this.curItem.hasRemaining()) {
+ // We were not able to read enough to fill the current BB itself. Means there is no point in
+ // doing more reads from Channel. Only this much there for now.
+ break;
+ } else {
+ if (this.curItemIndex >= this.limitedItemIndex)
+ break;
+ this.curItemIndex++;
+ this.curItem = this.items[this.curItemIndex];
+ }
+ }
+ return total;
+ }
+
+ @Override
public boolean equals(Object obj) {
if (!(obj instanceof MultiByteBuff)) return false;
if (this == obj) return true;
@@ -1091,4 +1117,12 @@ public class MultiByteBuff extends ByteBuff {
}
return hash;
}
+
+ /**
+ * @return the ByteBuffers which this wraps.
+ */
+ @VisibleForTesting
+ public ByteBuffer[] getEnclosingByteBuffers() {
+ return this.items;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
index 9ad28dc..0e45410 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
@@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hbase.nio;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -25,6 +27,8 @@ import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.hbase.util.UnsafeAccess;
import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
+import com.google.common.annotations.VisibleForTesting;
+
import sun.nio.ch.DirectBuffer;
/**
@@ -313,6 +317,11 @@ public class SingleByteBuff extends ByteBuff {
}
@Override
+ public int read(ReadableByteChannel channel) throws IOException {
+ return channelRead(channel, buf);
+ }
+
+ @Override
public boolean equals(Object obj) {
if(!(obj instanceof SingleByteBuff)) return false;
return this.buf.equals(((SingleByteBuff)obj).buf);
@@ -326,7 +335,8 @@ public class SingleByteBuff extends ByteBuff {
/**
* @return the ByteBuffer which this wraps.
*/
- ByteBuffer getEnclosingByteBuffer() {
+ @VisibleForTesting
+ public ByteBuffer getEnclosingByteBuffer() {
return this.buf;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index 0653d1a..c9a19ff 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -29,7 +29,7 @@ import java.util.Arrays;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableUtils;
@@ -144,8 +144,8 @@ public final class ByteBufferUtils {
// We have writeInt in ByteBufferOutputStream so that it can directly write
// int to underlying
// ByteBuffer in one step.
- if (out instanceof ByteBufferSupportOutputStream) {
- ((ByteBufferSupportOutputStream) out).writeInt(value);
+ if (out instanceof ByteBufferWriter) {
+ ((ByteBufferWriter) out).writeInt(value);
} else {
StreamUtils.writeInt(out, value);
}
@@ -182,8 +182,8 @@ public final class ByteBufferUtils {
*/
public static void copyBufferToStream(OutputStream out, ByteBuffer in,
int offset, int length) throws IOException {
- if (out instanceof ByteBufferSupportOutputStream) {
- ((ByteBufferSupportOutputStream) out).write(in, offset, length);
+ if (out instanceof ByteBufferWriter) {
+ ((ByteBufferWriter) out).write(in, offset, length);
} else if (in.hasArray()) {
out.write(in.array(), in.arrayOffset() + offset, length);
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
index a332a63..67434a0 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
@@ -78,7 +78,7 @@ public class TestTagCompressionContext {
@Test
public void testCompressUncompressTagsWithOffheapKeyValue1() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream daos = new ByteBufferSupportDataOutputStream(baos);
+ DataOutputStream daos = new ByteBufferWriterDataOutputStream(baos);
TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
ByteBufferedCell kv1 = (ByteBufferedCell)createOffheapKVWithTags(2);
int tagsLength1 = kv1.getTagsLength();
@@ -127,7 +127,7 @@ public class TestTagCompressionContext {
@Test
public void testCompressUncompressTagsWithOffheapKeyValue2() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream daos = new ByteBufferSupportDataOutputStream(baos);
+ DataOutputStream daos = new ByteBufferWriterDataOutputStream(baos);
TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
ByteBufferedCell kv1 = (ByteBufferedCell)createOffheapKVWithTags(1);
int tagsLength1 = kv1.getTagsLength();
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
index ea162fc..41dc387 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
@@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.codec;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -83,8 +83,8 @@ public class MessageCodec implements Codec {
}
@Override
- public Decoder getDecoder(ByteBuffer buf) {
- return getDecoder(new ByteBufferInputStream(buf));
+ public Decoder getDecoder(ByteBuff buf) {
+ return getDecoder(new ByteBuffInputStream(buf));
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 13b501a..59b8884 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
-import org.apache.hadoop.hbase.io.ByteBufferSupportDataOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
@@ -962,7 +962,7 @@ public class HFileBlock implements Cacheable {
state = State.WRITING;
// We will compress it later in finishBlock()
- userDataStream = new ByteBufferSupportDataOutputStream(baosInMemory);
+ userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory);
if (newBlockType == BlockType.DATA) {
this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 34140a9..d570b17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.ipc;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CallDroppedException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -45,7 +43,6 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@InterfaceStability.Evolving
public class CallRunner {
- private static final Log LOG = LogFactory.getLog(CallRunner.class);
private static final CallDroppedException CALL_DROPPED_EXCEPTION
= new CallDroppedException();
@@ -143,6 +140,8 @@ public class CallRunner {
sucessful = true;
}
}
+ // return back the RPC request read BB we can do here. It is done by now.
+ call.cleanup();
// Set the response
Message param = resultPair != null ? resultPair.getFirst() : null;
CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 1c2d51f..6eefaac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.ByteArrayInputStream;
@@ -99,6 +100,9 @@ import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -146,6 +150,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
@@ -304,6 +309,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private UserProvider userProvider;
private final ByteBufferPool reservoir;
+ // The requests and response will use buffers from ByteBufferPool, when the size of the
+ // request/response is at least this size.
+ // We make this to be 1/6th of the pool buffer size.
+ private final int minSizeForReservoirUse;
private volatile boolean allowFallbackToSimpleAuth;
@@ -344,10 +353,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected boolean isError;
protected TraceInfo tinfo;
private ByteBufferListOutputStream cellBlockStream = null;
+ private CallCleanup reqCleanup = null;
private User user;
private InetAddress remoteAddress;
- private RpcCallback callback;
+ private RpcCallback rpcCallback;
private long responseCellSize = 0;
private long responseBlockSize = 0;
@@ -357,7 +367,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
justification="Can't figure why this complaint is happening... see below")
Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, Connection connection, Responder responder,
- long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout) {
+ long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
+ CallCleanup reqCleanup) {
this.id = id;
this.service = service;
this.md = md;
@@ -377,6 +388,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
connection == null? null: connection.retryImmediatelySupported;
this.timeout = timeout;
this.deadline = this.timeout > 0 ? this.timestamp + this.timeout : Long.MAX_VALUE;
+ this.reqCleanup = reqCleanup;
}
/**
@@ -391,9 +403,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// got from pool.
this.cellBlockStream = null;
}
+ cleanup();// If the call was run successfuly, we might have already returned the
+ // BB back to pool. No worries..Then inputCellBlock will be null
this.connection.decRpcCount(); // Say that we're done with this call.
}
+ protected void cleanup() {
+ if (this.reqCleanup != null) {
+ this.reqCleanup.run();
+ this.reqCleanup = null;
+ }
+ }
+
@Override
public String toString() {
return toShortString() + " param: " +
@@ -515,9 +536,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.response = bc;
// Once a response message is created and set to this.response, this Call can be treated as
// done. The Responder thread will do the n/w write of this message back to client.
- if (this.callback != null) {
+ if (this.rpcCallback != null) {
try {
- this.callback.run();
+ this.rpcCallback.run();
} catch (Exception e) {
// Don't allow any exception here to kill this handler thread.
LOG.warn("Exception while running the Rpc Callback.", e);
@@ -722,7 +743,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
@Override
public synchronized void setCallBack(RpcCallback callback) {
- this.callback = callback;
+ this.rpcCallback = callback;
}
@Override
@@ -731,6 +752,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
}
+ @FunctionalInterface
+ static interface CallCleanup {
+ void run();
+ }
+
/** Listens on the socket. Creates jobs for the handler threads*/
private class Listener extends Thread {
@@ -1289,7 +1315,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// If the connection header has been read or not.
private boolean connectionHeaderRead = false;
protected SocketChannel channel;
- private ByteBuffer data;
+ private ByteBuff data;
+ private CallCleanup callCleanup;
private ByteBuffer dataLengthBuffer;
protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
private final Lock responseWriteLock = new ReentrantLock();
@@ -1327,17 +1354,17 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// Fake 'call' for failed authorization response
private static final int AUTHORIZATION_FAILED_CALLID = -1;
private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
- null, null, this, null, 0, null, null, 0);
+ null, null, this, null, 0, null, null, 0, null);
private ByteArrayOutputStream authFailedResponse =
new ByteArrayOutputStream();
// Fake 'call' for SASL context setup
private static final int SASL_CALLID = -33;
private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
- 0, null, null, 0);
+ 0, null, null, 0, null);
// Fake 'call' for connection header response
private static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
private final Call setConnectionHeaderResponseCall = new Call(CONNECTION_HEADER_RESPONSE_CALLID,
- null, null, null, null, null, this, null, 0, null, null, 0);
+ null, null, null, null, null, this, null, 0, null, null, 0, null);
// was authentication allowed with a fallback to simple auth
private boolean authenticatedWithFallback;
@@ -1352,6 +1379,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.channel = channel;
this.lastContact = lastContact;
this.data = null;
+ this.callCleanup = null;
this.dataLengthBuffer = ByteBuffer.allocate(4);
this.socket = channel.socket();
this.addr = socket.getInetAddress();
@@ -1437,7 +1465,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return authorizedUgi;
}
- private void saslReadAndProcess(ByteBuffer saslToken) throws IOException,
+ private void saslReadAndProcess(ByteBuff saslToken) throws IOException,
InterruptedException {
if (saslContextEstablished) {
if (LOG.isTraceEnabled())
@@ -1447,13 +1475,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
if (!useWrap) {
processOneRpc(saslToken);
} else {
- byte[] b = saslToken.array();
+ byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes();
byte [] plaintextData;
if (useCryptoAesWrap) {
// unwrap with CryptoAES
- plaintextData = cryptoAES.unwrap(b, saslToken.position(), saslToken.limit());
+ plaintextData = cryptoAES.unwrap(b, 0, b.length);
} else {
- plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit());
+ plaintextData = saslServer.unwrap(b, 0, b.length);
}
processUnwrappedData(plaintextData);
}
@@ -1506,7 +1534,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
LOG.debug("Have read input token of size " + saslToken.limit()
+ " for processing by saslServer.evaluateResponse()");
}
- replyToken = saslServer.evaluateResponse(saslToken.array());
+ replyToken = saslServer
+ .evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes());
} catch (IOException e) {
IOException sendToClient = e;
Throwable cause = e;
@@ -1759,7 +1788,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// Notify the client about the offending request
Call reqTooBig = new Call(header.getCallId(), this.service, null, null, null,
- null, this, responder, 0, null, this.addr,0);
+ null, this, responder, 0, null, this.addr, 0, null);
metrics.exception(REQUEST_TOO_BIG_EXCEPTION);
// Make sure the client recognizes the underlying exception
// Otherwise, throw a DoNotRetryIOException.
@@ -1779,7 +1808,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return -1;
}
- data = ByteBuffer.allocate(dataLength);
+ // Initialize this.data with a ByteBuff.
+ // This call will allocate a ByteBuff to read request into and assign to this.data
+ // Also when we use some buffer(s) from pool, it will create a CallCleanup instance also and
+ // assign to this.callCleanup
+ initByteBuffToReadInto(dataLength);
// Increment the rpc count. This counter will be decreased when we write
// the response. If we want the connection to be detected as idle properly, we
@@ -1787,7 +1820,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
incRpcCount();
}
- count = channelRead(channel, data);
+ count = channelDataRead(channel, data);
if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
process();
@@ -1796,11 +1829,41 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return count;
}
+ // It creates the ByteBuff and CallCleanup and assign to Connection instance.
+ private void initByteBuffToReadInto(int length) {
+ // We create random on heap buffers are read into those when
+ // 1. ByteBufferPool is not there.
+ // 2. When the size of the req is very small. Using a large sized (64 KB) buffer from pool is
+ // waste then. Also if all the reqs are of this size, we will be creating larger sized
+ // buffers and pool them permanently. This include Scan/Get request and DDL kind of reqs like
+ // RegionOpen.
+ // 3. If it is an initial handshake signal or initial connection request. Any way then
+ // condition 2 itself will match
+ // 4. When SASL use is ON.
+ if (reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead || useSasl
+ || length < minSizeForReservoirUse) {
+ this.data = new SingleByteBuff(ByteBuffer.allocate(length));
+ } else {
+ Pair<ByteBuff, CallCleanup> pair = RpcServer.allocateByteBuffToReadInto(reservoir,
+ minSizeForReservoirUse, length);
+ this.data = pair.getFirst();
+ this.callCleanup = pair.getSecond();
+ }
+ }
+
+ protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException {
+ int count = buf.read(channel);
+ if (count > 0) {
+ metrics.receivedBytes(count);
+ }
+ return count;
+ }
+
/**
* Process the data buffer and clean the connection state for the next call.
*/
private void process() throws IOException, InterruptedException {
- data.flip();
+ data.rewind();
try {
if (skipInitialSaslHandshake) {
skipInitialSaslHandshake = false;
@@ -1816,6 +1879,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
} finally {
dataLengthBuffer.clear(); // Clean for the next call
data = null; // For the GC
+ this.callCleanup = null;
}
}
@@ -1831,7 +1895,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
LOG.warn(msg);
- Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null,0);
+ Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null, 0,
+ null);
setupResponse(null, fakeCall, e, msg);
responder.doRespond(fakeCall);
// Returning -1 closes out the connection.
@@ -1839,9 +1904,15 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
// Reads the connection header following version
- private void processConnectionHeader(ByteBuffer buf) throws IOException {
- this.connectionHeader = ConnectionHeader.parseFrom(
- new ByteBufferInputStream(buf));
+ private void processConnectionHeader(ByteBuff buf) throws IOException {
+ if (buf.hasArray()) {
+ this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
+ } else {
+ CodedInputStream cis = CodedInputStream
+ .newInstance(new ByteBuffByteInput(buf, 0, buf.limit()), true);
+ cis.enableAliasing(true);
+ this.connectionHeader = ConnectionHeader.parseFrom(cis);
+ }
String serviceName = connectionHeader.getServiceName();
if (serviceName == null) throw new EmptyServiceNameException();
this.service = getService(services, serviceName);
@@ -2043,13 +2114,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
if (unwrappedData.remaining() == 0) {
unwrappedDataLengthBuffer.clear();
unwrappedData.flip();
- processOneRpc(unwrappedData);
+ processOneRpc(new SingleByteBuff(unwrappedData));
unwrappedData = null;
}
}
}
- private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException {
+ private void processOneRpc(ByteBuff buf) throws IOException, InterruptedException {
if (connectionHeaderRead) {
processRequest(buf);
} else {
@@ -2071,12 +2142,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
* @throws IOException
* @throws InterruptedException
*/
- protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
+ protected void processRequest(ByteBuff buf) throws IOException, InterruptedException {
long totalRequestSize = buf.limit();
int offset = 0;
// Here we read in the header. We avoid having pb
// do its default 4k allocation for CodedInputStream. We force it to use backing array.
- CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit());
+ CodedInputStream cis;
+ if (buf.hasArray()) {
+ cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit());
+ } else {
+ cis = CodedInputStream.newInstance(new ByteBuffByteInput(buf, 0, buf.limit()), true);
+ cis.enableAliasing(true);
+ }
int headerSize = cis.readRawVarint32();
offset = cis.getTotalBytesRead();
Message.Builder builder = RequestHeader.newBuilder();
@@ -2093,7 +2170,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) {
final Call callTooBig =
new Call(id, this.service, null, null, null, null, this,
- responder, totalRequestSize, null, null, 0);
+ responder, totalRequestSize, null, null, 0, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
@@ -2127,7 +2204,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
if (header.hasCellBlockMeta()) {
buf.position(offset);
- cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf);
+ ByteBuff dup = buf.duplicate();
+ dup.limit(offset + header.getCellBlockMeta().getLength());
+ cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec,
+ this.compressionCodec, dup);
}
} catch (Throwable t) {
InetSocketAddress address = getListenerAddress();
@@ -2148,7 +2228,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
final Call readParamsFailedCall =
new Call(id, this.service, null, null, null, null, this,
- responder, totalRequestSize, null, null, 0);
+ responder, totalRequestSize, null, null, 0, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, readParamsFailedCall, t,
msg + "; " + t.getMessage());
@@ -2164,7 +2244,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
timeout = Math.max(minClientRequestTimeout, header.getTimeout());
}
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
- totalRequestSize, traceInfo, this.addr, timeout);
+ totalRequestSize, traceInfo, this.addr, timeout, this.callCleanup);
if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
callQueueSizeInBytes.add(-1 * call.getSize());
@@ -2211,6 +2291,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected synchronized void close() {
disposeSasl();
data = null;
+ callCleanup = null;
if (!channel.isOpen())
return;
try {socket.shutdownOutput();} catch(Exception ignored) {
@@ -2301,8 +2382,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY,
conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
+ this.minSizeForReservoirUse = getMinSizeForReservoirUse(this.reservoir);
} else {
reservoir = null;
+ this.minSizeForReservoirUse = Integer.MAX_VALUE;// reservoir itself not in place.
}
this.server = server;
this.services = services;
@@ -2347,6 +2430,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.scheduler.init(new RpcSchedulerContext(this));
}
+ @VisibleForTesting
+ static int getMinSizeForReservoirUse(ByteBufferPool pool) {
+ return pool.getBufferSize() / 6;
+ }
+
@Override
public void onConfigurationChange(Configuration newConf) {
initReconfigurable(newConf);
@@ -2755,6 +2843,55 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
/**
+ * This is extracted to a static method for better unit testing. We try to get buffer(s) from pool
+ * as much as possible.
+ *
+ * @param pool The ByteBufferPool to use
+ * @param minSizeForPoolUse Only for buffer size above this, we will try to use pool. Any buffer
+ * need of size below this, create on heap ByteBuffer.
+ * @param reqLen Bytes count in request
+ */
+ @VisibleForTesting
+ static Pair<ByteBuff, CallCleanup> allocateByteBuffToReadInto(ByteBufferPool pool,
+ int minSizeForPoolUse, int reqLen) {
+ ByteBuff resultBuf;
+ List<ByteBuffer> bbs = new ArrayList<ByteBuffer>((reqLen / pool.getBufferSize()) + 1);
+ int remain = reqLen;
+ ByteBuffer buf = null;
+ while (remain >= minSizeForPoolUse && (buf = pool.getBuffer()) != null) {
+ bbs.add(buf);
+ remain -= pool.getBufferSize();
+ }
+ ByteBuffer[] bufsFromPool = null;
+ if (bbs.size() > 0) {
+ bufsFromPool = new ByteBuffer[bbs.size()];
+ bbs.toArray(bufsFromPool);
+ }
+ if (remain > 0) {
+ bbs.add(ByteBuffer.allocate(remain));
+ }
+ if (bbs.size() > 1) {
+ ByteBuffer[] items = new ByteBuffer[bbs.size()];
+ bbs.toArray(items);
+ resultBuf = new MultiByteBuff(items);
+ } else {
+ // We are backed by single BB
+ resultBuf = new SingleByteBuff(bbs.get(0));
+ }
+ resultBuf.limit(reqLen);
+ if (bufsFromPool != null) {
+ final ByteBuffer[] bufsFromPoolFinal = bufsFromPool;
+ return new Pair<ByteBuff, RpcServer.CallCleanup>(resultBuf, () -> {
+ // Return back all the BBs to pool
+ for (int i = 0; i < bufsFromPoolFinal.length; i++) {
+ pool.putbackBuffer(bufsFromPoolFinal[i]);
+ }
+ });
+ }
+ return new Pair<ByteBuff, RpcServer.CallCleanup>(resultBuf, null);
+ }
+
+ /**
* Needed for features such as delayed calls. We need to be able to store the current call
* so that we can complete it later or ask questions of what is supported by the current ongoing
* call.
@@ -3054,4 +3191,44 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
idleScanTimer.schedule(idleScanTask, idleScanInterval);
}
}
-}
+
+ private static class ByteBuffByteInput extends ByteInput {
+
+ private ByteBuff buf;
+ private int offset;
+ private int length;
+
+ ByteBuffByteInput(ByteBuff buf, int offset, int length) {
+ this.buf = buf;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ @Override
+ public byte read(int offset) {
+ return this.buf.get(getAbsoluteOffset(offset));
+ }
+
+ private int getAbsoluteOffset(int offset) {
+ return this.offset + offset;
+ }
+
+ @Override
+ public int read(int offset, byte[] out, int outOffset, int len) {
+ this.buf.get(getAbsoluteOffset(offset), out, outOffset, len);
+ return len;
+ }
+
+ @Override
+ public int read(int offset, ByteBuffer out) {
+ int len = out.remaining();
+ this.buf.get(out, getAbsoluteOffset(offset), len);
+ return len;
+ }
+
+ @Override
+ public int size() {
+ return this.length;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b444a1c..831627b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5201,9 +5201,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
private HStore getHStore(Cell cell) {
for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
- if (Bytes.equals(
- cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
- famStore.getKey(), 0, famStore.getKey().length)) {
+ if (CellUtil.matchingFamily(cell, famStore.getKey(), 0, famStore.getKey().length)) {
return (HStore) famStore.getValue();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index 314bef0..a0ac8a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
@@ -57,7 +57,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
private AsyncFSOutput output;
private static final class OutputStreamWrapper extends OutputStream
- implements ByteBufferSupportOutputStream {
+ implements ByteBufferWriter {
private final AsyncFSOutput out;
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index 52dfae0..1a18087 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -21,7 +21,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -33,9 +32,12 @@ import org.apache.hadoop.hbase.codec.BaseDecoder;
import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
+import org.apache.hadoop.hbase.io.ByteBufferWriter;
+import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -356,14 +358,18 @@ public class WALCellCodec implements Codec {
}
@Override
- public Decoder getDecoder(ByteBuffer buf) {
- return getDecoder(new ByteBufferInputStream(buf));
+ public Decoder getDecoder(ByteBuff buf) {
+ return getDecoder(new ByteBuffInputStream(buf));
}
@Override
public Encoder getEncoder(OutputStream os) {
- return (compression == null)
- ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression);
+ if (compression == null) {
+ os = (os instanceof ByteBufferWriter) ? os
+ : new ByteBufferWriterOutputStream(os);
+ return new EnsureKvEncoder(os);
+ }
+ return new CompressedKvEncoder(os, compression);
}
public ByteStringCompressor getByteStringCompressor() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 2211e8f..5a9178a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -33,7 +33,6 @@ import static org.mockito.internal.verification.VerificationModeFactory.times;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
@@ -47,6 +46,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@@ -315,7 +315,7 @@ public abstract class AbstractTestIPC {
}
@Override
- protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
+ protected void processRequest(ByteBuff buf) throws IOException, InterruptedException {
// this will throw exception after the connection header is read, and an RPC is sent
// from client
throw new DoNotRetryIOException("Failing for test");