You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/08 00:46:05 UTC
svn commit: r1395411 -
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp
Author: tabish
Date: Sun Oct 7 22:46:05 2012
New Revision: 1395411
URL: http://svn.apache.org/viewvc?rev=1395411&view=rev
Log:
refactoring to attempt a fix for: https://issues.apache.org/jira/browse/AMQCPP-405
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp?rev=1395411&r1=1395410&r2=1395411&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp Sun Oct 7 22:46:05 2012
@@ -29,6 +29,8 @@
#include <decaf/internal/util/concurrent/Atomics.h>
#include <decaf/internal/util/concurrent/PlatformThread.h>
+#include <deque>
+
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
@@ -187,6 +189,12 @@ namespace {
*/
Node* nextWaiter;
+ /**
+ * Linked nodes in the free pool using this special next trail to avoid
+ * live nodes stepping into the pool and getting stuck.
+ */
+ Node* nextFree;
+
private:
Node(const Node&);
@@ -194,11 +202,13 @@ namespace {
public:
- Node() : waitStatus(0), prev(NULL), next(NULL), thread(NULL), nextWaiter(NULL) {
+ Node() : waitStatus(0), prev(NULL), next(NULL), thread(NULL), nextWaiter(NULL), nextFree(NULL) {
+ }
+ Node(Thread* thread, Node* node) : waitStatus(0), prev(NULL), next(NULL), thread(thread), nextWaiter(node), nextFree(NULL) {
}
- Node(Thread* thread, Node* node) : waitStatus(0), prev(NULL), next(NULL), thread(thread), nextWaiter(node) {
+ Node(Thread* thread, int waitStatus) : waitStatus(waitStatus), prev(NULL), next(NULL), thread(thread), nextWaiter(NULL), nextFree(NULL) {
}
- Node(Thread* thread, int waitStatus) : waitStatus(waitStatus), prev(NULL), next(NULL), thread(thread), nextWaiter(NULL) {
+ Node(Thread* thread, int waitStatus, Node* node) : waitStatus(waitStatus), prev(NULL), next(NULL), thread(thread), nextWaiter(node), nextFree(NULL) {
}
~Node() {}
@@ -223,6 +233,122 @@ namespace {
// For very short timeouts its usually more efficient to just spin instead of
// parking a thread.
const long long spinForTimeoutLimit = 1000LL;
+
+ /**
+ * When a thread no longer needs its Node in the AQS it is moved to the NodePool.
+ *
+ * The Pool will create Node's on demand or access them from its internal pool of
+ * old Nodes. The Pool can at times shrink the list of old nodes if it needs to.
+ */
+ class NodePool {
+ private:
+
+ Node head;
+ Node* tail;
+ decaf_mutex_t lock;
+
+ public:
+
+ NodePool() : head(), tail(NULL), lock() {
+ PlatformThread::createMutex(&lock);
+
+ for (int i = 0; i < 100; ++i) {
+ Node* node = new Node();
+ Node* t = tail;
+
+ if (t != NULL) {
+ t->nextFree = node;
+ tail = node;
+ } else {
+ tail = node;
+ head.nextFree = tail;
+ }
+ }
+ }
+
+ ~NodePool() {
+
+ PlatformThread::lockMutex(lock);
+
+ while (head.nextFree != NULL) {
+ Node* node = head.nextFree;
+ head.nextFree = node->nextFree;
+ delete node;
+ }
+
+ PlatformThread::unlockMutex(lock);
+
+ PlatformThread::destroyMutex(lock);
+ }
+
+ Node* takeNode() {
+ return takeNode(NULL, 0, NULL);
+ }
+
+ Node* takeNode(Thread* thread, int waitStatus) {
+ return takeNode(thread, waitStatus, NULL);
+ }
+
+ Node* takeNode(Thread* thread, Node* nextWaiter) {
+ return takeNode(thread, 0, nextWaiter);
+ }
+
+ Node* takeNode(Thread* thread, int waitStatus, Node* nextWaiter) {
+
+ Node* result = NULL;
+
+ if (head.nextFree != NULL) {
+ PlatformThread::lockMutex(lock);
+
+ if (head.nextFree != NULL) {
+ result = head.nextFree;
+ head.nextFree = result->nextFree;
+
+ if (result == tail) {
+ tail = NULL;
+ }
+ }
+
+ PlatformThread::unlockMutex(lock);
+ }
+
+ if (result == NULL) {
+ result = new Node(thread, waitStatus, nextWaiter);
+ } else {
+ // Reset to the new state.
+ result->thread = thread;
+ result->waitStatus = waitStatus;
+ result->nextWaiter = nextWaiter;
+ result->prev = NULL;
+ result->next = NULL;
+ result->nextFree = NULL;
+ }
+
+ return result;
+ }
+
+ void returnNode(Node* node) {
+
+ if (node == NULL) {
+ return;
+ }
+
+ PlatformThread::lockMutex(lock);
+
+ Node* t = tail;
+
+ if (t != NULL) {
+ t->nextFree = node;
+ tail = node;
+ tail->nextFree = NULL;
+ } else {
+ tail = node;
+ head.nextFree = tail;
+ }
+
+ PlatformThread::unlockMutex(lock);
+ }
+ };
}
////////////////////////////////////////////////////////////////////////////////
@@ -251,16 +377,6 @@ namespace locks {
volatile int state;
/**
- * Platform level R/W lock. Because we don't implement a garbage collected Node
- * scheme we can't just use atomic operations on the Node pointers so in cases where
- * we operate on the list of Nodes to remove canceled items we must write lock the
- * list. Likewise in cases where we are iterating through the list to collect
- * statistics we must ensure that a Node won't suddenly become invalid so we must
- * hold a read lock.
- */
- decaf_rwmutex_t rwLock;
-
- /**
* Head of the wait queue, lazily initialized. Except for initialization, it is
* modified only via method setHead. Note: If head exists, its waitStatus is
* guaranteed not to be CANCELLED.
@@ -273,25 +389,22 @@ namespace locks {
*/
AtomicReference<Node> tail;
+ /**
+ * Pool used to store discarded Nodes for reuse by another thread.
+ */
+ NodePool nodePool;
+
public:
SynchronizerState(AbstractQueuedSynchronizer* parent) :
- parent(parent), state(0), rwLock(), head(), tail() {
- PlatformThread::createRWMutex(&rwLock);
+ parent(parent), state(0), head(), tail() {
}
virtual ~SynchronizerState() {
- // Ensure that the destructor waits for other operations to complete.
- PlatformThread::writerLockMutex(rwLock);
-
while (tail.get() != NULL) {
- delete tail.getAndSet(tail.get()->prev);
+ nodePool.returnNode(tail.getAndSet(tail.get()->prev));
}
-
- PlatformThread::unlockRWMutex(rwLock);
-
- PlatformThread::destroyRWMutex(rwLock);
}
bool isHeldExclusively() const {
@@ -310,24 +423,22 @@ namespace locks {
*/
Node* enqueue(Node* node) {
- Node* pred = NULL;
-
- PlatformThread::writerLockMutex(rwLock);
-
- pred = tail.get();
- if (pred == NULL) { // Must initialize
- pred = new Node();
- head.set(pred);
- tail.set(pred);
+ for (;;) {
+ Node* t = tail.get();
+ if (t == NULL) { // Must initialize
+ if (compareAndSetHead(nodePool.takeNode())) {
+ tail.set(head.get());
+ }
+ } else {
+ node->prev = t;
+ if (compareAndSetTail(t, node)) {
+ t->next = node;
+ return t;
+ }
+ }
}
- node->prev = pred;
- pred->next = node;
- tail.set(node);
-
- PlatformThread::unlockRWMutex(rwLock);
-
- return pred;
+ return NULL;
}
/**
@@ -341,27 +452,39 @@ namespace locks {
* @return the newly added Node
*/
Node* addWaiter(Node* mode) {
- Node* node = new Node(Thread::currentThread(), mode);
+ Node* node = nodePool.takeNode(Thread::currentThread(), mode);
+
+ // Try the fast add method first, then fall-back to the slower one.
+ Node* pred = tail.get();
+ if (pred != NULL) {
+ node->prev = pred;
+ if (compareAndSetTail(pred, node)) {
+ pred->next = node;
+ return node;
+ }
+ }
+
enqueue(node);
+
return node;
}
/**
* The only place head is altered, we NULL out prev since that Node will be
* Destroyed or pooled after this, but leave next alone since it should still
- * be valid.
+ * be valid. The method calling this needs to be the lock holder which ensures
+ * that no other thread can alter head.
*
* @param node
* The Node that is to become the new Head of the queue.
*/
Node* setHead(Node* node) {
- Node* oldHead = NULL;
- PlatformThread::writerLockMutex(rwLock);
- oldHead = head.get();
+ Node* oldHead = head.get();
+
head.set(node);
node->thread = NULL;
node->prev = NULL;
- PlatformThread::unlockRWMutex(this->rwLock);
+
return oldHead;
}
@@ -374,34 +497,29 @@ namespace locks {
void unparkSuccessor(Node* node) {
// If status is negative (i.e., possibly needing signal) try to clear
- // in anticipation of signalling. It is OK if this fails or if status
+ // in anticipation of signaling. It is OK if this fails or if status
// is changed by waiting thread.
int ws = node->waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
- // We need to lock to prevent cancellation of a Node from altering
- // the list as we iterate and check Node status fields.
- PlatformThread::readerLockMutex(this->rwLock);
-
// Thread to un-park is held in successor, which is normally just the
// next node. But if canceled or apparently NULL, traverse backwards
// from tail to find the actual non-canceled successor.
Node* successor = node->next;
if (successor == NULL || successor->waitStatus > 0) {
successor = NULL;
- for (Node* t = tail.get(); t != NULL && t != node; t = t->prev)
+ for (Node* t = tail.get(); t != NULL && t != node; t = t->prev) {
if (t->waitStatus <= 0) {
successor = t;
}
+ }
}
if (successor != NULL) {
LockSupport::unpark((Thread*)successor->thread);
}
-
- PlatformThread::unlockRWMutex(this->rwLock);
}
/**
@@ -411,10 +529,6 @@ namespace locks {
*/
void doReleaseShared() {
- // Here we have to read lock because head could change when a
- // different thread does its release shared.
- PlatformThread::readerLockMutex(this->rwLock);
-
// Ensure that a release propagates, even if there are other in-progress
// acquires/releases. This proceeds in the usual way of trying to
// unparkSuccessor of head if it needs signal. But if it does not,
@@ -431,20 +545,15 @@ namespace locks {
continue; // loop to recheck cases
}
- // Platform level lock may not be reentrant.
- PlatformThread::unlockRWMutex(this->rwLock);
unparkSuccessor(h);
- PlatformThread::readerLockMutex(this->rwLock);
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node::PROPAGATE)) {
continue; // loop on failed CAS
}
}
- if (h == head.get()) { // loop if head changed
+ if (h == head.get()) { // loop if head changed
break;
}
}
-
- PlatformThread::unlockRWMutex(this->rwLock);
}
/**
@@ -462,10 +571,6 @@ namespace locks {
Node* head = setHead(node); // Record old head for check below
- // Here we have to read lock because head could change when a
- // different thread does its release shared.
- PlatformThread::readerLockMutex(this->rwLock);
-
// Try to signal next queued node if:
// Propagation was indicated by caller, or was recorded (as head->waitStatus)
// by a previous operation (note: this uses sign-check of waitStatus because
@@ -478,65 +583,67 @@ namespace locks {
if (propagate > 0 || head == NULL || head->waitStatus < 0) {
Node* successor = node->next;
if (successor == NULL || successor->isShared()) {
- PlatformThread::unlockRWMutex(this->rwLock);
doReleaseShared();
- } else {
- PlatformThread::unlockRWMutex(this->rwLock);
}
- } else {
- PlatformThread::unlockRWMutex(this->rwLock);
}
return head;
}
/**
- * Cancels an ongoing attempt to acquire. The cancelled Node needs to be
- * deleted but we need to unlink it from the list before we can do so.
+ * Cancels an ongoing attempt to acquire. A canceled node can be returned to
+ * the pool once its status has been updated and its links are updated.
*
* @param node
- * The node that was attempting to acquire, will be delted here.
+ * The node that was attempting to acquire, will be returned to the pool.
*/
void cancelAcquire(Node* node) {
- // Ignore if node doesn't exist
if (node == NULL) {
return;
}
node->thread = NULL;
- // Can use unconditional write instead of CAS here. After this atomic
- // step, other Nodes can skip past us. Before, we are free of interference
- // from other threads.
- node->waitStatus = Node::CANCELLED;
-
- // If successor needs signal, try to set pred's next-link so it will
- // get one. Otherwise wake it up to propagate.
- int ws = 0;
-
- PlatformThread::writerLockMutex(this->rwLock);
-
- if (node == tail.get()) {
- tail.set(node->prev);
- node->prev->next = NULL;
- } else {
- node->prev->next = node->next;
- node->next->prev = node->prev;
+ // Skip canceled predecessors
+ Node* pred = node->prev;
+ while (pred->waitStatus > 0) {
+ node->prev = pred = pred->prev;
}
- if (node->prev != head.get() &&
- ((ws = node->prev->waitStatus) == Node::SIGNAL ||
- (ws <= 0 && compareAndSetWaitStatus(node->prev, ws, Node::SIGNAL))) &&
- node->prev->thread != NULL) {
+ // predNext is the apparent node to unsplice. CASes below will
+ // fail if not, in which case, we lost race vs another cancel
+ // or signal, so no further action is necessary.
+ Node* predNext = pred->next;
+
+ // Can use unconditional write instead of CAS here.
+ // After this atomic step, other Nodes can skip past us.
+ // Before, we are free of interference from other threads.
+ node->waitStatus = Node::CANCELLED;
- PlatformThread::unlockRWMutex(this->rwLock);
+ // If we are the tail, remove ourselves.
+ if (node == tail.get() && compareAndSetTail(node, pred)) {
+ compareAndSetNext(pred, predNext, NULL);
} else {
- PlatformThread::unlockRWMutex(this->rwLock);
- unparkSuccessor(node);
+ // If successor needs signal, try to set pred's next-link
+ // so it will get one. Otherwise wake it up to propagate.
+ int ws;
+ if (pred != head.get() &&
+ ((ws = pred->waitStatus) == Node::SIGNAL ||
+ (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node::SIGNAL))) &&
+ pred->thread != NULL) {
+
+ Node* next = node->next;
+ if (next != NULL && next->waitStatus <= 0) {
+ compareAndSetNext(pred, predNext, next);
+ }
+ } else {
+ unparkSuccessor(node);
+ }
}
- delete node;
+ node->next = NULL; // Help any lingering threads that land on this node
+ nodePool.returnNode(node);
}
/**
@@ -551,21 +658,22 @@ namespace locks {
*
* @return true if thread should block.
*/
- bool shouldParkAfterFailedAcquire(Node* node) {
-
- bool result = false;
-
- PlatformThread::readerLockMutex(this->rwLock);
+ bool shouldParkAfterFailedAcquire(Node* pred, Node* node) {
- int ws = node->prev->waitStatus;
+ int ws = pred->waitStatus;
- if (ws == Node::SIGNAL)
+ if (ws == Node::SIGNAL) {
// This node has already set status asking a release to signal
// it, so it can safely park.
- result = true;
+ return true;
+ }
+
if (ws > 0) {
// Predecessor was canceled. Skip over predecessors and indicate retry.
- result = false;
+ do {
+ node->prev = pred = pred->prev;
+ } while (pred->waitStatus > 0);
+ pred->next = node;
} else {
// waitStatus must be 0 or PROPAGATE. Indicate that we need a
// signal, but don't park yet. Caller will need to retry to
@@ -573,9 +681,7 @@ namespace locks {
compareAndSetWaitStatus(node->prev, ws, Node::SIGNAL);
}
- PlatformThread::unlockRWMutex(this->rwLock);
-
- return result;
+ return false;
}
/**
@@ -598,7 +704,7 @@ namespace locks {
/**
* Acquires in exclusive uninterruptible mode for thread already in queue
* Used by condition wait methods as well as acquire. We can access the
- * Node's data here as it can't be deleted on us because it is guaranteed
+ * Node's data here as it can't be altered on us because it is guaranteed
* to exist until this method returns.
*
* @param node
@@ -609,33 +715,27 @@ namespace locks {
* @return true if interrupted while waiting
*/
bool acquireQueued(Node* node, int arg) {
- bool failed = true;
try {
-
bool interrupted = false;
for (;;) {
Node* p = node->predecessor();
if (p == head.get() && parent->tryAcquire(arg)) {
- delete setHead(node);
- failed = false;
+ setHead(node);
+ p->waitStatus = Node::CANCELLED;
+ p->next = NULL;
+ nodePool.returnNode(p);
return interrupted;
}
- if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) {
+ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
- } catch(Exception& ex) {
- if (failed) {
- cancelAcquire(node);
- }
-
- throw ex;
+ } catch (Exception& ex) {
+ cancelAcquire(node);
+ throw;
}
-
- cancelAcquire(node);
- return true;
}
/**
@@ -644,26 +744,24 @@ namespace locks {
*/
void doAcquireInterruptibly(int arg) {
Node* node = addWaiter(Node::EXCLUSIVE);
- bool failed = true;
try {
for (;;) {
Node* p = node->predecessor();
if (p == head.get() && parent->tryAcquire(arg)) {
- delete setHead(node);
- failed = false;
+ setHead(node);
+ p->waitStatus = Node::CANCELLED;
+ p->next = NULL;
+ nodePool.returnNode(p);
return;
}
- if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) {
- throw InterruptedException();
+ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
+ throw InterruptedException(__FILE__, __LINE__, "Interrupted while waiting for lock.");
}
}
- } catch(InterruptedException& ex) {
- if (failed) {
- cancelAcquire(node);
- }
-
+ } catch (Exception& ex) {
+ cancelAcquire(node);
throw InterruptedException(ex);
}
}
@@ -678,13 +776,14 @@ namespace locks {
bool doAcquireNanos(int arg, long long nanosTimeout) {
long long lastTime = System::nanoTime();
Node* node = addWaiter(Node::EXCLUSIVE);
- bool failed = true;
try {
for (;;) {
Node* p = node->predecessor();
if (p == head.get() && parent->tryAcquire(arg)) {
- delete setHead(node);
- failed = false;
+ setHead(node);
+ p->waitStatus = Node::CANCELLED;
+ p->next = NULL;
+ nodePool.returnNode(p);
return true;
}
@@ -692,7 +791,7 @@ namespace locks {
break;
}
- if (shouldParkAfterFailedAcquire(node) && nanosTimeout > spinForTimeoutLimit) {
+ if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutLimit) {
LockSupport::parkNanos(nanosTimeout);
}
@@ -701,16 +800,13 @@ namespace locks {
lastTime = now;
if (Thread::interrupted()) {
- throw InterruptedException();
+ throw InterruptedException(__FILE__, __LINE__, "Interrupted while waiting for lock.");
}
}
- } catch(InterruptedException& ex) {
- if (failed) {
- cancelAcquire(node);
- }
-
- throw ex;
+ } catch (Exception& ex) {
+ cancelAcquire(node);
+ throw;
}
cancelAcquire(node);
@@ -723,7 +819,6 @@ namespace locks {
*/
void doAcquireShared(int arg) {
Node* node = addWaiter(&Node::SHARED);
- bool failed = true;
try {
bool interrupted = false;
for (;;) {
@@ -731,25 +826,25 @@ namespace locks {
if (p == head.get()) {
int r = parent->tryAcquireShared(arg);
if (r >= 0) {
- delete setHeadAndPropagate(node, r);
+ setHeadAndPropagate(node, r);
+ p->waitStatus = Node::CANCELLED;
+ p->next = NULL;
+ nodePool.returnNode(p);
+
if (interrupted) {
selfInterrupt();
}
- failed = false;
return;
}
}
- if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) {
+ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
- } catch(Exception& ex) {
- if (failed) {
- cancelAcquire(node);
- }
-
- throw ex;
+ } catch (Exception& ex) {
+ cancelAcquire(node);
+ throw;
}
cancelAcquire(node);
@@ -761,28 +856,27 @@ namespace locks {
*/
void doAcquireSharedInterruptibly(int arg) {
Node* node = addWaiter(&Node::SHARED);
- bool failed = true;
try {
for (;;) {
Node* p = node->predecessor();
if (p == head.get()) {
int r = parent->tryAcquireShared(arg);
if (r >= 0) {
- delete setHeadAndPropagate(node, r);
- failed = false;
+ setHeadAndPropagate(node, r);
+ p->waitStatus = Node::CANCELLED;
+ p->next = NULL;
+ nodePool.returnNode(p);
return;
}
}
- if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) {
- throw InterruptedException();
+
+ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
+ throw InterruptedException(__FILE__, __LINE__, "Interrupted while waiting for lock.");
}
}
- } catch(InterruptedException& ex) {
- if (failed) {
- cancelAcquire(node);
- }
-
- throw ex;
+ } catch (Exception& ex) {
+ cancelAcquire(node);
+ throw;
}
}
@@ -800,15 +894,16 @@ namespace locks {
long long lastTime = System::nanoTime();
Node* node = addWaiter(&Node::SHARED);
- bool failed = true;
try {
for (;;) {
Node* p = node->predecessor();
if (p == head.get()) {
int r = parent->tryAcquireShared(arg);
if (r >= 0) {
- delete setHeadAndPropagate(node, r);
- failed = false;
+ setHeadAndPropagate(node, r);
+ p->waitStatus = Node::CANCELLED;
+ p->next = NULL;
+ nodePool.returnNode(p);
return true;
}
}
@@ -816,7 +911,7 @@ namespace locks {
break;
}
- if (shouldParkAfterFailedAcquire(node) && nanosTimeout > spinForTimeoutLimit) {
+ if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutLimit) {
LockSupport::parkNanos(nanosTimeout);
}
@@ -825,15 +920,12 @@ namespace locks {
lastTime = now;
if (Thread::interrupted()) {
- throw InterruptedException();
+ throw InterruptedException(__FILE__, __LINE__, "Interrupted while waiting for lock.");
}
}
- } catch(InterruptedException& ex) {
- if (failed) {
- cancelAcquire(node);
- }
-
- throw ex;
+ } catch (Exception& ex) {
+ cancelAcquire(node);
+ throw;
}
cancelAcquire(node);
@@ -847,8 +939,6 @@ namespace locks {
// first node. If not, we continue on, safely traversing from tail
// back to head to find first, guaranteeing termination.
- PlatformThread::readerLockMutex(this->rwLock);
-
Node* t = tail.get();
Thread* firstThread = NULL;
while (t != NULL && t != head.get()) {
@@ -859,8 +949,6 @@ namespace locks {
t = t->prev;
}
- PlatformThread::unlockRWMutex(this->rwLock);
-
return firstThread;
}
@@ -898,7 +986,6 @@ namespace locks {
bool findNodeFromTail(Node* node) {
bool found = false;
- PlatformThread::readerLockMutex(this->rwLock);
Node* t = tail.get();
for (;;) {
@@ -914,7 +1001,6 @@ namespace locks {
t = t->prev;
}
- PlatformThread::unlockRWMutex(this->rwLock);
return found;
}
@@ -940,10 +1026,6 @@ namespace locks {
// woken up either by cancel or by acquiring the lock.
enqueue(node);
- // Since we need to write to our predecessor we must lock so that
- // it doesn't leave the Queue before we are done.
- PlatformThread::readerLockMutex(this->rwLock);
-
// Splice onto queue and try to set waitStatus of predecessor to
// indicate that thread is (probably) waiting. If canceled or
// attempt to set waitStatus fails, wake up to resync (in which
@@ -954,8 +1036,6 @@ namespace locks {
LockSupport::unpark((Thread*)node->thread);
}
- PlatformThread::unlockRWMutex(this->rwLock);
-
return true;
}
@@ -1001,23 +1081,16 @@ namespace locks {
* @return previous sync state
*/
int fullyRelease(Node* node) {
- bool failed = true;
try {
int savedState = parent->getState();
if (parent->release(savedState)) {
- failed = false;
return savedState;
} else {
throw IllegalMonitorStateException();
}
} catch(IllegalMonitorStateException& ex) {
- if (failed) {
- node->waitStatus = Node::CANCELLED;
- // Enqueue it even though canceled so that it gets deleted
- enqueue(node);
- }
-
- throw ex;
+ node->waitStatus = Node::CANCELLED;
+ throw;
}
}
@@ -1030,6 +1103,9 @@ namespace locks {
static bool compareAndSetWaitStatus(Node* node, int expect, int update) {
return Atomics::compareAndSet32(&node->waitStatus, expect, update);
}
+ static bool compareAndSetNext(Node* node, Node* expect, Node* update) {
+ return Atomics::compareAndSet((volatile void **)(&node->next), (void*)expect, (void*)update);
+ }
};
/**
@@ -1065,6 +1141,10 @@ namespace locks {
DefaultConditionObject(SynchronizerState* impl) :
ConditionObject(), impl(impl), head(NULL), tail(NULL) {}
virtual ~DefaultConditionObject() {
+ try {
+ unlinkCancelledWaiters();
+ }
+ DECAF_CATCHALL_NOTHROW()
}
virtual void await() {
@@ -1088,9 +1168,7 @@ namespace locks {
if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
- if (node->nextWaiter != NULL && interruptMode == 0) {
- // clean up if canceled but only if we own the lock otherwise another
- // thread could already be changing the list.
+ if (node->nextWaiter != NULL) {
unlinkCancelledWaiters();
}
if (interruptMode != 0) {
@@ -1115,7 +1193,7 @@ namespace locks {
}
}
- virtual long long awaitNanos( long long nanosTimeout ) {
+ virtual long long awaitNanos(long long nanosTimeout) {
if (Thread::interrupted()) {
throw InterruptedException(__FILE__, __LINE__, "Thread was interrupted");
}
@@ -1142,9 +1220,7 @@ namespace locks {
if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
- if (node->nextWaiter != NULL && interruptMode == 0) {
- // clean up if canceled but only if we own the lock otherwise another
- // thread could already be changing the list.
+ if (node->nextWaiter != NULL) {
unlinkCancelledWaiters();
}
if (interruptMode != 0) {
@@ -1154,7 +1230,7 @@ namespace locks {
return nanosTimeout - (System::nanoTime() - lastTime);
}
- virtual bool await( long long time, const TimeUnit& unit ) {
+ virtual bool await(long long time, const TimeUnit& unit) {
long long nanosTimeout = unit.toNanos(time);
if (Thread::interrupted()) {
throw InterruptedException(__FILE__, __LINE__, "Thread was interrupted");
@@ -1184,9 +1260,7 @@ namespace locks {
if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
- if (node->nextWaiter != NULL && interruptMode == 0) {
- // clean up if canceled but only if we own the lock otherwise another
- // thread could already be changing the list.
+ if (node->nextWaiter != NULL) {
unlinkCancelledWaiters();
}
if (interruptMode != 0) {
@@ -1220,9 +1294,7 @@ namespace locks {
if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
- if (node->nextWaiter != NULL && interruptMode == 0) {
- // clean up if canceled but only if we own the lock otherwise another
- // thread could already be changing the list.
+ if (node->nextWaiter != NULL) {
unlinkCancelledWaiters();
}
if (interruptMode != 0) {
@@ -1265,17 +1337,12 @@ namespace locks {
throw IllegalMonitorStateException();
}
- PlatformThread::readerLockMutex(this->impl->rwLock);
-
for (Node* w = head; w != NULL; w = w->nextWaiter) {
if (w->waitStatus == Node::CONDITION) {
- PlatformThread::unlockRWMutex(this->impl->rwLock);
return true;
}
}
- PlatformThread::unlockRWMutex(this->impl->rwLock);
-
return false;
}
@@ -1285,16 +1352,12 @@ namespace locks {
}
int n = 0;
- PlatformThread::readerLockMutex(this->impl->rwLock);
-
for (Node* w = head; w != NULL; w = w->nextWaiter) {
if (w->waitStatus == Node::CONDITION) {
++n;
}
}
- PlatformThread::unlockRWMutex(this->impl->rwLock);
-
return n;
}
@@ -1304,8 +1367,6 @@ namespace locks {
}
ArrayList<Thread*>* list = new ArrayList<Thread*>();
- PlatformThread::readerLockMutex(this->impl->rwLock);
-
for (Node* w = head; w != NULL; w = w->nextWaiter) {
if (w->waitStatus == Node::CONDITION) {
Thread* t = (Thread*)w->thread;
@@ -1315,8 +1376,6 @@ namespace locks {
}
}
- PlatformThread::unlockRWMutex(this->impl->rwLock);
-
return list;
}
@@ -1338,7 +1397,7 @@ namespace locks {
unlinkCancelledWaiters();
t = tail;
}
- Node* node = new Node(Thread::currentThread(), Node::CONDITION);
+ Node* node = impl->nodePool.takeNode(Thread::currentThread(), Node::CONDITION);
if (t == NULL) {
head = node;
} else {
@@ -1398,9 +1457,6 @@ namespace locks {
*/
void unlinkCancelledWaiters() {
- // Prevent the parent from deleting nodes while we clean up.
- PlatformThread::readerLockMutex(this->impl->rwLock);
-
Node* t = head;
Node* trail = NULL;
while (t != NULL) {
@@ -1418,13 +1474,13 @@ namespace locks {
tail = trail;
}
+ impl->nodePool.returnNode(t);
+
} else {
trail = t;
}
t = next;
}
-
- PlatformThread::unlockRWMutex(this->impl->rwLock);
}
/**
@@ -1469,7 +1525,7 @@ AbstractQueuedSynchronizer::AbstractQueu
////////////////////////////////////////////////////////////////////////////////
AbstractQueuedSynchronizer::~AbstractQueuedSynchronizer() {
- try{
+ try {
delete this->impl;
}
DECAF_CATCHALL_NOTHROW()
@@ -1629,17 +1685,12 @@ bool AbstractQueuedSynchronizer::isQueue
throw NullPointerException(__FILE__, __LINE__, "Passed in thread was NULL");
}
- PlatformThread::readerLockMutex(this->impl->rwLock);
-
for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
if (p->thread == thread) {
- PlatformThread::unlockRWMutex(this->impl->rwLock);
return true;
}
}
- PlatformThread::unlockRWMutex(this->impl->rwLock);
-
return false;
}
@@ -1647,16 +1698,12 @@ bool AbstractQueuedSynchronizer::isQueue
int AbstractQueuedSynchronizer::getQueueLength() const {
int n = 0;
- PlatformThread::readerLockMutex(this->impl->rwLock);
-
for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
if (p->thread != NULL) {
++n;
}
}
- PlatformThread::unlockRWMutex(this->impl->rwLock);
-
return n;
}
@@ -1664,8 +1711,6 @@ int AbstractQueuedSynchronizer::getQueue
Collection<Thread*>* AbstractQueuedSynchronizer::getQueuedThreads() const {
ArrayList<Thread*>* list = new ArrayList<Thread*>();
- PlatformThread::readerLockMutex(this->impl->rwLock);
-
for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
Thread* t = (Thread*)p->thread;
if (t != NULL) {
@@ -1673,8 +1718,6 @@ Collection<Thread*>* AbstractQueuedSynch
}
}
- PlatformThread::unlockRWMutex(this->impl->rwLock);
-
return list;
}
@@ -1682,8 +1725,6 @@ Collection<Thread*>* AbstractQueuedSynch
Collection<Thread*>* AbstractQueuedSynchronizer::getExclusiveQueuedThreads() const {
ArrayList<Thread*>* list = new ArrayList<Thread*>();
- PlatformThread::readerLockMutex(this->impl->rwLock);
-
for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
if (!p->isShared()) {
Thread* t = (Thread*)p->thread;
@@ -1693,8 +1734,6 @@ Collection<Thread*>* AbstractQueuedSynch
}
}
- PlatformThread::unlockRWMutex(this->impl->rwLock);
-
return list;
}
@@ -1702,8 +1741,6 @@ Collection<Thread*>* AbstractQueuedSynch
Collection<Thread*>* AbstractQueuedSynchronizer::getSharedQueuedThreads() const {
ArrayList<Thread*>* list = new ArrayList<Thread*>();
- PlatformThread::readerLockMutex(this->impl->rwLock);
-
for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
if (p->isShared()) {
Thread* t = (Thread*)p->thread;
@@ -1713,8 +1750,6 @@ Collection<Thread*>* AbstractQueuedSynch
}
}
- PlatformThread::unlockRWMutex(this->impl->rwLock);
-
return list;
}
@@ -1760,8 +1795,6 @@ bool AbstractQueuedSynchronizer::hasQueu
bool result = false;
- PlatformThread::readerLockMutex(this->impl->rwLock);
-
// The correctness of this depends on head being initialized
// before tail and on head->next being accurate if the current
// thread is first in queue.
@@ -1771,7 +1804,5 @@ bool AbstractQueuedSynchronizer::hasQueu
result = h != t && ((s = h->next) == NULL || s->thread != Thread::currentThread());
- PlatformThread::unlockRWMutex(this->impl->rwLock);
-
return result;
}