You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2022/06/23 22:25:48 UTC

[hbase] branch branch-2 updated: HBASE-27111 Make Netty channel bytebuf allocator configurable. (#4525)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new da76941b0eb HBASE-27111 Make Netty channel bytebuf allocator configurable. (#4525)
da76941b0eb is described below

commit da76941b0eb3f7ee602164cfd14ad77a4d5e829a
Author: Andrew Purtell <ap...@apache.org>
AuthorDate: Thu Jun 23 15:13:29 2022 -0700

    HBASE-27111 Make Netty channel bytebuf allocator configurable. (#4525)
    
    Support site configuration of the bytebuf allocator that Netty will use for
    NettyRpcServer channels. Property name is 'hbase.netty.rpcserver.allocator'.
    Default is no value, which is equivalent to "pooled". Valid values are:
    - "pooled": use PooledByteBufAllocator
    - "unpooled": use UnpooledByteBufAllocator
    - "heap": use HeapByteBufAllocator, which is a PooledByteBufAllocator that
       preferentially allocates buffers on heap wherever possible
    - <class>: If the value is none of the recognized labels, treat it as a class
      name implementing org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator.
      This allows the user to add a custom implementation, perhaps for debugging.
    
    Also updates ReflectionUtils with a new helper method.
    
    Signed-off-by: Viraj Jasani <vj...@apache.org>
    
    Conflicts:
            hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
            hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java
---
 .../apache/hadoop/hbase/util/ReflectionUtils.java  | 25 ++++++++++
 .../hadoop/hbase/ipc/HeapByteBufAllocator.java     | 56 ++++++++++++++++++++++
 .../apache/hadoop/hbase/ipc/NettyRpcServer.java    | 53 +++++++++++++++++++-
 .../hadoop/hbase/ipc/SimpleByteBufAllocator.java   | 52 ++++++++++++++++++++
 .../hadoop/hbase/ipc/TestNettyRpcServer.java       | 29 +++++++----
 5 files changed, 205 insertions(+), 10 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
index e0fa06fdd6c..6b8ef990f94 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
@@ -62,6 +62,55 @@ public class ReflectionUtils {
     }
   }
 
+  /**
+   * Creates an instance of the class named {@code className}.
+   * <p>
+   * The class is loaded through {@link #getClassLoader()} and instantiated with the constructor
+   * that matches {@code params}.
+   * <p>
+   * The cast to {@code T} is unchecked. Because of type erasure nothing here verifies that the
+   * loaded class is a {@code T}; a mismatch surfaces as a {@link ClassCastException} at the
+   * caller's own cast or assignment.
+   * @param className fully qualified name of the class to load
+   * @param params    constructor arguments
+   * @return the newly created instance
+   * @throws UnsupportedOperationException if the class cannot be found
+   */
+  public static <T> T newInstance(String className, Object... params) {
+    final Class<T> type;
+    try {
+      // Erasure turns this cast into a no-op at runtime, so it can never throw.
+      @SuppressWarnings("unchecked")
+      Class<T> loaded = (Class<T>) getClassLoader().loadClass(className);
+      type = loaded;
+    } catch (ClassNotFoundException e) {
+      throw new UnsupportedOperationException("Unable to load specified class " + className, e);
+    }
+    return instantiate(type.getName(), findConstructor(type, params), params);
+  }
+
+  /**
+   * Picks the class loader used for loading classes by name.
+   * <p>
+   * Tries the current thread's context class loader, then the loader of this class, then the
+   * system class loader, and returns the first one that is not null.
+   * @return a non-null class loader
+   * @throws RuntimeException if none of the candidates is available
+   */
+  public static ClassLoader getClassLoader() {
+    ClassLoader cl = Thread.currentThread().getContextClassLoader();
+    if (cl == null) {
+      cl = ReflectionUtils.class.getClassLoader();
+    }
+    if (cl == null) {
+      cl = ClassLoader.getSystemClassLoader();
+    }
+    if (cl == null) {
+      throw new RuntimeException("A ClassLoader could not be found");
+    }
+    return cl;
+  }
+
   public static <T> T newInstance(Class<T> type, Object... params) {
     return instantiate(type.getName(), findConstructor(type, params), params);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HeapByteBufAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HeapByteBufAllocator.java
new file mode 100644
index 00000000000..fd5ada50955
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HeapByteBufAllocator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.buffer.AbstractByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+
+/**
+ * ByteBufAllocator for RPC channels that keeps buffers on heap unless a caller explicitly asks for
+ * a direct buffer.
+ * <p>
+ * All allocations are served by a private {@link PooledByteBufAllocator} created with
+ * {@code preferDirect} set to false. Direct buffers cannot be ruled out entirely, because some IO
+ * paths hand the buffer to native code, so explicit direct requests are still honored.
+ */
+@InterfaceAudience.Private
+public class HeapByteBufAllocator extends AbstractByteBufAllocator {
+
+  /** Shared instance, for callers that do not need an allocator of their own. */
+  public static final HeapByteBufAllocator DEFAULT = new HeapByteBufAllocator();
+
+  /** Backing pool; built with preferDirect disabled so that it favors heap arenas. */
+  private final PooledByteBufAllocator pooled = new PooledByteBufAllocator(false);
+
+  @Override
+  protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
+    return pooled.heapBuffer(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
+    return pooled.directBuffer(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public boolean isDirectBufferPooled() {
+    return pooled.isDirectBufferPooled();
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index e5ca624002a..3112eb5d472 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.security.HBasePolicyProvider;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -41,6 +42,9 @@ import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.buffer.UnpooledByteBufAllocator;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
@@ -71,18 +75,35 @@ public class NettyRpcServer extends RpcServer {
     "hbase.netty.eventloop.rpcserver.thread.count";
   private static final int EVENTLOOP_THREADCOUNT_DEFAULT = 0;
 
+  /**
+   * Name of property to change the byte buf allocator for the netty channels. Default is no value,
+   * which causes us to use PooledByteBufAllocator. Valid settings here are "pooled", "unpooled",
+   * and "heap", or, the name of a class implementing ByteBufAllocator.
+   * <p>
+   * "pooled" and "unpooled" may prefer direct memory depending on netty configuration, which is
+   * controlled by platform specific code and documented system properties.
+   * <p>
+   * "heap" will prefer heap arena allocations.
+   */
+  public static final String HBASE_NETTY_ALLOCATOR_KEY = "hbase.netty.rpcserver.allocator";
+  static final String POOLED_ALLOCATOR_TYPE = "pooled";
+  static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled";
+  static final String HEAP_ALLOCATOR_TYPE = "heap";
+
   private final InetSocketAddress bindAddress;
 
   private final CountDownLatch closed = new CountDownLatch(1);
   private final Channel serverChannel;
   private final ChannelGroup allChannels =
     new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
+  private final ByteBufAllocator channelAllocator;
 
   public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
     InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
     boolean reservoirEnabled) throws IOException {
     super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
     this.bindAddress = bindAddress;
+    this.channelAllocator = getChannelAllocator(conf);
     EventLoopGroup eventLoopGroup;
     Class<? extends ServerChannel> channelClass;
     if (server instanceof HRegionServer) {
@@ -103,9 +124,9 @@ public class NettyRpcServer extends RpcServer {
       .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
       .childOption(ChannelOption.SO_REUSEADDR, true)
       .childHandler(new ChannelInitializer<Channel>() {
-
         @Override
         protected void initChannel(Channel ch) throws Exception {
+          ch.config().setAllocator(channelAllocator);
           ChannelPipeline pipeline = ch.pipeline();
           FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
           preambleDecoder.setSingleDecode(true);
@@ -126,6 +147,44 @@ public class NettyRpcServer extends RpcServer {
     this.scheduler.init(new RpcSchedulerContext(this));
   }
 
+  /**
+   * Selects the ByteBufAllocator for this server's channels from the value configured under
+   * {@link #HBASE_NETTY_ALLOCATOR_KEY}.
+   * <p>
+   * No value, or "pooled", selects the default PooledByteBufAllocator; "unpooled" selects the
+   * default UnpooledByteBufAllocator; "heap" selects the default HeapByteBufAllocator. Labels are
+   * matched ignoring case. Any other value is taken to be the name of a class implementing
+   * ByteBufAllocator and is instantiated through ReflectionUtils.
+   * @param conf configuration holding the allocator setting
+   * @return the allocator to use for channel buffers
+   * @throws IOException if the custom class cannot be instantiated or is not a ByteBufAllocator
+   */
+  private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException {
+    final String setting = conf.get(HBASE_NETTY_ALLOCATOR_KEY);
+    if (setting == null || POOLED_ALLOCATOR_TYPE.equalsIgnoreCase(setting)) {
+      LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName());
+      return PooledByteBufAllocator.DEFAULT;
+    }
+    if (UNPOOLED_ALLOCATOR_TYPE.equalsIgnoreCase(setting)) {
+      LOG.info("Using {} for buffer allocation", UnpooledByteBufAllocator.class.getName());
+      return UnpooledByteBufAllocator.DEFAULT;
+    }
+    if (HEAP_ALLOCATOR_TYPE.equalsIgnoreCase(setting)) {
+      LOG.info("Using {} for buffer allocation", HeapByteBufAllocator.class.getName());
+      return HeapByteBufAllocator.DEFAULT;
+    }
+    // Not one of the recognized labels, so treat the value as a class name. This lets the user
+    // plug in a custom implementation, perhaps for debugging.
+    try {
+      // ReflectionUtils throws UnsupportedOperationException if there are any problems.
+      ByteBufAllocator custom = (ByteBufAllocator) ReflectionUtils.newInstance(setting);
+      LOG.info("Using {} for buffer allocation", setting);
+      return custom;
+    } catch (ClassCastException | UnsupportedOperationException e) {
+      throw new IOException(e);
+    }
+  }
+
   @InterfaceAudience.Private
   protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
     return new NettyRpcServerPreambleHandler(NettyRpcServer.this);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/SimpleByteBufAllocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/SimpleByteBufAllocator.java
new file mode 100644
index 00000000000..1796539eb0e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/SimpleByteBufAllocator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.io.netty.buffer.AbstractByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
+
+/**
+ * Minimal unpooled allocator used by TestNettyRpcServer to exercise the code path that loads a
+ * custom ByteBufAllocator by class name. Every allocation is logged and then served by
+ * {@link Unpooled}.
+ */
+public class SimpleByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocator {
+
+  static final Logger LOG = LoggerFactory.getLogger(SimpleByteBufAllocator.class);
+
+  /** Logs the request, then hands back a buffer from {@link Unpooled#buffer(int, int)}. */
+  @Override
+  protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
+    LOG.info("newHeapBuffer initialCapacity={}, maxCapacity={}", initialCapacity, maxCapacity);
+    return Unpooled.buffer(initialCapacity, maxCapacity);
+  }
+
+  /** Logs the request, then hands back a buffer from {@link Unpooled#directBuffer(int, int)}. */
+  @Override
+  protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
+    LOG.info("newDirectBuffer initialCapacity={}, maxCapacity={}", initialCapacity, maxCapacity);
+    return Unpooled.directBuffer(initialCapacity, maxCapacity);
+  }
+
+  /** Always reports false: this allocator does no pooling of its own. */
+  @Override
+  public boolean isDirectBufferPooled() {
+    return false;
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java
index 7d490e343ff..8d58b3a5722 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -33,16 +35,19 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 @Category({ RPCTests.class, MediumTests.class })
+@RunWith(Parameterized.class)
 public class TestNettyRpcServer {
 
   @ClassRule
@@ -57,22 +62,28 @@ public class TestNettyRpcServer {
   private static byte[] FAMILY = Bytes.toBytes("f1");
   private static byte[] PRIVATE_COL = Bytes.toBytes("private");
   private static byte[] PUBLIC_COL = Bytes.toBytes("public");
+  @Parameterized.Parameter
+  public String allocatorType;
 
-  @Before
-  public void setup() {
-    TABLE = TableName.valueOf(name.getMethodName());
+  @Parameters
+  public static Collection<Object[]> parameters() {
+    return Arrays.asList(new Object[][] { { NettyRpcServer.POOLED_ALLOCATOR_TYPE },
+      { NettyRpcServer.UNPOOLED_ALLOCATOR_TYPE }, { NettyRpcServer.HEAP_ALLOCATOR_TYPE },
+      { SimpleByteBufAllocator.class.getName() } });
   }
 
-  @BeforeClass
-  public static void setupBeforeClass() throws Exception {
+  @Before
+  public void setup() throws Exception {
+    TABLE = TableName.valueOf(name.getMethodName().replace('[', '_').replace(']', '_'));
     TEST_UTIL = new HBaseTestingUtility();
     TEST_UTIL.getConfiguration().set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
       NettyRpcServer.class.getName());
+    TEST_UTIL.getConfiguration().set(NettyRpcServer.HBASE_NETTY_ALLOCATOR_KEY, allocatorType);
     TEST_UTIL.startMiniCluster();
   }
 
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
+  @After
+  public void tearDown() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
   }