You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by vj...@apache.org on 2020/07/13 07:40:44 UTC

[hbase] branch branch-1 updated: HBASE-23744 - FastPathBalancedQueueRpcExecutor should enforce queue length of 0

This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new be151e3  HBASE-23744 - FastPathBalancedQueueRpcExecutor should enforce queue length of 0
be151e3 is described below

commit be151e3fceca942d80c43f4f9f7bfb79f2b18f58
Author: Geoffrey Jacoby <gj...@apache.org>
AuthorDate: Mon Jul 13 12:08:59 2020 +0530

    HBASE-23744 - FastPathBalancedQueueRpcExecutor should enforce queue length of 0
    
    Closes #1094
    
    Co-authored-by: Viraj Jasani <vj...@apache.org>
    
    Signed-off-by: Xu Cang <xu...@apache.org>
    Signed-off-by: Nick Dimiduk <nd...@apache.org>
    Signed-off-by: Viraj Jasani <vj...@apache.org>
---
 .../ipc/FastPathBalancedQueueRpcExecutor.java      |  5 ++++
 .../hadoop/hbase/ipc/TestSimpleRpcScheduler.java   | 27 ++++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
index 1922f90..724d828 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
@@ -65,6 +65,11 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
 
   @Override
   public boolean dispatch(CallRunner callTask) throws InterruptedException {
+    //FastPathHandlers don't check queue limits, so if we're completely shut down
+    //we have to prevent ourselves from using the handler in the first place
+    if (currentQueueLimit == 0){
+      return false;
+    }
     FastPathHandler handler = popReadyHandler();
     return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 940f1c2..d18e167 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -39,10 +39,12 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Put;
@@ -59,8 +61,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -74,6 +79,9 @@ import com.google.protobuf.Message;
 public class TestSimpleRpcScheduler {
   private static final Log LOG = LogFactory.getLog(TestSimpleRpcScheduler.class);
 
+  @Rule
+  public TestName testName = new TestName();
+
   private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
     @Override
     public InetSocketAddress getListenerAddress() {
@@ -475,6 +483,25 @@ public class TestSimpleRpcScheduler {
     }
   }
 
+  @Test
+  public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Exception {
+    String name = testName.getMethodName();
+    int handlerCount = 1;
+    String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE;
+    int maxQueueLength = 0;
+    PriorityFunction priority = mock(PriorityFunction.class);
+    Configuration conf = HBaseConfiguration.create();
+    Abortable abortable = mock(Abortable.class);
+    FastPathBalancedQueueRpcExecutor executor =
+      Mockito.spy(new FastPathBalancedQueueRpcExecutor(name,
+      handlerCount, callQueueType, maxQueueLength, priority, conf, abortable));
+    CallRunner task = mock(CallRunner.class);
+    assertFalse(executor.dispatch(task));
+    //make sure we never internally get a handler, which would skip the queue validation
+    Mockito.verify(executor, Mockito.never()).getHandler(Mockito.anyString(), Mockito.anyDouble(),
+      (BlockingQueue<CallRunner>) Mockito.any(), (AtomicInteger) Mockito.any());
+  }
+
   // Get mocked call that has the CallRunner sleep for a while so that the fast
   // path isn't hit.
   private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {