You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2017/02/09 22:18:10 UTC
hadoop git commit: HADOOP-14033. Reduce fair call queue lock
contention. Contributed by Daryn Sharp.
Repository: hadoop
Updated Branches:
refs/heads/trunk 9b8505358 -> 0c01cf579
HADOOP-14033. Reduce fair call queue lock contention. Contributed by Daryn Sharp.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0c01cf57
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0c01cf57
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0c01cf57
Branch: refs/heads/trunk
Commit: 0c01cf57987bcc7a17154a3538960b67f625a9e5
Parents: 9b85053
Author: Kihwal Lee <ki...@apache.org>
Authored: Thu Feb 9 16:17:24 2017 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Thu Feb 9 16:17:24 2017 -0600
----------------------------------------------------------------------
.../org/apache/hadoop/ipc/FairCallQueue.java | 167 ++++++-------------
1 file changed, 51 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c01cf57/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
index 77a9d65..820f24c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
@@ -27,8 +27,7 @@ import java.util.AbstractQueue;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -55,16 +54,15 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
/* The queues */
private final ArrayList<BlockingQueue<E>> queues;
- /* Read locks */
- private final ReentrantLock takeLock = new ReentrantLock();
- private final Condition notEmpty = takeLock.newCondition();
+ /* Track available permits for scheduled objects. All methods that will
+ * mutate a subqueue must acquire or release a permit on the semaphore.
+ * A semaphore is much faster than an exclusive lock because producers do
+ * not contend with consumers and consumers do not block other consumers
+ * while polling.
+ */
+ private final Semaphore semaphore = new Semaphore(0);
private void signalNotEmpty() {
- takeLock.lock();
- try {
- notEmpty.signal();
- } finally {
- takeLock.unlock();
- }
+ semaphore.release();
}
/* Multiplexer picks which queue to draw from */
@@ -112,28 +110,25 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
}
/**
- * Returns the first non-empty queue with equal to <i>startIdx</i>, or
- * or scans from highest to lowest priority queue.
+ * Returns an element first non-empty queue equal to the priority returned
+ * by the multiplexer or scans from highest to lowest priority queue.
+ *
+ * Caller must always acquire a semaphore permit before invoking.
*
- * @param startIdx the queue number to start searching at
* @return the first non-empty queue with less priority, or null if
* everything was empty
*/
- private BlockingQueue<E> getFirstNonEmptyQueue(int startIdx) {
- BlockingQueue<E> queue = this.queues.get(startIdx);
- if (queue.size() != 0) {
- return queue;
- }
- final int numQueues = this.queues.size();
- for(int i=0; i < numQueues; i++) {
- queue = this.queues.get(i);
- if (queue.size() != 0) {
- return queue;
+ private E removeNextElement() {
+ int priority = multiplexer.getAndAdvanceCurrentIndex();
+ E e = queues.get(priority).poll();
+ if (e == null) {
+ for (int idx = 0; e == null && idx < queues.size(); idx++) {
+ e = queues.get(idx).poll();
}
}
-
- // All queues were empty
- return null;
+ // guaranteed to find an element if caller acquired permit.
+ assert e != null : "consumer didn't acquire semaphore!";
+ return e;
}
/* AbstractQueue and BlockingQueue methods */
@@ -184,9 +179,9 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
int priorityLevel = e.getPriorityLevel();
BlockingQueue<E> q = this.queues.get(priorityLevel);
boolean ret = q.offer(e, timeout, unit);
-
- signalNotEmpty();
-
+ if (ret) {
+ signalNotEmpty();
+ }
return ret;
}
@@ -195,72 +190,21 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
int priorityLevel = e.getPriorityLevel();
BlockingQueue<E> q = this.queues.get(priorityLevel);
boolean ret = q.offer(e);
-
- signalNotEmpty();
-
+ if (ret) {
+ signalNotEmpty();
+ }
return ret;
}
@Override
public E take() throws InterruptedException {
- int startIdx = this.multiplexer.getAndAdvanceCurrentIndex();
-
- takeLock.lockInterruptibly();
- try {
- // Wait while queue is empty
- for (;;) {
- BlockingQueue<E> q = this.getFirstNonEmptyQueue(startIdx);
- if (q != null) {
- // Got queue, so return if we can poll out an object
- E e = q.poll();
- if (e != null) {
- return e;
- }
- }
-
- notEmpty.await();
- }
- } finally {
- takeLock.unlock();
- }
+ semaphore.acquire();
+ return removeNextElement();
}
@Override
- public E poll(long timeout, TimeUnit unit)
- throws InterruptedException {
-
- int startIdx = this.multiplexer.getAndAdvanceCurrentIndex();
-
- long nanos = unit.toNanos(timeout);
- takeLock.lockInterruptibly();
- try {
- for (;;) {
- BlockingQueue<E> q = this.getFirstNonEmptyQueue(startIdx);
- if (q != null) {
- E e = q.poll();
- if (e != null) {
- // Escape condition: there might be something available
- return e;
- }
- }
-
- if (nanos <= 0) {
- // Wait has elapsed
- return null;
- }
-
- try {
- // Now wait on the condition for a bit. If we get
- // spuriously awoken we'll re-loop
- nanos = notEmpty.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- notEmpty.signal(); // propagate to a non-interrupted thread
- throw ie;
- }
- }
- } finally {
- takeLock.unlock();
- }
+ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+ return semaphore.tryAcquire(timeout, unit) ? removeNextElement() : null;
}
/**
@@ -269,15 +213,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
*/
@Override
public E poll() {
- int startIdx = this.multiplexer.getAndAdvanceCurrentIndex();
-
- BlockingQueue<E> q = this.getFirstNonEmptyQueue(startIdx);
- if (q == null) {
- return null; // everything is empty
- }
-
- // Delegate to the sub-queue's poll, which could still return null
- return q.poll();
+ return semaphore.tryAcquire() ? removeNextElement() : null;
}
/**
@@ -285,12 +221,11 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
*/
@Override
public E peek() {
- BlockingQueue<E> q = this.getFirstNonEmptyQueue(0);
- if (q == null) {
- return null;
- } else {
- return q.peek();
+ E e = null;
+ for (int i=0; e == null && i < queues.size(); i++) {
+ e = queues.get(i).peek();
}
+ return e;
}
/**
@@ -301,11 +236,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
*/
@Override
public int size() {
- int size = 0;
- for (BlockingQueue<E> q : this.queues) {
- size += q.size();
- }
- return size;
+ return semaphore.availablePermits();
}
/**
@@ -324,20 +255,24 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
*/
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
- int sum = 0;
- for (BlockingQueue<E> q : this.queues) {
- sum += q.drainTo(c, maxElements);
+ // initially take all permits to stop consumers from modifying queues
+ // while draining. will restore any excess when done draining.
+ final int permits = semaphore.drainPermits();
+ final int numElements = Math.min(maxElements, permits);
+ int numRemaining = numElements;
+ for (int i=0; numRemaining > 0 && i < queues.size(); i++) {
+ numRemaining -= queues.get(i).drainTo(c, numRemaining);
}
- return sum;
+ int drained = numElements - numRemaining;
+ if (permits > drained) { // restore unused permits.
+ semaphore.release(permits - drained);
+ }
+ return drained;
}
@Override
public int drainTo(Collection<? super E> c) {
- int sum = 0;
- for (BlockingQueue<E> q : this.queues) {
- sum += q.drainTo(c);
- }
- return sum;
+ return drainTo(c, Integer.MAX_VALUE);
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org