You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2017/02/09 22:18:10 UTC

hadoop git commit: HADOOP-14033. Reduce fair call queue lock contention. Contributed by Daryn Sharp.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 9b8505358 -> 0c01cf579


HADOOP-14033. Reduce fair call queue lock contention. Contributed by Daryn Sharp.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0c01cf57
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0c01cf57
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0c01cf57

Branch: refs/heads/trunk
Commit: 0c01cf57987bcc7a17154a3538960b67f625a9e5
Parents: 9b85053
Author: Kihwal Lee <ki...@apache.org>
Authored: Thu Feb 9 16:17:24 2017 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Thu Feb 9 16:17:24 2017 -0600

----------------------------------------------------------------------
 .../org/apache/hadoop/ipc/FairCallQueue.java    | 167 ++++++-------------
 1 file changed, 51 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c01cf57/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
index 77a9d65..820f24c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
@@ -27,8 +27,7 @@ import java.util.AbstractQueue;
 import java.util.HashMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -55,16 +54,15 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
   /* The queues */
   private final ArrayList<BlockingQueue<E>> queues;
 
-  /* Read locks */
-  private final ReentrantLock takeLock = new ReentrantLock();
-  private final Condition notEmpty = takeLock.newCondition();
+  /* Track available permits for scheduled objects.  All methods that will
+   * mutate a subqueue must acquire or release a permit on the semaphore.
+   * A semaphore is much faster than an exclusive lock because producers do
+   * not contend with consumers and consumers do not block other consumers
+   * while polling.
+   */
+  private final Semaphore semaphore = new Semaphore(0);
   private void signalNotEmpty() {
-    takeLock.lock();
-    try {
-      notEmpty.signal();
-    } finally {
-      takeLock.unlock();
-    }
+    semaphore.release();
   }
 
   /* Multiplexer picks which queue to draw from */
@@ -112,28 +110,25 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
   }
 
   /**
-   * Returns the first non-empty queue with equal to <i>startIdx</i>, or
-   * or scans from highest to lowest priority queue.
+   * Returns an element first non-empty queue equal to the priority returned
+   * by the multiplexer or scans from highest to lowest priority queue.
+   *
+   * Caller must always acquire a semaphore permit before invoking.
    *
-   * @param startIdx the queue number to start searching at
    * @return the first non-empty queue with less priority, or null if
    * everything was empty
    */
-  private BlockingQueue<E> getFirstNonEmptyQueue(int startIdx) {
-    BlockingQueue<E> queue = this.queues.get(startIdx);
-    if (queue.size() != 0) {
-      return queue;
-    }
-    final int numQueues = this.queues.size();
-    for(int i=0; i < numQueues; i++) {
-      queue = this.queues.get(i);
-      if (queue.size() != 0) {
-        return queue;
+  private E removeNextElement() {
+    int priority = multiplexer.getAndAdvanceCurrentIndex();
+    E e = queues.get(priority).poll();
+    if (e == null) {
+      for (int idx = 0; e == null && idx < queues.size(); idx++) {
+        e = queues.get(idx).poll();
       }
     }
-
-    // All queues were empty
-    return null;
+    // guaranteed to find an element if caller acquired permit.
+    assert e != null : "consumer didn't acquire semaphore!";
+    return e;
   }
 
   /* AbstractQueue and BlockingQueue methods */
@@ -184,9 +179,9 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
     int priorityLevel = e.getPriorityLevel();
     BlockingQueue<E> q = this.queues.get(priorityLevel);
     boolean ret = q.offer(e, timeout, unit);
-
-    signalNotEmpty();
-
+    if (ret) {
+      signalNotEmpty();
+    }
     return ret;
   }
 
@@ -195,72 +190,21 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
     int priorityLevel = e.getPriorityLevel();
     BlockingQueue<E> q = this.queues.get(priorityLevel);
     boolean ret = q.offer(e);
-
-    signalNotEmpty();
-
+    if (ret) {
+      signalNotEmpty();
+    }
     return ret;
   }
 
   @Override
   public E take() throws InterruptedException {
-    int startIdx = this.multiplexer.getAndAdvanceCurrentIndex();
-
-    takeLock.lockInterruptibly();
-    try {
-      // Wait while queue is empty
-      for (;;) {
-        BlockingQueue<E> q = this.getFirstNonEmptyQueue(startIdx);
-        if (q != null) {
-          // Got queue, so return if we can poll out an object
-          E e = q.poll();
-          if (e != null) {
-            return e;
-          }
-        }
-
-        notEmpty.await();
-      }
-    } finally {
-      takeLock.unlock();
-    }
+    semaphore.acquire();
+    return removeNextElement();
   }
 
   @Override
-  public E poll(long timeout, TimeUnit unit)
-      throws InterruptedException {
-
-    int startIdx = this.multiplexer.getAndAdvanceCurrentIndex();
-
-    long nanos = unit.toNanos(timeout);
-    takeLock.lockInterruptibly();
-    try {
-      for (;;) {
-        BlockingQueue<E> q = this.getFirstNonEmptyQueue(startIdx);
-        if (q != null) {
-          E e = q.poll();
-          if (e != null) {
-            // Escape condition: there might be something available
-            return e;
-          }
-        }
-
-        if (nanos <= 0) {
-          // Wait has elapsed
-          return null;
-        }
-
-        try {
-          // Now wait on the condition for a bit. If we get
-          // spuriously awoken we'll re-loop
-          nanos = notEmpty.awaitNanos(nanos);
-        } catch (InterruptedException ie) {
-          notEmpty.signal(); // propagate to a non-interrupted thread
-          throw ie;
-        }
-      }
-    } finally {
-      takeLock.unlock();
-    }
+  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+    return semaphore.tryAcquire(timeout, unit) ? removeNextElement() : null;
   }
 
   /**
@@ -269,15 +213,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
    */
   @Override
   public E poll() {
-    int startIdx = this.multiplexer.getAndAdvanceCurrentIndex();
-
-    BlockingQueue<E> q = this.getFirstNonEmptyQueue(startIdx);
-    if (q == null) {
-      return null; // everything is empty
-    }
-
-    // Delegate to the sub-queue's poll, which could still return null
-    return q.poll();
+    return semaphore.tryAcquire() ? removeNextElement() : null;
   }
 
   /**
@@ -285,12 +221,11 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
    */
   @Override
   public E peek() {
-    BlockingQueue<E> q = this.getFirstNonEmptyQueue(0);
-    if (q == null) {
-      return null;
-    } else {
-      return q.peek();
+    E e = null;
+    for (int i=0; e == null && i < queues.size(); i++) {
+      e = queues.get(i).peek();
     }
+    return e;
   }
 
   /**
@@ -301,11 +236,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
    */
   @Override
   public int size() {
-    int size = 0;
-    for (BlockingQueue<E> q : this.queues) {
-      size += q.size();
-    }
-    return size;
+    return semaphore.availablePermits();
   }
 
   /**
@@ -324,20 +255,24 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
    */
   @Override
   public int drainTo(Collection<? super E> c, int maxElements) {
-    int sum = 0;
-    for (BlockingQueue<E> q : this.queues) {
-      sum += q.drainTo(c, maxElements);
+    // initially take all permits to stop consumers from modifying queues
+    // while draining.  will restore any excess when done draining.
+    final int permits = semaphore.drainPermits();
+    final int numElements = Math.min(maxElements, permits);
+    int numRemaining = numElements;
+    for (int i=0; numRemaining > 0 && i < queues.size(); i++) {
+      numRemaining -= queues.get(i).drainTo(c, numRemaining);
     }
-    return sum;
+    int drained = numElements - numRemaining;
+    if (permits > drained) { // restore unused permits.
+      semaphore.release(permits - drained);
+    }
+    return drained;
   }
 
   @Override
   public int drainTo(Collection<? super E> c) {
-    int sum = 0;
-    for (BlockingQueue<E> q : this.queues) {
-      sum += q.drainTo(c);
-    }
-    return sum;
+    return drainTo(c, Integer.MAX_VALUE);
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org