You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2016/02/12 02:26:25 UTC
hbase git commit: HBASE-15245 Port HBASE-15177 (Reduce garbage
created under high load) to 0.98
Repository: hbase
Updated Branches:
refs/heads/0.98 81a6fffb3 -> 5716dda86
HBASE-15245 Port HBASE-15177 (Reduce garbage created under high load) to 0.98
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5716dda8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5716dda8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5716dda8
Branch: refs/heads/0.98
Commit: 5716dda86b81e6d208bd70fc4ad4749cfd861414
Parents: 81a6fff
Author: Andrew Purtell <ap...@apache.org>
Authored: Thu Feb 11 12:23:59 2016 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Thu Feb 11 15:36:17 2016 -0800
----------------------------------------------------------------------
.../hadoop/hbase/client/ScannerCallable.java | 10 +-
.../org/apache/hadoop/hbase/ipc/IPCUtil.java | 17 ++-
.../hbase/ipc/PayloadCarryingRpcController.java | 5 +-
.../org/apache/hadoop/hbase/ipc/RpcClient.java | 4 +-
.../hadoop/hbase/protobuf/ProtobufUtil.java | 19 +++-
.../apache/hadoop/hbase/ipc/TestIPCUtil.java | 2 +-
.../hadoop/hbase/io/ByteBufferInputStream.java | 107 +++++++++++++++++++
.../hadoop/hbase/util/ByteBufferUtils.java | 85 ++++++++++++++-
.../apache/hadoop/hbase/util/UnsafeAccess.java | 100 ++++++++++++++++-
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 41 +++----
.../AnnotationReadingPriorityFunction.java | 8 +-
.../hadoop/hbase/regionserver/HRegion.java | 5 +-
.../hbase/regionserver/HRegionServer.java | 15 ++-
13 files changed, 365 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 8d1c20d..c11160a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -83,7 +83,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
// indicate if it is a remote server call
protected boolean isRegionServerRemote = true;
private long nextCallSeq = 0;
- protected final PayloadCarryingRpcController controller;
+ protected PayloadCarryingRpcController controller;
/**
* @param connection which connection
@@ -155,6 +155,10 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
*/
@SuppressWarnings("deprecation")
public Result [] call() throws IOException {
+ if (controller == null) {
+ controller = RpcControllerFactory.instantiate(connection.getConfiguration())
+ .newController();
+ }
if (closed) {
if (scannerId != -1) {
close();
@@ -300,7 +304,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
ScanRequest request =
RequestConverter.buildScanRequest(this.scannerId, 0, true);
try {
- getStub().scan(null, request);
+ getStub().scan(controller, request);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
@@ -317,7 +321,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
getLocation().getRegionInfo().getRegionName(),
this.scan, 0, false);
try {
- ScanResponse response = getStub().scan(null, request);
+ ScanResponse response = getStub().scan(controller, request);
long id = response.getScannerId();
if (logScannerActivity) {
LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index f143203..c4481d5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
@@ -180,19 +181,18 @@ class IPCUtil {
CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
final byte [] cellBlock)
throws IOException {
- return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
+ return createCellScanner(codec, compressor, ByteBuffer.wrap(cellBlock));
}
/**
* @param codec
- * @param cellBlock
- * @param offset
- * @param length
+ * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
+ * position()'ed at the start of the cell block and limit()'ed at the end.
* @return CellScanner to work against the content of <code>cellBlock</code>
* @throws IOException
*/
CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
- final byte [] cellBlock, final int offset, final int length)
+ final ByteBuffer cellBlock)
throws IOException {
// If compressed, decompress it first before passing it on else we will leak compression
// resources if the stream is not closed properly after we let it out.
@@ -202,13 +202,12 @@ class IPCUtil {
if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
CompressionInputStream cis =
- compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length),
- poolDecompressor);
+ compressor.createInputStream(new ByteBufferInputStream(cellBlock), poolDecompressor);
ByteBufferOutputStream bbos = null;
try {
// TODO: This is ugly. The buffer will be resized on us if we guess wrong.
// TODO: Reuse buffers.
- bbos = new ByteBufferOutputStream((length - offset) *
+ bbos = new ByteBufferOutputStream(cellBlock.remaining() *
this.cellBlockDecompressionMultiplier);
IOUtils.copy(cis, bbos);
bbos.close();
@@ -221,7 +220,7 @@ class IPCUtil {
CodecPool.returnDecompressor(poolDecompressor);
}
} else {
- is = new ByteArrayInputStream(cellBlock, offset, length);
+ is = new ByteBufferInputStream(cellBlock);
}
return codec.getDecoder(is);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
index 6b3f1e8..2dfe6b9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
@@ -37,6 +37,8 @@ import com.google.protobuf.RpcController;
*/
@InterfaceAudience.Private
public class PayloadCarryingRpcController implements RpcController, CellScannable {
+ public static final int PRIORITY_UNSET = -1;
+
/**
* Priority to set on this request. Set it here in controller so available composing the
* request. This is the ordained way of setting priorities going forward. We will be
@@ -44,7 +46,7 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl
*/
// Currently only multi call makes use of this. Eventually this should be only way to set
// priority.
- private int priority = 0;
+ private int priority = PRIORITY_UNSET;
// TODO: Fill out the rest of this class methods rather than return UnsupportedOperationException
@@ -71,6 +73,7 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl
/**
* @return One-shot cell scanner (you cannot back it up and restart)
*/
+ @Override
public CellScanner cellScanner() {
return cellScanner;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index 7f11038..82dd1d1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -1048,7 +1048,9 @@ public class RpcClient {
builder.setCellBlockMeta(cellBlockBuilder.build());
}
// Only pass priority if there one. Let zero be same as no priority.
- if (priority != 0) builder.setPriority(priority);
+ if (priority != PayloadCarryingRpcController.PRIORITY_UNSET) {
+ builder.setPriority(priority);
+ }
//noinspection SynchronizeOnNonFinalField
RequestHeader header = builder.build();
synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index cee0ace..26c96e0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -2424,13 +2424,13 @@ public final class ProtobufUtil {
*/
public static String getRegionEncodedName(
final RegionSpecifier regionSpecifier) throws DoNotRetryIOException {
- byte[] value = regionSpecifier.getValue().toByteArray();
+ ByteString value = regionSpecifier.getValue();
RegionSpecifierType type = regionSpecifier.getType();
switch (type) {
case REGION_NAME:
- return HRegionInfo.encodeRegionName(value);
+ return HRegionInfo.encodeRegionName(value.toByteArray());
case ENCODED_REGION_NAME:
- return Bytes.toString(value);
+ return value.toStringUtf8();
default:
throw new DoNotRetryIOException(
"Unsupported region specifier type: " + type);
@@ -2905,6 +2905,19 @@ public final class ProtobufUtil {
codedInput.checkLastTagWas(0);
}
+ public static void mergeFrom(Message.Builder builder, CodedInputStream codedInput, int length)
+ throws IOException {
+ codedInput.resetSizeCounter();
+ int prevLimit = codedInput.setSizeLimit(length);
+
+ int limit = codedInput.pushLimit(length);
+ builder.mergeFrom(codedInput);
+ codedInput.popLimit(limit);
+
+ codedInput.checkLastTagWas(0);
+ codedInput.setSizeLimit(prevLimit);
+ }
+
public static ReplicationLoadSink toReplicationLoadSink(
ClusterStatusProtos.ReplicationLoadSink cls) {
return new ReplicationLoadSink(cls.getAgeOfLastAppliedOp(), cls.getTimeStampsOfLastAppliedOp());
http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
index 366d13c..35c06a1 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
@@ -78,7 +78,7 @@ public class TestIPCUtil {
CellScanner cellScanner = sized? getSizedCellScanner(cells):
CellUtil.createCellScanner(Arrays.asList(cells).iterator());
ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner);
- cellScanner = util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit());
+ cellScanner = util.createCellScanner(codec, compressor, bb);
int i = 0;
while (cellScanner.advance()) {
i++;
http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
new file mode 100644
index 0000000..8aee07b
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+
+/**
+ * Not thread safe!
+ * <p>
+ * Please note that the reads will cause position movement on wrapped ByteBuffer.
+ */
+@InterfaceAudience.Private
+public class ByteBufferInputStream extends InputStream {
+
+ private ByteBuffer buf;
+
+ public ByteBufferInputStream(ByteBuffer buf) {
+ this.buf = buf;
+ }
+
+ /**
+ * Reads the next byte of data from this input stream. The value byte is returned as an
+ * <code>int</code> in the range <code>0</code> to <code>255</code>. If no byte is available
+ * because the end of the stream has been reached, the value <code>-1</code> is returned.
+ * @return the next byte of data, or <code>-1</code> if the end of the stream has been reached.
+ */
+ @Override
+ public int read() {
+ if (this.buf.hasRemaining()) {
+ return (this.buf.get() & 0xff);
+ }
+ return -1;
+ }
+
+ /**
+ * Reads up to next <code>len</code> bytes of data from buffer into passed array(starting from
+ * given offset).
+ * @param b the array into which the data is read.
+ * @param off the start offset in the destination array <code>b</code>
+ * @param len the maximum number of bytes to read.
+ * @return the total number of bytes actually read into the buffer, or <code>-1</code> if not even
+ * 1 byte can be read because the end of the stream has been reached.
+ */
+ @Override
+ public int read(byte[] b, int off, int len) {
+ int avail = available();
+ if (avail <= 0) {
+ return -1;
+ }
+
+ if (len > avail) {
+ len = avail;
+ }
+ if (len <= 0) {
+ return 0;
+ }
+
+ ByteBufferUtils.copyFromBufferToArray(b, this.buf, this.buf.position(), off, len);
+ this.buf.position(this.buf.position() + len); // we should advance the buffer position
+ return len;
+ }
+
+ /**
+ * Skips <code>n</code> bytes of input from this input stream. Fewer bytes might be skipped if the
+ * end of the input stream is reached. The actual number <code>k</code> of bytes to be skipped is
+ * equal to the smaller of <code>n</code> and remaining bytes in the stream.
+ * @param n the number of bytes to be skipped.
+ * @return the actual number of bytes skipped.
+ */
+ @Override
+ public long skip(long n) {
+ long k = Math.min(n, available());
+ if (k < 0) {
+ k = 0;
+ }
+ this.buf.position((int) (this.buf.position() + k));
+ return k;
+ }
+
+ /**
+ * @return the number of remaining bytes that can be read (or skipped
+ * over) from this input stream.
+ */
+ @Override
+ public int available() {
+ return this.buf.remaining();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index b4c6690..bb8f766 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -17,7 +17,6 @@
package org.apache.hadoop.hbase.util;
import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -33,6 +32,7 @@ import org.apache.hadoop.io.WritableUtils;
* Utility functions for working with byte buffers, such as reading/writing
* variable-length long numbers.
*/
+@SuppressWarnings("restriction")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class ByteBufferUtils {
@@ -41,6 +41,8 @@ public final class ByteBufferUtils {
private final static int VALUE_MASK = 0x7f;
private final static int NEXT_BIT_SHIFT = 7;
private final static int NEXT_BIT_MASK = 1 << 7;
+ private static final boolean UNSAFE_AVAIL = UnsafeAccess.isAvailable();
+ private static final boolean UNSAFE_UNALIGNED = UnsafeAccess.unaligned();
private ByteBufferUtils() {
}
@@ -331,7 +333,10 @@ public final class ByteBufferUtils {
}
/**
- * Copy from one buffer to another from given offset
+ * Copy from one buffer to another from given offset.
+ * <p>
+ * Note : This will advance the position marker of {@code out} but not change the position maker
+ * for {@code in}
* @param out destination buffer
* @param in source buffer
* @param sourceOffset offset in the source buffer
@@ -352,6 +357,27 @@ public final class ByteBufferUtils {
}
/**
+ * Copy from one buffer to another from given offset. This will be absolute positional copying and
+ * won't affect the position of any of the buffers.
+ * @param out
+ * @param in
+ * @param sourceOffset
+ * @param destinationOffset
+ * @param length
+ */
+ public static void copyFromBufferToBuffer(ByteBuffer out, ByteBuffer in, int sourceOffset,
+ int destinationOffset, int length) {
+ if (in.hasArray() && out.hasArray()) {
+ System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out.array(), out.arrayOffset()
+ + destinationOffset, length);
+ } else {
+ for (int i = 0; i < length; ++i) {
+ out.put((destinationOffset + i), in.get(sourceOffset + i));
+ }
+ }
+ }
+
+ /**
* Find length of common prefix of two parts in the buffer
* @param buffer Where parts are located.
* @param offsetLeft Offset of the first part.
@@ -454,4 +480,59 @@ public final class ByteBufferUtils {
return output;
}
+ /**
+ * Copy the given number of bytes from specified offset into a new byte[]
+ * @param buffer
+ * @param offset
+ * @param length
+ * @return a new byte[] containing the bytes in the specified range
+ */
+ public static byte[] toBytes(ByteBuffer buffer, int offset, int length) {
+ byte[] output = new byte[length];
+ for (int i = 0; i < length; i++) {
+ output[i] = buffer.get(offset + i);
+ }
+ return output;
+ }
+
+ public static int compareTo(ByteBuffer buf1, int o1, int len1, ByteBuffer buf2, int o2,
+ int len2) {
+ if (buf1.hasArray() && buf2.hasArray()) {
+ return Bytes.compareTo(buf1.array(), buf1.arrayOffset() + o1, len1, buf2.array(),
+ buf2.arrayOffset() + o2, len2);
+ }
+ int end1 = o1 + len1;
+ int end2 = o2 + len2;
+ for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) {
+ int a = buf1.get(i) & 0xFF;
+ int b = buf2.get(j) & 0xFF;
+ if (a != b) {
+ return a - b;
+ }
+ }
+ return len1 - len2;
+ }
+
+ /**
+ * Copies specified number of bytes from given offset of 'in' ByteBuffer to
+ * the array.
+ * @param out
+ * @param in
+ * @param sourceOffset
+ * @param destinationOffset
+ * @param length
+ */
+ public static void copyFromBufferToArray(byte[] out, ByteBuffer in, int sourceOffset,
+ int destinationOffset, int length) {
+ if (in.hasArray()) {
+ System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out, destinationOffset, length);
+ } else if (UNSAFE_AVAIL) {
+ UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length);
+ } else {
+ int oldPos = in.position();
+ in.position(sourceOffset);
+ in.get(out, destinationOffset, length);
+ in.position(oldPos);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
index 2e86b4d..fabdce8 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.util;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
-import java.nio.ByteOrder;
+import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.PrivilegedAction;
@@ -29,19 +29,26 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
@InterfaceAudience.Private
@InterfaceStability.Evolving
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
+ justification="If exception, presume unaligned")
public final class UnsafeAccess {
private static final Log LOG = LogFactory.getLog(UnsafeAccess.class);
public static final Unsafe theUnsafe;
+ private static boolean unaligned = false;
/** The offset to the first element in a byte array. */
public static final int BYTE_ARRAY_BASE_OFFSET;
- private static boolean unaligned = false;
+ // This number limits the number of bytes to copy per call to Unsafe's
+ // copyMemory method. A limit is imposed to allow for safepoint polling
+ // during a large copy
+ static final long UNSAFE_COPY_THRESHOLD = 1024L * 1024L;
static {
theUnsafe = (Unsafe) AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
@@ -66,7 +73,7 @@ public final class UnsafeAccess {
m.setAccessible(true);
unaligned = (Boolean) m.invoke(null);
} catch (Exception e) {
- unaligned = false;
+ unaligned = false; // FindBugs: Causes REC_CATCH_EXCEPTION. Suppressed.
}
} else{
BYTE_ARRAY_BASE_OFFSET = -1;
@@ -87,6 +94,89 @@ public final class UnsafeAccess {
return unaligned;
}
- public static final boolean littleEndian = ByteOrder.nativeOrder()
- .equals(ByteOrder.LITTLE_ENDIAN);
+
+ // APIs to copy data. This will be direct memory location copy and will be much faster
+ /**
+ * Copies the bytes from given array's offset to length part into the given buffer.
+ * @param src
+ * @param srcOffset
+ * @param dest
+ * @param destOffset
+ * @param length
+ */
+ public static void copy(byte[] src, int srcOffset, ByteBuffer dest, int destOffset, int length) {
+ long destAddress = destOffset;
+ Object destBase = null;
+ if (dest.isDirect()) {
+ destAddress = destAddress + ((DirectBuffer) dest).address();
+ } else {
+ destAddress = destAddress + BYTE_ARRAY_BASE_OFFSET + dest.arrayOffset();
+ destBase = dest.array();
+ }
+ long srcAddress = srcOffset + BYTE_ARRAY_BASE_OFFSET;
+ unsafeCopy(src, srcAddress, destBase, destAddress, length);
+ }
+
+ private static void unsafeCopy(Object src, long srcAddr, Object dst, long destAddr, long len) {
+ while (len > 0) {
+ long size = (len > UNSAFE_COPY_THRESHOLD) ? UNSAFE_COPY_THRESHOLD : len;
+ theUnsafe.copyMemory(src, srcAddr, dst, destAddr, len);
+ len -= size;
+ srcAddr += size;
+ destAddr += size;
+ }
+ }
+
+ /**
+ * Copies specified number of bytes from given offset of {@code src} ByteBuffer to the
+ * {@code dest} array.
+ *
+ * @param src
+ * @param srcOffset
+ * @param dest
+ * @param destOffset
+ * @param length
+ */
+ public static void copy(ByteBuffer src, int srcOffset, byte[] dest, int destOffset,
+ int length) {
+ long srcAddress = srcOffset;
+ Object srcBase = null;
+ if (src.isDirect()) {
+ srcAddress = srcAddress + ((DirectBuffer) src).address();
+ } else {
+ srcAddress = srcAddress + BYTE_ARRAY_BASE_OFFSET + src.arrayOffset();
+ srcBase = src.array();
+ }
+ long destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET;
+ unsafeCopy(srcBase, srcAddress, dest, destAddress, length);
+ }
+
+ /**
+ * Copies specified number of bytes from given offset of {@code src} buffer into the {@code dest}
+ * buffer.
+ *
+ * @param src
+ * @param srcOffset
+ * @param dest
+ * @param destOffset
+ * @param length
+ */
+ public static void copy(ByteBuffer src, int srcOffset, ByteBuffer dest, int destOffset,
+ int length) {
+ long srcAddress, destAddress;
+ Object srcBase = null, destBase = null;
+ if (src.isDirect()) {
+ srcAddress = srcOffset + ((DirectBuffer) src).address();
+ } else {
+ srcAddress = srcOffset + src.arrayOffset() + BYTE_ARRAY_BASE_OFFSET;
+ srcBase = src.array();
+ }
+ if (dest.isDirect()) {
+ destAddress = destOffset + ((DirectBuffer) dest).address();
+ } else {
+ destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET + dest.arrayOffset();
+ destBase = dest.array();
+ }
+ unsafeCopy(srcBase, srcAddress, destBase, destAddress, length);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 76b426b..49eb883 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
@@ -1366,17 +1367,18 @@ public class RpcServer implements RpcServerInterface {
return authorizedUgi;
}
- private void saslReadAndProcess(byte[] saslToken) throws IOException,
+ private void saslReadAndProcess(ByteBuffer saslToken) throws IOException,
InterruptedException {
if (saslContextEstablished) {
if (LOG.isDebugEnabled())
- LOG.debug("Have read input token of size " + saslToken.length
+ LOG.debug("Have read input token of size " + saslToken.limit()
+ " for processing by saslServer.unwrap()");
if (!useWrap) {
processOneRpc(saslToken);
} else {
- byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
+ byte[] b = saslToken.array();
+ byte [] plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit());
processUnwrappedData(plaintextData);
}
} else {
@@ -1426,10 +1428,10 @@ public class RpcServer implements RpcServerInterface {
}
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Have read input token of size " + saslToken.length
+ LOG.debug("Have read input token of size " + saslToken.limit()
+ " for processing by saslServer.evaluateResponse()");
}
- replyToken = saslServer.evaluateResponse(saslToken);
+ replyToken = saslServer.evaluateResponse(saslToken.array());
} catch (IOException e) {
IOException sendToClient = e;
Throwable cause = e;
@@ -1605,6 +1607,7 @@ public class RpcServer implements RpcServerInterface {
throw new IllegalArgumentException("Unexpected data length "
+ dataLength + "!! from " + getHostAddress());
}
+ // TODO: check dataLength against some limit so that the client cannot OOM the server
data = ByteBuffer.allocate(dataLength);
incRpcCount(); // Increment the rpc count
}
@@ -1621,9 +1624,9 @@ public class RpcServer implements RpcServerInterface {
}
boolean headerRead = connectionHeaderRead;
if (useSasl) {
- saslReadAndProcess(data.array());
+ saslReadAndProcess(data);
} else {
- processOneRpc(data.array());
+ processOneRpc(data);
}
this.data = null;
if (!headerRead) {
@@ -1658,8 +1661,8 @@ public class RpcServer implements RpcServerInterface {
}
// Reads the connection header following version
- private void processConnectionHeader(byte[] buf) throws IOException {
- this.connectionHeader = ConnectionHeader.parseFrom(buf);
+ private void processConnectionHeader(ByteBuffer buf) throws IOException {
+ this.connectionHeader = ConnectionHeader.parseFrom(new ByteBufferInputStream(buf));
String serviceName = connectionHeader.getServiceName();
if (serviceName == null) throw new EmptyServiceNameException();
this.service = getService(services, serviceName);
@@ -1769,13 +1772,13 @@ public class RpcServer implements RpcServerInterface {
if (unwrappedData.remaining() == 0) {
unwrappedDataLengthBuffer.clear();
unwrappedData.flip();
- processOneRpc(unwrappedData.array());
+ processOneRpc(unwrappedData);
unwrappedData = null;
}
}
}
- private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
+ private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException {
if (connectionHeaderRead) {
processRequest(buf);
} else {
@@ -1797,16 +1800,16 @@ public class RpcServer implements RpcServerInterface {
* @throws IOException
* @throws InterruptedException
*/
- protected void processRequest(byte[] buf) throws IOException, InterruptedException {
- long totalRequestSize = buf.length;
+ protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
+ long totalRequestSize = buf.limit();
int offset = 0;
// Here we read in the header. We avoid having pb
// do its default 4k allocation for CodedInputStream. We force it to use backing array.
- CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
+ CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit());
int headerSize = cis.readRawVarint32();
offset = cis.getTotalBytesRead();
Message.Builder builder = RequestHeader.newBuilder();
- ProtobufUtil.mergeFrom(builder, buf, offset, headerSize);
+ ProtobufUtil.mergeFrom(builder, cis, headerSize);
RequestHeader header = (RequestHeader) builder.build();
offset += headerSize;
int id = header.getCallId();
@@ -1838,18 +1841,18 @@ public class RpcServer implements RpcServerInterface {
if (md == null) throw new UnsupportedOperationException(header.getMethodName());
builder = this.service.getRequestPrototype(md).newBuilderForType();
// To read the varint, I need an inputstream; might as well be a CIS.
- cis = CodedInputStream.newInstance(buf, offset, buf.length);
+ cis.resetSizeCounter();
int paramSize = cis.readRawVarint32();
offset += cis.getTotalBytesRead();
if (builder != null) {
- ProtobufUtil.mergeFrom(builder, buf, offset, paramSize);
+ ProtobufUtil.mergeFrom(builder, cis, paramSize);
param = builder.build();
}
offset += paramSize;
}
if (header.hasCellBlockMeta()) {
- cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
- buf, offset, buf.length);
+ buf.position(offset);
+ cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, buf);
}
} catch (Throwable t) {
InetSocketAddress address = getListenerAddress();
http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
index 16e49f0..dda4240 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoReque
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
@@ -156,10 +155,9 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
if (param == null) {
return HConstants.NORMAL_QOS;
}
- if (param instanceof MultiRequest) {
- // The multi call has its priority set in the header. All calls should work this way but
- // only this one has been converted so far. No priority == NORMAL_QOS.
- return header.hasPriority()? header.getPriority(): HConstants.NORMAL_QOS;
+ // Trust the client-set priorities if set
+ if (header.hasPriority()) {
+ return header.getPriority();
}
String cls = param.getClass().getName();
Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index fb18d84..8674308 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4065,8 +4065,9 @@ public class HRegion implements HeapSize { // , Writable{
// Here we separate all scanners into two lists - scanner that provide data required
// by the filter to operate (scanners list) and all others (joinedScanners list).
- List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
- List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
+ List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
+ List<KeyValueScanner> joinedScanners =
+ new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
if (additionalScanners != null) {
scanners.addAll(additionalScanners);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 8c09416..439bee7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -4492,8 +4492,19 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
*/
protected HRegion getRegion(
final RegionSpecifier regionSpecifier) throws IOException {
- return getRegionByEncodedName(regionSpecifier.getValue().toByteArray(),
- ProtobufUtil.getRegionEncodedName(regionSpecifier));
+ ByteString value = regionSpecifier.getValue();
+ RegionSpecifierType type = regionSpecifier.getType();
+ switch (type) {
+ case REGION_NAME:
+ byte[] regionName = value.toByteArray();
+ String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
+ return getRegionByEncodedName(regionName, encodedRegionName);
+ case ENCODED_REGION_NAME:
+ return getRegionByEncodedName(value.toStringUtf8());
+ default:
+ throw new DoNotRetryIOException(
+ "Unsupported region specifier type: " + type);
+ }
}
/**