You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2022/02/19 01:22:22 UTC

[hbase] branch master updated: HBASE-26703 Allow configuration of IPC queue balancer (#4063)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fb3d46  HBASE-26703 Allow configuration of IPC queue balancer (#4063)
8fb3d46 is described below

commit 8fb3d4666a8cc503b86504d8ccd6a2e34aa4050d
Author: Bryan Beaudreault <bb...@hubspot.com>
AuthorDate: Fri Feb 18 20:21:40 2022 -0500

    HBASE-26703 Allow configuration of IPC queue balancer (#4063)
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
---
 .../hadoop/hbase/ipc/BalancedQueueRpcExecutor.java |  15 ++-
 .../org/apache/hadoop/hbase/ipc/QueueBalancer.java |  35 +++++++
 .../hadoop/hbase/ipc/RWQueueRpcExecutor.java       |  49 +++++++--
 .../hadoop/hbase/ipc/RandomQueueBalancer.java      |  54 ++++++++++
 .../org/apache/hadoop/hbase/ipc/RpcExecutor.java   |  53 +++-------
 .../hadoop/hbase/ipc/TestRWQueueRpcExecutor.java   | 109 +++++++++++++++++++++
 6 files changed, 265 insertions(+), 50 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
index db8e93f..ad1747b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -18,10 +18,10 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.util.concurrent.BlockingQueue;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
@@ -47,13 +47,13 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
       final String callQueueType, final int maxQueueLength, final PriorityFunction priority,
       final Configuration conf, final Abortable abortable) {
     super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable);
-    this.balancer = getBalancer(this.numCallQueues);
     initializeQueues(this.numCallQueues);
+    this.balancer = getBalancer(name, conf, getQueues());
   }
 
   @Override
   public boolean dispatch(final CallRunner callTask) throws InterruptedException {
-    int queueIndex = balancer.getNextQueue();
+    int queueIndex = balancer.getNextQueue(callTask);
     BlockingQueue<CallRunner> queue = queues.get(queueIndex);
     // that means we can overflow by at most <num reader> size (5), that's ok
     if (queue.size() >= currentQueueLimit) {
@@ -61,4 +61,13 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
     }
     return queue.offer(callTask);
   }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    super.onConfigurationChange(conf);
+    // QueueBalancer itself is not a ConfigurationObserver; only forward to balancers that opt in.
+    if (balancer instanceof ConfigurationObserver) {
+      ((ConfigurationObserver) balancer).onConfigurationChange(conf);
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/QueueBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/QueueBalancer.java
new file mode 100644
index 0000000..d1141d0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/QueueBalancer.java
@@ -0,0 +1,35 @@
+/**
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Strategy for balancing requests across IPC queues: picks the queue for each dispatched call.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+@InterfaceStability.Stable
+public interface QueueBalancer {
+  /**
+   * @return the index, relative to the balancer's own queue list, of the queue to insert into
+   */
+  int getNextQueue(CallRunner callRunner);
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 43c6ce4..b2df225 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -22,22 +22,22 @@ package org.apache.hadoop.hbase.ipc;
 import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
 
 /**
  * RPC Executor that uses different queues for reads and writes.
@@ -97,14 +97,17 @@ public class RWQueueRpcExecutor extends RpcExecutor {
     numScanQueues = scanQueues;
     scanHandlersCount = scanHandlers;
 
-    this.writeBalancer = getBalancer(numWriteQueues);
-    this.readBalancer = getBalancer(numReadQueues);
-    this.scanBalancer = numScanQueues > 0 ? getBalancer(numScanQueues) : null;
-
     initializeQueues(numWriteQueues);
     initializeQueues(numReadQueues);
     initializeQueues(numScanQueues);
 
+    this.writeBalancer = getBalancer(name, conf, queues.subList(0, numWriteQueues));
+    this.readBalancer = getBalancer(name, conf, queues.subList(numWriteQueues, numWriteQueues + numReadQueues));
+    this.scanBalancer = numScanQueues > 0 ?
+      getBalancer(name, conf, queues.subList(numWriteQueues + numReadQueues,
+        numWriteQueues + numReadQueues + numScanQueues)) :
+      null;
+
     LOG.info(getName() + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount
       + " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount + " scanQueues="
       + numScanQueues + " scanHandlers=" + scanHandlersCount);
@@ -139,11 +142,11 @@ public class RWQueueRpcExecutor extends RpcExecutor {
     final CallRunner callTask) {
     int queueIndex;
     if (toWriteQueue) {
-      queueIndex = writeBalancer.getNextQueue();
+      queueIndex = writeBalancer.getNextQueue(callTask);
     } else if (toScanQueue) {
-      queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
+      queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue(callTask);
     } else {
-      queueIndex = numWriteQueues + readBalancer.getNextQueue();
+      queueIndex = numWriteQueues + readBalancer.getNextQueue(callTask);
     }
     Queue<CallRunner> queue = queues.get(queueIndex);
     if (queue.size() >= currentQueueLimit) {
@@ -234,6 +237,18 @@ public class RWQueueRpcExecutor extends RpcExecutor {
     return false;
   }
 
+  QueueBalancer getWriteBalancer() {
+    return writeBalancer; // covers queues[0, numWriteQueues)
+  }
+
+  QueueBalancer getReadBalancer() {
+    return readBalancer; // covers queues[numWriteQueues, numWriteQueues + numReadQueues)
+  }
+
+  QueueBalancer getScanBalancer() {
+    return scanBalancer; // null unless numScanQueues > 0; else covers the queues after the reads
+  }
+
   private boolean isScanRequest(final RequestHeader header, final Message param) {
     return param instanceof ScanRequest;
   }
@@ -266,4 +281,18 @@ public class RWQueueRpcExecutor extends RpcExecutor {
   private static int calcNumReaders(final int count, final float readShare) {
     return count - calcNumWriters(count, readShare);
   }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    super.onConfigurationChange(conf);
+    propagateBalancerConfigChange(writeBalancer, conf);
+    propagateBalancerConfigChange(readBalancer, conf);
+    propagateBalancerConfigChange(scanBalancer, conf); // may be null; helper then no-ops
+  }
+
+  private void propagateBalancerConfigChange(QueueBalancer balancer, Configuration conf) {
+    if (balancer instanceof ConfigurationObserver) { // false for null and for non-observers
+      ((ConfigurationObserver) balancer).onConfigurationChange(conf);
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RandomQueueBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RandomQueueBalancer.java
new file mode 100644
index 0000000..528affc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RandomQueueBalancer.java
@@ -0,0 +1,54 @@
+/**
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Default {@link QueueBalancer}: picks a random queue index in the range [0, num queues).
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class RandomQueueBalancer implements QueueBalancer {
+  private final int queueSize;
+  private final List<BlockingQueue<CallRunner>> queues;
+  // conf/executorName are unused; RpcExecutor.getBalancer supplies them reflectively.
+  public RandomQueueBalancer(Configuration conf, String executorName, List<BlockingQueue<CallRunner>> queues) {
+    this.queueSize = queues.size();
+    this.queues = queues;
+  }
+
+  @Override
+  public int getNextQueue(CallRunner callRunner) {
+    return ThreadLocalRandom.current().nextInt(queueSize); // callRunner does not affect the pick
+  }
+
+  /**
+   * Exposed for use in tests; returns the same list instance that was passed to the constructor.
+   */
+  List<BlockingQueue<CallRunner>> getQueues() {
+    return queues;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 610594a..1ef78af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -20,32 +20,29 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
-import java.util.Map;
-import java.util.HashMap;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
-import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
-
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.base.Strings;
+import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
 
 /**
  * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular
@@ -73,6 +70,10 @@ public abstract class RpcExecutor {
   public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
   public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
 
+  public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS = "hbase.ipc.server.callqueue.balancer.class";
+  public static final Class<?> CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT = RandomQueueBalancer.class;
+
+
   // These 3 are only used by Codel executor
   public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay";
   public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval";
@@ -297,19 +298,13 @@ public abstract class RpcExecutor {
         handlers.size(), threadPrefix, qsize, port);
   }
 
-  public static abstract class QueueBalancer {
-    /**
-     * @return the index of the next queue to which a request should be inserted
-     */
-    public abstract int getNextQueue();
-  }
-
-  public static QueueBalancer getBalancer(int queueSize) {
-    Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
-    if (queueSize == 1) {
+  public static QueueBalancer getBalancer(String executorName, Configuration conf, List<BlockingQueue<CallRunner>> queues) {
+    Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1");
+    if (queues.size() == 1) {
       return ONE_QUEUE;
     } else {
-      return new RandomQueueBalancer(queueSize);
+      Class<?> balancerClass = conf.getClass(CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT);
+      return (QueueBalancer) ReflectionUtils.newInstance(balancerClass, conf, executorName, queues);
     }
   }
 
@@ -318,28 +313,12 @@ public abstract class RpcExecutor {
    */
   private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
     @Override
-    public int getNextQueue() {
+    public int getNextQueue(CallRunner callRunner) {
       return 0;
     }
   };
 
   /**
-   * Queue balancer that just randomly selects a queue in the range [0, num queues).
-   */
-  private static class RandomQueueBalancer extends QueueBalancer {
-    private final int queueSize;
-
-    public RandomQueueBalancer(int queueSize) {
-      this.queueSize = queueSize;
-    }
-
-    @Override
-    public int getNextQueue() {
-      return ThreadLocalRandom.current().nextInt(queueSize);
-    }
-  }
-
-  /**
    * Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It
    * uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have
    * the same deadline BoundedPriorityBlockingQueue will order them in FIFO (first-in-first-out)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java
new file mode 100644
index 0000000..ae4fc41
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java
@@ -0,0 +1,109 @@
+/**
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import static org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY;
+import static org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY;
+import static org.apache.hadoop.hbase.ipc.RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ RPCTests.class, MediumTests.class})
+public class TestRWQueueRpcExecutor {
+  // Verifies RWQueueRpcExecutor gives its write, read and scan balancers disjoint queue lists.
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRWQueueRpcExecutor.class);
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private Configuration conf;
+
+  @Before
+  public void setUp() {
+    conf = HBaseConfiguration.create();
+    conf.setFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
+    conf.setFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
+    conf.setFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
+  }
+
+  @Test
+  public void itProvidesCorrectQueuesToBalancers() throws InterruptedException {
+    PriorityFunction qosFunction = mock(PriorityFunction.class);
+    RWQueueRpcExecutor executor =
+      new RWQueueRpcExecutor(testName.getMethodName(), 100, 100, qosFunction, conf, null);
+
+    QueueBalancer readBalancer = executor.getReadBalancer();
+    QueueBalancer writeBalancer = executor.getWriteBalancer();
+    QueueBalancer scanBalancer = executor.getScanBalancer();
+    // setUp() does not configure a balancer class, so the default RandomQueueBalancer is expected.
+    assertTrue(readBalancer instanceof RandomQueueBalancer);
+    assertTrue(writeBalancer instanceof RandomQueueBalancer);
+    assertTrue(scanBalancer instanceof RandomQueueBalancer);
+
+    List<BlockingQueue<CallRunner>> readQueues = ((RandomQueueBalancer) readBalancer).getQueues();
+    List<BlockingQueue<CallRunner>> writeQueues = ((RandomQueueBalancer) writeBalancer).getQueues();
+    List<BlockingQueue<CallRunner>> scanQueues = ((RandomQueueBalancer) scanBalancer).getQueues();
+    // Expected split for the shares configured in setUp(): 50 write, 25 read and 25 scan queues.
+    assertEquals(25, readQueues.size());
+    assertEquals(50, writeQueues.size());
+    assertEquals(25, scanQueues.size());
+    // Check each list against the other two: a queue shared by two balancers would be detected.
+    verifyDistinct(readQueues, writeQueues, scanQueues);
+    verifyDistinct(writeQueues, readQueues, scanQueues);
+    verifyDistinct(scanQueues, readQueues, writeQueues);
+
+  }
+  // Fills each queue in 'queues' with one call and asserts all queues in 'others' stay empty.
+  private void verifyDistinct(List<BlockingQueue<CallRunner>> queues, List<BlockingQueue<CallRunner>>... others)
+    throws InterruptedException {
+    CallRunner mock = mock(CallRunner.class);
+    for (BlockingQueue<CallRunner> queue : queues) {
+      queue.put(mock);
+      assertEquals(1, queue.size());
+    }
+
+    for (List<BlockingQueue<CallRunner>> other : others) {
+      for (BlockingQueue<CallRunner> queue : other) {
+        assertEquals(0, queue.size());
+      }
+    }
+
+    // empty the queues again so the next verifyDistinct call starts from a clean state
+    for (BlockingQueue<CallRunner> queue : queues) {
+      queue.clear();
+    }
+  }
+}