You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2016/02/25 05:41:52 UTC

hbase git commit: HBASE-15136 Explore different queuing behaviors while busy

Repository: hbase
Updated Branches:
  refs/heads/master 6e9d355b1 -> 43f99def6


HBASE-15136 Explore different queuing behaviors while busy


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/43f99def
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/43f99def
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/43f99def

Branch: refs/heads/master
Commit: 43f99def670551cfe314c44181c0cb9570cdaaa3
Parents: 6e9d355
Author: Mikhail Antonov <an...@apache.org>
Authored: Wed Feb 24 20:40:44 2016 -0800
Committer: Mikhail Antonov <an...@apache.org>
Committed: Wed Feb 24 20:41:30 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/util/ReflectionUtils.java      |   1 +
 .../hbase/ipc/MetricsHBaseServerSource.java     |   6 +
 .../hbase/ipc/MetricsHBaseServerWrapper.java    |   2 +
 .../hbase/ipc/MetricsHBaseServerSourceImpl.java |   6 +-
 .../hbase/ipc/AdaptiveLifoCoDelCallQueue.java   | 329 +++++++++++++++++++
 .../hadoop/hbase/ipc/FifoRpcScheduler.java      |  10 +
 .../ipc/MetricsHBaseServerWrapperImpl.java      |  16 +
 .../hadoop/hbase/ipc/RWQueueRpcExecutor.java    |  10 +
 .../apache/hadoop/hbase/ipc/RpcScheduler.java   |  13 +
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    |  70 +++-
 .../ipc/MetricsHBaseServerWrapperStub.java      |  10 +
 .../hbase/ipc/TestSimpleRpcScheduler.java       |  63 ++++
 12 files changed, 534 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/43f99def/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
index 650c544..15b3930 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
@@ -85,6 +85,7 @@ public class ReflectionUtils {
         match = (!ctorParamTypes[i].isPrimitive()) ? ctorParamTypes[i].isAssignableFrom(paramType) :
                   ((int.class.equals(ctorParamTypes[i]) && Integer.class.equals(paramType)) ||
                    (long.class.equals(ctorParamTypes[i]) && Long.class.equals(paramType)) ||
+                   (double.class.equals(ctorParamTypes[i]) && Double.class.equals(paramType)) ||
                    (char.class.equals(ctorParamTypes[i]) && Character.class.equals(paramType)) ||
                    (short.class.equals(ctorParamTypes[i]) && Short.class.equals(paramType)) ||
                    (boolean.class.equals(ctorParamTypes[i]) && Boolean.class.equals(paramType)) ||

http://git-wip-us.apache.org/repos/asf/hbase/blob/43f99def/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index 061a672..bb89789 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -64,6 +64,12 @@ public interface MetricsHBaseServerSource extends BaseSource {
   String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections.";
   String NUM_ACTIVE_HANDLER_NAME = "numActiveHandler";
   String NUM_ACTIVE_HANDLER_DESC = "Number of active rpc handlers.";
+  String NUM_GENERAL_CALLS_DROPPED_NAME = "numGeneralCallsDropped";
+  String NUM_GENERAL_CALLS_DROPPED_DESC = "Total number of calls in general queue which " +
+    "were dropped by CoDel RPC executor";
+  String NUM_LIFO_MODE_SWITCHES_NAME = "numLifoModeSwitches";
+  String NUM_LIFO_MODE_SWITCHES_DESC = "Total number of calls in general queue which " +
+    "were served from the tail of the queue";
 
   String EXCEPTIONS_NAME="exceptions";
   String EXCEPTIONS_DESC="Exceptions caused by requests";

http://git-wip-us.apache.org/repos/asf/hbase/blob/43f99def/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
index 1885264..8f30205 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
@@ -26,4 +26,6 @@ public interface MetricsHBaseServerWrapper {
   int getPriorityQueueLength();
   int getNumOpenConnections();
   int getActiveRpcHandlerCount();
+  long getNumGeneralCallsDropped();
+  long getNumLifoModeSwitches();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/43f99def/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index 48f57e9..c466564 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -219,7 +219,11 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
           .addGauge(Interns.info(NUM_OPEN_CONNECTIONS_NAME,
               NUM_OPEN_CONNECTIONS_DESC), wrapper.getNumOpenConnections())
           .addGauge(Interns.info(NUM_ACTIVE_HANDLER_NAME,
-              NUM_ACTIVE_HANDLER_DESC), wrapper.getActiveRpcHandlerCount());
+              NUM_ACTIVE_HANDLER_DESC), wrapper.getActiveRpcHandlerCount())
+          .addCounter(Interns.info(NUM_GENERAL_CALLS_DROPPED_NAME,
+              NUM_GENERAL_CALLS_DROPPED_DESC), wrapper.getNumGeneralCallsDropped())
+          .addCounter(Interns.info(NUM_LIFO_MODE_SWITCHES_NAME,
+              NUM_LIFO_MODE_SWITCHES_DESC), wrapper.getNumLifoModeSwitches());
     }
 
     metricsRegistry.snapshot(mrb, all);

http://git-wip-us.apache.org/repos/asf/hbase/blob/43f99def/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
new file mode 100644
index 0000000..37e86be
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Adaptive LIFO blocking queue utilizing CoDel algorithm to prevent queue overloading.
+ *
+ * Implementing {@link BlockingQueue} interface to be compatible with {@link RpcExecutor}.
+ *
+ * Currently uses milliseconds internally; we need to look into whether we should use
+ * nanoseconds for intervalTime and minDelay.
+ *
+ * @see <a href="http://queue.acm.org/detail.cfm?id=2839461">Fail at Scale paper</a>
+ *
+ * @see <a href="https://github.com/facebook/wangle/blob/master/wangle/concurrent/Codel.cpp">
+ *   CoDel version for generic job queues in Wangle library</a>
+ */
+@InterfaceAudience.Private
+public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
+
+  // backing queue
+  private LinkedBlockingDeque<CallRunner> queue;
+
+  // so we can calculate actual threshold to switch to LIFO under load
+  private int maxCapacity;
+
+  // metrics (shared across all queues)
+  private AtomicLong numGeneralCallsDropped;
+  private AtomicLong numLifoModeSwitches;
+
+  /**
+   * Lock held by take ops, all other locks are inside queue impl.
+   *
+   * NOTE: We want to have this lock so that when there are a lot of already expired
+   * calls in the call queue, a handler thread calling take() can just grab the lock once
+   * and then fast-forward through the expired calls to the first non-expired one without
+   * having to contend for locks on every element in the underlying queue.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  // Both are in milliseconds
+  private volatile int codelTargetDelay;
+  private volatile int codelInterval;
+
+  // if the queue is fuller than this fraction of its capacity, we switch to LIFO mode.
+  // Values are fractions in the range 0-1.0 (e.g. 0.7, 0.8).
+  private volatile double lifoThreshold;
+
+  // minimal delay observed during the interval
+  private volatile long minDelay;
+
+  // the moment when current interval ends
+  private volatile long intervalTime = System.currentTimeMillis();
+
+  // switch to ensure only one thread does interval cutoffs
+  private AtomicBoolean resetDelay = new AtomicBoolean(true);
+
+  // if we're in this mode, "long" calls are getting dropped
+  private volatile boolean isOverloaded;
+
+  public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval,
+      double lifoThreshold, AtomicLong numGeneralCallsDropped, AtomicLong numLifoModeSwitches) {
+    this.maxCapacity = capacity;
+    this.queue = new LinkedBlockingDeque<>(capacity);
+    this.codelTargetDelay = targetDelay;
+    this.codelInterval = interval;
+    this.lifoThreshold = lifoThreshold;
+    this.numGeneralCallsDropped = numGeneralCallsDropped;
+    this.numLifoModeSwitches = numLifoModeSwitches;
+  }
+
+  /**
+   * Update tunables.
+   *
+   * @param newCodelTargetDelay new CoDel target delay
+   * @param newCodelInterval new CoDel interval
+   * @param newLifoThreshold new Adaptive Lifo threshold
+   */
+  public void updateTunables(int newCodelTargetDelay, int newCodelInterval,
+                             double newLifoThreshold) {
+    this.codelTargetDelay = newCodelTargetDelay;
+    this.codelInterval = newCodelInterval;
+    this.lifoThreshold = newLifoThreshold;
+  }
+
+  /**
+   * Behaves as {@link LinkedBlockingQueue#take()}, except it will silently
+   * skip all calls which it thinks should be dropped.
+   *
+   * @return the head of this queue
+   * @throws InterruptedException if interrupted while waiting
+   */
+  @Override
+  public CallRunner take() throws InterruptedException {
+    final ReentrantLock lock = this.lock;
+    lock.lock();
+    try {
+      CallRunner cr;
+      while(true) {
+        if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
+          numLifoModeSwitches.incrementAndGet();
+          cr = queue.takeLast();
+        } else {
+          cr = queue.takeFirst();
+        }
+        if (needToDrop(cr)) {
+          numGeneralCallsDropped.incrementAndGet();
+        } else {
+          return cr;
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * @param callRunner to validate
+   * @return true if this call needs to be skipped based on call timestamp
+   *   and internal queue state (deemed overloaded).
+   */
+  private boolean needToDrop(CallRunner callRunner) {
+    long now = System.currentTimeMillis();
+    long callDelay = now - callRunner.getCall().timestamp;
+
+    long localMinDelay = this.minDelay;
+    if (now > intervalTime && !resetDelay.getAndSet(true)) {
+      intervalTime = now + codelInterval;
+
+      if (localMinDelay > codelTargetDelay) {
+        isOverloaded = true;
+      } else {
+        isOverloaded = false;
+      }
+    }
+
+    if (resetDelay.getAndSet(false)) {
+      minDelay = callDelay;
+      return false;
+    } else if (callDelay < localMinDelay) {
+      minDelay = callDelay;
+    }
+
+    if (isOverloaded && callDelay > 2 * codelTargetDelay) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  // Generic BlockingQueue methods we support
+  @Override
+  public boolean offer(CallRunner callRunner) {
+    return queue.offer(callRunner);
+  }
+
+  @Override
+  public int size() {
+    return queue.size();
+  }
+
+  @Override
+  public String toString() {
+    return queue.toString();
+  }
+
+  // This class does NOT provide a general-purpose BlockingQueue implementation,
+  // so to prevent misuse all other methods throw UnsupportedOperationException.
+
+  @Override
+  public CallRunner poll(long timeout, TimeUnit unit) throws InterruptedException {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public CallRunner poll() {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public CallRunner peek() {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public boolean remove(Object o) {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public boolean contains(Object o) {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public Object[] toArray() {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public <T> T[] toArray(T[] a) {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public void clear() {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public int drainTo(Collection<? super CallRunner> c) {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public int drainTo(Collection<? super CallRunner> c, int maxElements) {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public Iterator<CallRunner> iterator() {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public boolean add(CallRunner callRunner) {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public CallRunner remove() {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public CallRunner element() {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends CallRunner> c) {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public boolean isEmpty() {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> c) {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> c) {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> c) {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public int remainingCapacity() {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public void put(CallRunner callRunner) throws InterruptedException {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+
+  @Override
+  public boolean offer(CallRunner callRunner, long timeout, TimeUnit unit)
+      throws InterruptedException {
+    throw new UnsupportedOperationException("This class doesn't support anything,"
+      + " but take() and offer() methods");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/43f99def/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
index 5e104eb..b069a5a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
@@ -104,4 +104,14 @@ public class FifoRpcScheduler extends RpcScheduler {
   public int getActiveRpcHandlerCount() {
     return executor.getActiveCount();
   }
+
+  @Override
+  public long getNumGeneralCallsDropped() {
+    return 0;
+  }
+
+  @Override
+  public long getNumLifoModeSwitches() {
+    return 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/43f99def/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
index 63c4b32..9979c75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
@@ -78,4 +78,20 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
     }
     return server.getScheduler().getActiveRpcHandlerCount();
   }
+
+  @Override
+  public long getNumGeneralCallsDropped() {
+    if (!isServerStarted() || this.server.getScheduler() == null) {
+      return 0;
+    }
+    return server.getScheduler().getNumGeneralCallsDropped();
+  }
+
+  @Override
+  public long getNumLifoModeSwitches() {
+    if (!isServerStarted() || this.server.getScheduler() == null) {
+      return 0;
+    }
+    return server.getScheduler().getNumLifoModeSwitches();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/43f99def/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index a9648b0..e0203ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -100,6 +100,16 @@ public class RWQueueRpcExecutor extends RpcExecutor {
       readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
   }
 
+  public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+      final float readShare, final float scanShare,
+      final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
+      final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
+    this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
+      calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
+      writeQueueClass, writeQueueInitArgs,
+      readQueueClass, readQueueInitArgs);
+  }
+
   public RWQueueRpcExecutor(final String name, final int writeHandlers, final int readHandlers,
       final int numWriteQueues, final int numReadQueues,
       final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,

http://git-wip-us.apache.org/repos/asf/hbase/blob/43f99def/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
index fffe7f3..50886cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
@@ -71,4 +71,17 @@ public abstract class RpcScheduler {
 
   /** Retrieves the number of active handler. */
   public abstract int getActiveRpcHandlerCount();
+
+  /**
+   * If CoDel-based RPC executors are used, retrieves the number of Calls that were dropped
+   * from general queue because RPC executor is under high load; returns 0 otherwise.
+   */
+  public abstract long getNumGeneralCallsDropped();
+
+  /**
+   * If CoDel-based RPC executors are used, retrieves the number of Calls that were
+   * picked from the tail of the queue (indicating adaptive LIFO mode, when
+   * in the period of overload we serve last requests first); returns 0 otherwise.
+   */
+  public abstract long getNumLifoModeSwitches();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/43f99def/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 0003254..12ee540 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.ipc;
 
 
 import java.util.Comparator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,6 +51,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
 
   /** If set to 'deadline', uses a priority queue and deprioritize long-running scans */
   public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
+  public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
   public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
   public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
 
@@ -56,6 +59,21 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
   public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY
       = "hbase.ipc.server.queue.max.call.delay";
 
+  // These 3 are only used by Codel executor
+  public static final String CALL_QUEUE_CODEL_TARGET_DELAY =
+    "hbase.ipc.server.callqueue.codel.target.delay";
+  public static final String CALL_QUEUE_CODEL_INTERVAL =
+    "hbase.ipc.server.callqueue.codel.interval";
+  public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
+    "hbase.ipc.server.callqueue.codel.lifo.threshold";
+
+  public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 5;
+  public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
+  public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
+
+  private AtomicLong numGeneralCallsDropped = new AtomicLong();
+  private AtomicLong numLifoModeSwitches = new AtomicLong();
+
   /**
    * Resize call queues;
    * @param conf new configuration
@@ -69,6 +87,26 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
     if (replicationExecutor != null) {
       replicationExecutor.resizeQueues(conf);
     }
+
+    String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY,
+      CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
+
+    if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
+      // update CoDel Scheduler tunables
+      int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
+        CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
+      int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL,
+        CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
+      double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
+        CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
+
+      for (BlockingQueue<CallRunner> queue : callExecutor.getQueues()) {
+        if (queue instanceof AdaptiveLifoCoDelCallQueue) {
+          ((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay,
+            codelInterval, codelLifoThreshold);
+        }
+      }
+    }
   }
 
   /**
@@ -134,10 +172,18 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
     this.highPriorityLevel = highPriorityLevel;
     this.abortable = server;
 
-    String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
+    String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY,
+      CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
     float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
     float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
 
+    int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
+      CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
+    int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL,
+      CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
+    double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
+      CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
+
     float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
     int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
 
@@ -150,6 +196,13 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
         callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
             callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
             BoundedPriorityBlockingQueue.class, callPriority);
+      } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
+        Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
+          codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
+        callExecutor = new RWQueueRpcExecutor("B.default", handlerCount,
+          numCallQueues, callqReadShare, callqScanShare,
+          AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
+          AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
       } else {
         callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
           callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
@@ -160,6 +213,11 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
         callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
           conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
+      } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
+        callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
+          conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
+          codelTargetDelay, codelInterval, codelLifoThreshold,
+          numGeneralCallsDropped, numLifoModeSwitches);
       } else {
         callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount,
             numCallQueues, maxQueueLength, conf, abortable);
@@ -239,5 +297,15 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
            (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) +
            (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
   }
+
+  @Override
+  public long getNumGeneralCallsDropped() {
+    return numGeneralCallsDropped.get();
+  }
+
+  @Override
+  public long getNumLifoModeSwitches() {
+    return numLifoModeSwitches.get();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/43f99def/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
index 6241f8e..b001d74 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
@@ -48,4 +48,14 @@ public class MetricsHBaseServerWrapperStub implements MetricsHBaseServerWrapper{
   public int getActiveRpcHandlerCount() {
     return 106;
   }
+
+  @Override
+  public long getNumGeneralCallsDropped() {
+    return 3;
+  }
+
+  @Override
+  public long getNumLifoModeSwitches() {
+    return 5;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/43f99def/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 66032e9..916037b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -362,4 +362,67 @@ public class TestSimpleRpcScheduler {
       scheduler.stop();
     }
   }
+
+  @Test
+  public void testCoDelScheduling() throws Exception {
+    Configuration schedConf = HBaseConfiguration.create();
+
+    schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY,
+      SimpleRpcScheduler.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
+
+    PriorityFunction priority = mock(PriorityFunction.class);
+    when(priority.getPriority(any(RPCProtos.RequestHeader.class), any(Message.class),
+      any(User.class))).thenReturn(HConstants.NORMAL_QOS);
+    SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority,
+      HConstants.QOS_THRESHOLD);
+    try {
+      scheduler.start();
+
+      // calls faster than min delay
+      for (int i = 0; i < 100; i++) {
+        CallRunner cr = getMockedCallRunner();
+        Thread.sleep(5);
+        scheduler.dispatch(cr);
+      }
+      Thread.sleep(100); // make sure fast calls are handled
+      assertEquals("None of these calls should have been discarded", 0,
+        scheduler.getNumGeneralCallsDropped());
+
+      // calls slower than min delay, but not individually slow enough to be dropped
+      for (int i = 0; i < 20; i++) {
+        CallRunner cr = getMockedCallRunner();
+        Thread.sleep(6);
+        scheduler.dispatch(cr);
+      }
+
+      Thread.sleep(100); // make sure somewhat slow calls are handled
+      assertEquals("None of these calls should have been discarded", 0,
+        scheduler.getNumGeneralCallsDropped());
+
+      // now slow calls and the ones to be dropped
+      for (int i = 0; i < 20; i++) {
+        CallRunner cr = getMockedCallRunner();
+        Thread.sleep(12);
+        scheduler.dispatch(cr);
+      }
+
+      Thread.sleep(100); // make sure somewhat slow calls are handled
+      assertTrue("There should have been at least 12 calls dropped",
+        scheduler.getNumGeneralCallsDropped() > 12);
+    } finally {
+      scheduler.stop();
+    }
+  }
+
+  private CallRunner getMockedCallRunner() throws IOException {
+    CallRunner putCallTask = mock(CallRunner.class);
+    RpcServer.Call putCall = mock(RpcServer.Call.class);
+    putCall.param = RequestConverter.buildMutateRequest(
+      Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
+    RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build();
+    when(putCallTask.getCall()).thenReturn(putCall);
+    when(putCall.getHeader()).thenReturn(putHead);
+    putCall.timestamp = System.currentTimeMillis();
+    return putCallTask;
+  }
 }