You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2022/06/23 22:13:36 UTC
[hbase] branch master updated: HBASE-27111 Make Netty channel bytebuf allocator configurable. (#4525)
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 4f88a3c4bb6 HBASE-27111 Make Netty channel bytebuf allocator configurable. (#4525)
4f88a3c4bb6 is described below
commit 4f88a3c4bb6181192b1fdfc0bca02835585245ef
Author: Andrew Purtell <ap...@apache.org>
AuthorDate: Thu Jun 23 15:13:29 2022 -0700
HBASE-27111 Make Netty channel bytebuf allocator configurable. (#4525)
Support site configuration of the bytebuf allocator that Netty will use for
NettyRpcServer channels. Property name is 'hbase.netty.rpcserver.allocator'.
Default is no value, which is equivalent to "pooled". Valid values are:
- "pooled": use PooledByteBufAllocator
- "unpooled": use UnpooledByteBufAllocator
- "heap": use HeapByteBufAllocator, which delegates to a PooledByteBufAllocator that
preferentially allocates buffers on heap wherever possible
- <class>: If the value is none of the recognized labels, treat it as the name of a
class implementing org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator.
This allows the user to add a custom implementation, perhaps for debugging.
Also updates ReflectionUtils with a new helper method.
Signed-off-by: Viraj Jasani <vj...@apache.org>
---
.../apache/hadoop/hbase/util/ReflectionUtils.java | 25 ++++++++++
.../hadoop/hbase/ipc/HeapByteBufAllocator.java | 56 ++++++++++++++++++++++
.../apache/hadoop/hbase/ipc/NettyRpcServer.java | 53 +++++++++++++++++++-
.../hadoop/hbase/ipc/SimpleByteBufAllocator.java | 52 ++++++++++++++++++++
.../hadoop/hbase/ipc/TestNettyRpcServer.java | 29 +++++++----
5 files changed, 205 insertions(+), 10 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
index 5f6500fb2a5..547d28cfa88 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
@@ -63,6 +63,31 @@ public class ReflectionUtils {
}
}
+ /**
+ * Creates an instance of the class named {@code className}.
+ * <p>
+ * The class is loaded through the loader returned by {@link #getClassLoader()}. A constructor is
+ * then selected with {@code findConstructor(type, params)} and invoked via {@code instantiate}.
+ * @param className name of the class to load and instantiate
+ * @param params arguments used to select and invoke the constructor
+ * @return the new instance, as returned by {@code instantiate}
+ * @throws UnsupportedOperationException if loading the class raises
+ * {@code ClassNotFoundException} or {@code ClassCastException}
+ */ @SuppressWarnings("unchecked")
+ public static <T> T newInstance(String className, Object... params) {
+ Class<T> type;
+ try {
+ type = (Class<T>) getClassLoader().loadClass(className); // unchecked: nothing here verifies the loaded class is a T
+ } catch (ClassNotFoundException | ClassCastException e) {
+ throw new UnsupportedOperationException("Unable to load specified class " + className, e);
+ }
+ return instantiate(type.getName(), findConstructor(type, params), params);
+ }
+
+ /**
+ * Finds a class loader to use for loading classes by name.
+ * <p>
+ * Tries, in order: the current thread's context class loader, the loader that loaded
+ * {@code ReflectionUtils}, and the system class loader. The first non-null one is returned.
+ * @return a non-null class loader
+ * @throws RuntimeException if all three lookups return null
+ */ public static ClassLoader getClassLoader() {
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ if (cl == null) {
+ cl = ReflectionUtils.class.getClassLoader();
+ }
+ if (cl == null) {
+ cl = ClassLoader.getSystemClassLoader();
+ }
+ if (cl == null) {
+ throw new RuntimeException("A ClassLoader could not be found");
+ }
+ return cl;
+ }
+
/**
 * Creates an instance of {@code type} by selecting a constructor with
 * {@code findConstructor(type, params)} and invoking it through {@code instantiate}.
 * @param type class to instantiate
 * @param params arguments used to select and invoke the constructor
 * @return the new instance, as returned by {@code instantiate}
 */
public static <T> T newInstance(Class<T> type, Object... params) {
return instantiate(type.getName(), findConstructor(type, params), params);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HeapByteBufAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HeapByteBufAllocator.java
new file mode 100644
index 00000000000..fd5ada50955
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HeapByteBufAllocator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.buffer.AbstractByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+
+/**
+ * A pooled ByteBufAllocator that does not prefer direct buffers regardless of platform settings.
+ * <p>
+ * In some cases direct buffers are still required, like IO buffers where the buffer will be used in
+ * conjunction with a native method call, so we cannot force all buffer usage on heap. But we can
+ * strongly prefer it.
+ * <p>
+ * Implementation note: this class extends {@code AbstractByteBufAllocator} and forwards
+ * {@code isDirectBufferPooled}, {@code newHeapBuffer} and {@code newDirectBuffer} to a private
+ * {@code PooledByteBufAllocator} constructed with {@code preferDirect = false}. It is not itself a
+ * subclass of {@code PooledByteBufAllocator}.
+ */
+@InterfaceAudience.Private
+public class HeapByteBufAllocator extends AbstractByteBufAllocator {
+
+ public static final HeapByteBufAllocator DEFAULT = new HeapByteBufAllocator(); // shared instance
+
+ private final PooledByteBufAllocator delegate = // pooled allocator that performs the actual allocations
+ new PooledByteBufAllocator(false /* preferDirect */);
+
+ @Override
+ public boolean isDirectBufferPooled() { // answered by the wrapped pooled allocator
+ return delegate.isDirectBufferPooled();
+ }
+
+ @Override
+ protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { // heap buffer from the delegate
+ return delegate.heapBuffer(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { // direct buffers remain available on explicit request
+ return delegate.directBuffer(initialCapacity, maxCapacity);
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index 9c8319944e7..e2578ec1575 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -29,12 +29,16 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
@@ -65,18 +69,35 @@ public class NettyRpcServer extends RpcServer {
"hbase.netty.eventloop.rpcserver.thread.count";
private static final int EVENTLOOP_THREADCOUNT_DEFAULT = 0;
+ /**
+ * Name of property to change the byte buf allocator for the netty channels. Default is no value,
+ * which causes us to use PooledByteBufAllocator. Valid settings here are "pooled", "unpooled",
+ * or "heap" (matched case-insensitively), or the name of a class implementing ByteBufAllocator.
+ * <p>
+ * "pooled" and "unpooled" may prefer direct memory depending on netty configuration, which is
+ * controlled by platform specific code and documented system properties.
+ * <p>
+ * "heap" will prefer heap arena allocations.
+ */
+ public static final String HBASE_NETTY_ALLOCATOR_KEY = "hbase.netty.rpcserver.allocator";
+ static final String POOLED_ALLOCATOR_TYPE = "pooled";
+ static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled";
+ static final String HEAP_ALLOCATOR_TYPE = "heap";
+
private final InetSocketAddress bindAddress;
private final CountDownLatch closed = new CountDownLatch(1);
private final Channel serverChannel;
private final ChannelGroup allChannels =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
+ private final ByteBufAllocator channelAllocator;
public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
boolean reservoirEnabled) throws IOException {
super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
this.bindAddress = bindAddress;
+ this.channelAllocator = getChannelAllocator(conf);
EventLoopGroup eventLoopGroup;
Class<? extends ServerChannel> channelClass;
if (server instanceof HRegionServer) {
@@ -97,9 +118,9 @@ public class NettyRpcServer extends RpcServer {
.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
.childOption(ChannelOption.SO_REUSEADDR, true)
.childHandler(new ChannelInitializer<Channel>() {
-
@Override
protected void initChannel(Channel ch) throws Exception {
+ ch.config().setAllocator(channelAllocator);
ChannelPipeline pipeline = ch.pipeline();
FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
preambleDecoder.setSingleDecode(true);
@@ -120,6 +141,36 @@ public class NettyRpcServer extends RpcServer {
this.scheduler.init(new RpcSchedulerContext(this));
}
+ /**
+ * Chooses the {@code ByteBufAllocator} for this server from the value of
+ * {@link #HBASE_NETTY_ALLOCATOR_KEY} in {@code conf}.
+ * <ul>
+ * <li>unset: {@code PooledByteBufAllocator.DEFAULT}</li>
+ * <li>"pooled" (case-insensitive): {@code PooledByteBufAllocator.DEFAULT}</li>
+ * <li>"unpooled" (case-insensitive): {@code UnpooledByteBufAllocator.DEFAULT}</li>
+ * <li>"heap" (case-insensitive): {@code HeapByteBufAllocator.DEFAULT}</li>
+ * <li>anything else: treated as a class name and instantiated with
+ * {@code ReflectionUtils.newInstance(value)}, passing no constructor arguments</li>
+ * </ul>
+ * The selection is logged at INFO in every case.
+ * @param conf configuration to read the allocator setting from
+ * @return the selected allocator
+ * @throws IOException wrapping the {@code ClassCastException} or
+ * {@code UnsupportedOperationException} raised while creating a custom allocator
+ */ private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException {
+ final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY);
+ if (value != null) {
+ if (POOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
+ LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName());
+ return PooledByteBufAllocator.DEFAULT;
+ } else if (UNPOOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
+ LOG.info("Using {} for buffer allocation", UnpooledByteBufAllocator.class.getName());
+ return UnpooledByteBufAllocator.DEFAULT;
+ } else if (HEAP_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
+ LOG.info("Using {} for buffer allocation", HeapByteBufAllocator.class.getName());
+ return HeapByteBufAllocator.DEFAULT;
+ } else {
+ // If the value is none of the recognized labels, treat it as a class name. This allows the
+ // user to supply a custom implementation, perhaps for debugging.
+ try {
+ // ReflectionUtils throws UnsupportedOperationException if there are any problems.
+ ByteBufAllocator alloc = (ByteBufAllocator) ReflectionUtils.newInstance(value); // cast fails if the class is not a ByteBufAllocator
+ LOG.info("Using {} for buffer allocation", value);
+ return alloc;
+ } catch (ClassCastException | UnsupportedOperationException e) {
+ throw new IOException(e); // surface the bad setting as a checked construction failure
+ }
+ }
+ } else {
+ LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName());
+ return PooledByteBufAllocator.DEFAULT;
+ }
+ }
+
@InterfaceAudience.Private
protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
return new NettyRpcServerPreambleHandler(NettyRpcServer.this);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/SimpleByteBufAllocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/SimpleByteBufAllocator.java
new file mode 100644
index 00000000000..1796539eb0e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/SimpleByteBufAllocator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.io.netty.buffer.AbstractByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
+
+/**
+ * A custom byte buf allocator for TestNettyRpcServer.
+ * <p>
+ * Heap buffers come from {@code Unpooled.buffer} and direct buffers from
+ * {@code Unpooled.directBuffer}. Each allocation is logged at INFO with the requested capacities,
+ * and {@code isDirectBufferPooled} always reports {@code false}.
+ */
+public class SimpleByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocator {
+
+ static final Logger LOG = LoggerFactory.getLogger(SimpleByteBufAllocator.class);
+
+ @Override
+ public boolean isDirectBufferPooled() { // this allocator never reports pooled direct buffers
+ return false;
+ }
+
+ @Override
+ protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { // logs, then allocates an unpooled buffer
+ LOG.info("newHeapBuffer initialCapacity={}, maxCapacity={}", initialCapacity, maxCapacity);
+ return Unpooled.buffer(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { // logs, then allocates an unpooled direct buffer
+ LOG.info("newDirectBuffer initialCapacity={}, maxCapacity={}", initialCapacity, maxCapacity);
+ return Unpooled.directBuffer(initialCapacity, maxCapacity);
+ }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java
index b1db8778146..eefadaf528c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
@@ -33,16 +35,19 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
+import org.junit.After;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
@Category({ RPCTests.class, MediumTests.class })
+@RunWith(Parameterized.class)
public class TestNettyRpcServer {
@ClassRule
@@ -57,22 +62,28 @@ public class TestNettyRpcServer {
private static byte[] FAMILY = Bytes.toBytes("f1");
private static byte[] PRIVATE_COL = Bytes.toBytes("private");
private static byte[] PUBLIC_COL = Bytes.toBytes("public");
+ @Parameterized.Parameter
+ public String allocatorType;
- @Before
- public void setup() {
- TABLE = TableName.valueOf(name.getMethodName());
+ @Parameters
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[][] { { NettyRpcServer.POOLED_ALLOCATOR_TYPE },
+ { NettyRpcServer.UNPOOLED_ALLOCATOR_TYPE }, { NettyRpcServer.HEAP_ALLOCATOR_TYPE },
+ { SimpleByteBufAllocator.class.getName() } });
}
- @BeforeClass
- public static void setupBeforeClass() throws Exception {
+ @Before
+ public void setup() throws Exception {
+ TABLE = TableName.valueOf(name.getMethodName().replace('[', '_').replace(']', '_'));
TEST_UTIL = new HBaseTestingUtil();
TEST_UTIL.getConfiguration().set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
NettyRpcServer.class.getName());
+ TEST_UTIL.getConfiguration().set(NettyRpcServer.HBASE_NETTY_ALLOCATOR_KEY, allocatorType);
TEST_UTIL.startMiniCluster();
}
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
+ @After
+ public void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}