You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/08/06 16:09:47 UTC
svn commit: r1369852 -
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp
Author: tabish
Date: Mon Aug 6 14:09:47 2012
New Revision: 1369852
URL: http://svn.apache.org/viewvc?rev=1369852&view=rev
Log:
prevent races on node cancellation from overlapping node cleanup.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp?rev=1369852&r1=1369851&r2=1369852&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp Mon Aug 6 14:09:47 2012
@@ -299,32 +299,55 @@ namespace locks {
}
/**
- * Enqueue of a Node is Atomic with respect to the end of the list, so no
- * locking needs to occur here. If the head and tail have not been allocated
- * we just need to account for contention of two or more enqueues on the
- * addition of the new head.
+ * Enqueue of a Node is Atomic with respect to the end of the list, if the list
+ * is empty so no lock needs to occur. If the head and tail were previously set
+ * then we have to account for contention on the list by two or more enqueues.
*
* @param node
* The new node to add.
+ *
+ * @returns the predecessor of the newly inserted node.
*/
Node* enqueue(Node* node) {
- for (;;) {
- Node* t = tail.get();
- if (t == NULL) { // Must initialize
- Node* newHead = new Node();
- if (compareAndSetHead(newHead)) {
- tail.set(head.get());
- } else {
- delete newHead;
- }
- } else {
- node->prev = t;
- if (compareAndSetTail(t, node)) {
- t->next = node;
- return t;
- }
- }
- }
+
+ Node* pred = NULL;
+
+ PlatformThread::writerLockMutex(rwLock);
+
+ pred = tail.get();
+ if (pred == NULL) { // Must initialize
+ pred = new Node();
+ head.set(pred);
+ tail.set(pred);
+ }
+
+ node->prev = pred;
+ pred->next = node;
+ tail.set(node);
+
+ PlatformThread::unlockRWMutex(rwLock);
+
+ return pred;
+
+// for (;;) {
+// Node* t = tail.get();
+// if (t == NULL) { // Must initialize
+// Node* newHead = new Node();
+// if (compareAndSetHead(newHead)) {
+// tail.set(head.get());
+// } else {
+// delete newHead;
+// }
+// } else {
+// node->prev = t;
+// if (compareAndSetTail(t, node)) {
+// t->next = node;
+// return t;
+// }
+// }
+// }
+
+ return NULL;
}
/**
@@ -332,22 +355,28 @@ namespace locks {
* can't get the fast append done we will enter into the longer looping
* enqueue method.
*
- * @param node
- * The new Node to add.
+ * @param mode
+ * Node::EXCLUSIVE for exclusive, Node::SHARED for shared
+ *
+ * @return the newly added Node
*/
Node* addWaiter(Node* mode) {
Node* node = new Node(Thread::currentThread(), mode);
- Node* pred = tail.get();
- if (pred != NULL) {
- node->prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred->next = node;
- return node;
- }
- }
-
enqueue(node);
return node;
+
+// Node* node = new Node(Thread::currentThread(), mode);
+// Node* pred = tail.get();
+// if (pred != NULL) {
+// node->prev = pred;
+// if (compareAndSetTail(pred, node)) {
+// pred->next = node;
+// return node;
+// }
+// }
+//
+// enqueue(node);
+// return node;
}
/**
@@ -358,44 +387,55 @@ namespace locks {
* @param node
* The Node that is to become the new Head of the queue.
*/
- void setHead(Node* node) {
+ Node* setHead(Node* node) {
+ Node* oldHead = NULL;
+ PlatformThread::writerLockMutex(rwLock);
+ oldHead = head.get();
head.set(node);
node->thread = NULL;
node->prev = NULL;
+ PlatformThread::unlockRWMutex(this->rwLock);
+ return oldHead;
+
+// head.set(node);
+// node->thread = NULL;
+// node->prev = NULL;
}
+ /**
+ * Wakes up node's successor, if one exists.
+ *
+ * @param node
+ * the node whose successor will be unparked.
+ */
void unparkSuccessor(Node* node) {
- /*
- * If status is negative (i.e., possibly needing signal) try
- * to clear in anticipation of signalling. It is OK if this
- * fails or if status is changed by waiting thread.
- */
+
+ // If status is negative (i.e., possibly needing signal) try to clear
+ // in anticipation of signalling. It is OK if this fails or if status
+ // is changed by waiting thread.
int ws = node->waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
- // We need to lock to prevent cancellation of a Node from
- // altering the list as we iterate and check Node status fields.
+ // We need to lock to prevent cancellation of a Node from altering
+ // the list as we iterate and check Node status fields.
PlatformThread::readerLockMutex(this->rwLock);
- /*
- * Thread to unpark is held in successor, which is normally
- * just the next node. But if canceled or apparently NULL,
- * traverse backwards from tail to find the actual
- * non-canceled successor.
- */
- Node* s = node->next;
- if (s == NULL || s->waitStatus > 0) {
- s = NULL;
+ // Thread to un-park is held in successor, which is normally just the
+ // next node. But if canceled or apparently NULL, traverse backwards
+ // from tail to find the actual non-canceled successor.
+ Node* successor = node->next;
+ if (successor == NULL || successor->waitStatus > 0) {
+ successor = NULL;
for (Node* t = tail.get(); t != NULL && t != node; t = t->prev)
if (t->waitStatus <= 0) {
- s = t;
+ successor = t;
}
}
- if (s != NULL) {
- LockSupport::unpark((Thread*)s->thread);
+ if (successor != NULL) {
+ LockSupport::unpark((Thread*)successor->thread);
}
PlatformThread::unlockRWMutex(this->rwLock);
@@ -412,17 +452,13 @@ namespace locks {
// different thread does its release shared.
PlatformThread::readerLockMutex(this->rwLock);
- /*
- * Ensure that a release propagates, even if there are other
- * in-progress acquires/releases. This proceeds in the usual
- * way of trying to unparkSuccessor of head if it needs
- * signal. But if it does not, status is set to PROPAGATE to
- * ensure that upon release, propagation continues.
- * Additionally, we must loop in case a new node is added
- * while we are doing this. Also, unlike other uses of
- * unparkSuccessor, we need to know if CAS to reset status
- * fails, if so rechecking.
- */
+ // Ensure that a release propagates, even if there are other in-progress
+ // acquires/releases. This proceeds in the usual way of trying to
+ // unparkSuccessor of head if it needs signal. But if it does not,
+ // status is set to PROPAGATE to ensure that upon release, propagation
+ // continues. Additionally, we must loop in case a new node is added
+ // while we are doing this. Also, unlike other uses of unparkSuccessor,
+ // we need to know if CAS to reset status fails, if so rechecking.
for (;;) {
Node* h = head.get();
if (h != NULL && h != tail.get()) {
@@ -445,47 +481,51 @@ namespace locks {
}
/**
- * Sets head of queue, and checks if successor may be waiting
- * in shared mode, if so propagating if either propagate > 0 or
- * PROPAGATE status was set.
+ * Sets head of queue, and checks if successor may be waiting in shared mode,
+ * if so propagating if either propagate > 0 or PROPAGATE status was set.
*
- * @param node the node
- * @param propagate the return value from a tryAcquireShared
+ * @param node
+ * The node that will become head
+ * @param propagate
+ * The return value from a tryAcquireShared
+ *
+ * @return the Node that was the head.
*/
- void setHeadAndPropagate(Node* node, int propagate) {
+ Node* setHeadAndPropagate(Node* node, int propagate) {
+
+ Node* head = setHead(node); // Record old head for check below
// Here we have to read lock because head could change when a
// different thread does its release shared.
PlatformThread::readerLockMutex(this->rwLock);
- Node* h = head.get(); // Record old head for check below
- setHead(node);
-
- /*
- * Try to signal next queued node if:
- * Propagation was indicated by caller, or was recorded (as h.waitStatus)
- * by a previous operation (note: this uses sign-check of waitStatus because
- * PROPAGATE status may transition to SIGNAL.) and the next node is waiting
- * in shared mode, or we don't know, because it appears NULL.
- *
- * The conservatism in both of these checks may cause unnecessary wake-ups,
- * but only when there are multiple racing acquires/releases, so most need
- * signals now or soon anyway.
- */
- if (propagate > 0 || h == NULL || h->waitStatus < 0) {
- Node* s = node->next;
- if (s == NULL || s->isShared()) {
+ // Try to signal next queued node if:
+ // Propagation was indicated by caller, or was recorded (as head->waitStatus)
+ // by a previous operation (note: this uses sign-check of waitStatus because
+ // PROPAGATE status may transition to SIGNAL.) and the next node is waiting
+ // in shared mode, or we don't know, because it appears NULL.
+ //
+ // The conservatism in both of these checks may cause unnecessary wake-ups,
+ // but only when there are multiple racing acquires/releases, so most need
+ // signals now or soon anyway.
+ if (propagate > 0 || head == NULL || head->waitStatus < 0) {
+ Node* successor = node->next;
+ if (successor == NULL || successor->isShared()) {
doReleaseShared();
}
}
PlatformThread::unlockRWMutex(this->rwLock);
+
+ return head;
}
/**
- * Cancels an ongoing attempt to acquire.
+ * Cancels an ongoing attempt to acquire. The cancelled Node needs to be
+ * deleted but we need to unlink it from the list before we can do so.
*
- * @param node the node
+ * @param node
+ * The node that was attempting to acquire, will be delted here.
*/
void cancelAcquire(Node* node) {
@@ -496,56 +536,93 @@ namespace locks {
node->thread = NULL;
- // Can use unconditional write instead of CAS here.
- // After this atomic step, other Nodes can skip past us.
- // Before, we are free of interference from other threads.
+ // Can use unconditional write instead of CAS here. After this atomic
+ // step, other Nodes can skip past us. Before, we are free of interference
+ // from other threads.
node->waitStatus = Node::CANCELLED;
- // If we are the tail, remove ourselves.
- if (node == tail.get() && compareAndSetTail(node, node->prev)) {
- // Attempt to set next on tail, this can fail if another thread can in
- // and replaced the old tail but that's ok since that means next is up
- // to date in that case.
- Atomics::compareAndSwap<Node>(tail.get()->next, node, NULL);
- delete node;
- } else {
- // If successor needs signal, try to set pred's next-link
- // so it will get one. Otherwise wake it up to propagate.
- int ws;
-
- PlatformThread::writerLockMutex(this->rwLock);
-
- // Did we become the tail.
- if (node == tail.get() && compareAndSetTail(node, node->prev)) {
- Atomics::compareAndSwap<Node>(tail.get()->next, node, NULL);
- } else {
- node->prev->next = node->next;
- node->next->prev = node->prev;
- }
+ // If successor needs signal, try to set pred's next-link so it will
+ // get one. Otherwise wake it up to propagate.
+ int ws = 0;
- if (node->prev != head.get() &&
- ((ws = node->prev->waitStatus) == Node::SIGNAL ||
- (ws <= 0 && compareAndSetWaitStatus(node->prev, ws, Node::SIGNAL))) &&
- node->prev->thread != NULL) {
+ PlatformThread::writerLockMutex(this->rwLock);
- PlatformThread::unlockRWMutex(this->rwLock);
- } else {
- PlatformThread::unlockRWMutex(this->rwLock);
- unparkSuccessor(node);
- }
+ if (node == tail.get()) {
+ tail.set(node->prev);
+ node->prev->next = NULL;
+ } else {
+ node->prev->next = node->next;
+ node->next->prev = node->prev;
+ }
+
+ if (node->prev != head.get() &&
+ ((ws = node->prev->waitStatus) == Node::SIGNAL ||
+ (ws <= 0 && compareAndSetWaitStatus(node->prev, ws, Node::SIGNAL))) &&
+ node->prev->thread != NULL) {
- delete node;
+ PlatformThread::unlockRWMutex(this->rwLock);
+ } else {
+ PlatformThread::unlockRWMutex(this->rwLock);
+ unparkSuccessor(node);
}
+
+ delete node;
+
+// node->thread = NULL;
+//
+// // Can use unconditional write instead of CAS here. After this atomic
+// // step, other Nodes can skip past us. Before, we are free of interference
+// // from other threads.
+// node->waitStatus = Node::CANCELLED;
+//
+// // If we are the tail, remove ourselves.
+// if (node == tail.get() && compareAndSetTail(node, node->prev)) {
+// // Attempt to set next on tail, this can fail if another thread can in
+// // and replaced the old tail but that's ok since that means next is up
+// // to date in that case.
+// Atomics::compareAndSwap<Node>(tail.get()->next, node, NULL);
+// delete node;
+// } else {
+// // If successor needs signal, try to set pred's next-link
+// // so it will get one. Otherwise wake it up to propagate.
+// int ws;
+//
+// PlatformThread::writerLockMutex(this->rwLock);
+//
+// // Did we become the tail.
+// if (node == tail.get() && compareAndSetTail(node, node->prev)) {
+// Atomics::compareAndSwap<Node>(tail.get()->next, node, NULL);
+// } else {
+// node->prev->next = node->next;
+// node->next->prev = node->prev;
+// }
+//
+// if (node->prev != head.get() &&
+// ((ws = node->prev->waitStatus) == Node::SIGNAL ||
+// (ws <= 0 && compareAndSetWaitStatus(node->prev, ws, Node::SIGNAL))) &&
+// node->prev->thread != NULL) {
+//
+// PlatformThread::unlockRWMutex(this->rwLock);
+// } else {
+// PlatformThread::unlockRWMutex(this->rwLock);
+// unparkSuccessor(node);
+// }
+//
+// delete node;
+// }
}
/**
- * Checks and updates status for a node that failed to acquire.
- * Returns true if thread should block. This is the main signal
- * control in all acquire loops. Requires that pred == node.prev
+ * Checks and updates status for a node that failed to acquire. Returns true
+ * if thread should block. This is the main signal control in all acquire loops.
+ * Requires that pred == node->prev
*
- * @param pred node's predecessor holding status
- * @param node the node
- * @return {@code true} if thread should block
+ * @param pred
+ * The ode's predecessor holding status
+ * @param node
+ * The node whose acquire attempt failed.
+ *
+ * @return true if thread should block.
*/
bool shouldParkAfterFailedAcquire(Node* node) {
@@ -556,23 +633,16 @@ namespace locks {
int ws = node->prev->waitStatus;
if (ws == Node::SIGNAL)
- /*
- * This node has already set status asking a release
- * to signal it, so it can safely park.
- */
+ // This node has already set status asking a release to signal
+ // it, so it can safely park.
result = true;
if (ws > 0) {
- /*
- * Predecessor was canceled. Skip over predecessors and
- * indicate retry.
- */
+ // Predecessor was canceled. Skip over predecessors and indicate retry.
result = false;
} else {
- /*
- * waitStatus must be 0 or PROPAGATE. Indicate that we
- * need a signal, but don't park yet. Caller will need to
- * retry to make sure it cannot acquire before parking.
- */
+ // waitStatus must be 0 or PROPAGATE. Indicate that we need a
+ // signal, but don't park yet. Caller will need to retry to
+ // make sure it cannot acquire before parking.
compareAndSetWaitStatus(node->prev, ws, Node::SIGNAL);
}
@@ -591,7 +661,7 @@ namespace locks {
/**
* Convenience method to park and then check if interrupted
*
- * @return {@code true} if interrupted
+ * @return true if interrupted.
*/
bool parkAndCheckInterrupt() const {
LockSupport::park();
@@ -599,15 +669,17 @@ namespace locks {
}
/**
- * Acquires in exclusive uninterruptible mode for thread already in
- * queue. Used by condition wait methods as well as acquire.
+ * Acquires in exclusive uninterruptible mode for thread already in queue
+ * Used by condition wait methods as well as acquire. We can access the
+ * Node's data here as it can't be deleted on us because it is guaranteed
+ * to exist until this method returns.
*
* @param node
* The node.
* @param arg
* The value passed to acquire.
*
- * @return {@code true} if interrupted while waiting
+ * @return true if interrupted while waiting
*/
bool acquireQueued(Node* node, int arg) {
bool failed = true;
@@ -617,8 +689,7 @@ namespace locks {
for (;;) {
Node* p = node->predecessor();
if (p == head.get() && parent->tryAcquire(arg)) {
- setHead(node);
- delete p;
+ delete setHead(node);
failed = false;
return interrupted;
}
@@ -651,8 +722,7 @@ namespace locks {
for (;;) {
Node* p = node->predecessor();
if (p == head.get() && parent->tryAcquire(arg)) {
- setHead(node);
- delete p;
+ delete setHead(node);
failed = false;
return;
}
@@ -686,8 +756,7 @@ namespace locks {
for (;;) {
Node* p = node->predecessor();
if (p == head.get() && parent->tryAcquire(arg)) {
- setHead(node);
- delete p;
+ delete setHead(node);
failed = false;
return true;
}
@@ -735,8 +804,7 @@ namespace locks {
if (p == head.get()) {
int r = parent->tryAcquireShared(arg);
if (r >= 0) {
- setHeadAndPropagate(node, r);
- delete p;
+ delete setHeadAndPropagate(node, r);
if (interrupted) {
selfInterrupt();
}
@@ -773,8 +841,7 @@ namespace locks {
if (p == head.get()) {
int r = parent->tryAcquireShared(arg);
if (r >= 0) {
- setHeadAndPropagate(node, r);
- delete p;
+ delete setHeadAndPropagate(node, r);
failed = false;
return;
}
@@ -795,9 +862,12 @@ namespace locks {
/**
* Acquires in shared timed mode.
*
- * @param arg the acquire argument
- * @param nanosTimeout max wait time
- * @return {@code true} if acquired
+ * @param arg
+ * The acquire argument (implementation specific).
+ * @param nanosTimeout
+ * Max wait time for the Aquire in nanos
+ *
+ * @return true if acquired
*/
bool doAcquireSharedNanos(int arg, long long nanosTimeout) {
@@ -810,8 +880,7 @@ namespace locks {
if (p == head.get()) {
int r = parent->tryAcquireShared(arg);
if (r >= 0) {
- setHeadAndPropagate(node, r);
- delete p;
+ delete setHeadAndPropagate(node, r);
failed = false;
return true;
}
@@ -846,22 +915,25 @@ namespace locks {
Thread* fullGetFirstQueuedThread() {
- /*
- * Head's next field might not have been set yet, or may have
- * been unset after setHead. So we must check to see if tail
- * is actually first node. If not, we continue on, safely
- * traversing from tail back to head to find first,
- * guaranteeing termination.
- */
+ // Head's next field might not have been set yet, or may have been
+ // unset after setHead. So we must check to see if tail is actually
+ // first node. If not, we continue on, safely traversing from tail
+ // back to head to find first, guaranteeing termination.
+
+ PlatformThread::readerLockMutex(this->rwLock);
Node* t = tail.get();
Thread* firstThread = NULL;
while (t != NULL && t != head.get()) {
Thread* tt = (Thread*)t->thread;
- if (tt != NULL)
+ if (tt != NULL) {
firstThread = tt;
+ }
t = t->prev;
}
+
+ PlatformThread::unlockRWMutex(this->rwLock);
+
return firstThread;
}
@@ -882,44 +954,46 @@ namespace locks {
return true;
}
- /*
- * node->prev can be non-NULL, but not yet on queue because
- * the CAS to place it on queue can fail. So we have to
- * traverse from tail to make sure it actually made it. It
- * will always be near the tail in calls to this method, and
- * unless the CAS failed (which is unlikely), it will be
- * there, so we hardly ever traverse much.
- */
+ // node->prev can be non-NULL, but not yet on queue because the CAS
+ // to place it on queue can fail. So we have to traverse from tail to
+ // make sure it actually made it. It will always be near the tail in
+ // calls to this method, and unless the CAS failed (which is unlikely),
+ // it will be there, so we hardly ever traverse much.
return findNodeFromTail(node);
}
/**
* Returns true if node is on sync queue by searching backwards from tail.
* Called only when needed by isOnSyncQueue.
+ *
* @return true if present
*/
bool findNodeFromTail(Node* node) {
+ bool found = false;
+
+ PlatformThread::readerLockMutex(this->rwLock);
Node* t = tail.get();
for (;;) {
if (t == node) {
- return true;
+ found = true;
+ break;
}
if (t == NULL) {
- return false;
+ break;
}
t = t->prev;
}
- return false;
+ PlatformThread::unlockRWMutex(this->rwLock);
+ return found;
}
/**
* Transfers a node from a condition queue onto sync queue.
- * Returns true if successful. If the node was canceled this
- * method will delete it before returning false.
+ * Returns true if successful.
*
* @param node
* The node to transfer to the wait Queue
@@ -928,24 +1002,26 @@ namespace locks {
* canceled before signal and deleted).
*/
bool transferForSignal(Node* node) {
- /*
- * If cannot change waitStatus, the node has been canceled.
- */
+
+ // If we cannot change waitStatus, the node has been canceled.
if (!compareAndSetWaitStatus(node, Node::CONDITION, 0)) {
return false;
}
+ // Get the Node onto the list, once there we need to update its
+ // predecessor to indicate it should signal this Node once it is
+ // woken up either by cancel or by acquiring the lock.
+ enqueue(node);
+
// Since we need to write to our predecessor we must lock so that
// it doesn't leave the Queue before we are done.
- PlatformThread::writerLockMutex(this->rwLock);
+ PlatformThread::readerLockMutex(this->rwLock);
- /*
- * Splice onto queue and try to set waitStatus of predecessor to
- * indicate that thread is (probably) waiting. If canceled or
- * attempt to set waitStatus fails, wake up to resync (in which
- * case the waitStatus can be transiently and harmlessly wrong).
- */
- Node* p = enqueue(node);
+ // Splice onto queue and try to set waitStatus of predecessor to
+ // indicate that thread is (probably) waiting. If canceled or
+ // attempt to set waitStatus fails, wake up to resync (in which
+ // case the waitStatus can be transiently and harmlessly wrong).
+ Node* p = node->prev;
int ws = p->waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node::SIGNAL)) {
LockSupport::unpark((Thread*)node->thread);
@@ -1027,7 +1103,6 @@ namespace locks {
static bool compareAndSetWaitStatus(Node* node, int expect, int update) {
return Atomics::compareAndSet32(&node->waitStatus, expect, update);
}
-
};
/**
@@ -1262,11 +1337,18 @@ namespace locks {
if (!impl->isHeldExclusively()) {
throw IllegalMonitorStateException();
}
+
+ PlatformThread::readerLockMutex(this->impl->rwLock);
+
for (Node* w = head; w != NULL; w = w->nextWaiter) {
if (w->waitStatus == Node::CONDITION) {
+ PlatformThread::unlockRWMutex(this->impl->rwLock);
return true;
}
}
+
+ PlatformThread::unlockRWMutex(this->impl->rwLock);
+
return false;
}
@@ -1275,11 +1357,17 @@ namespace locks {
throw IllegalMonitorStateException();
}
int n = 0;
+
+ PlatformThread::readerLockMutex(this->impl->rwLock);
+
for (Node* w = head; w != NULL; w = w->nextWaiter) {
if (w->waitStatus == Node::CONDITION) {
++n;
}
}
+
+ PlatformThread::unlockRWMutex(this->impl->rwLock);
+
return n;
}
@@ -1288,6 +1376,9 @@ namespace locks {
throw IllegalMonitorStateException();
}
ArrayList<Thread*>* list = new ArrayList<Thread*>();
+
+ PlatformThread::readerLockMutex(this->impl->rwLock);
+
for (Node* w = head; w != NULL; w = w->nextWaiter) {
if (w->waitStatus == Node::CONDITION) {
Thread* t = (Thread*)w->thread;
@@ -1296,6 +1387,9 @@ namespace locks {
}
}
}
+
+ PlatformThread::unlockRWMutex(this->impl->rwLock);
+
return list;
}
@@ -1376,6 +1470,10 @@ namespace locks {
* without requiring many re-traversals during cancellation storms.
*/
void unlinkCancelledWaiters() {
+
+ // Prevent the parent from deleting nodes while we clean up.
+ PlatformThread::readerLockMutex(this->impl->rwLock);
+
Node* t = head;
Node* trail = NULL;
while (t != NULL) {
@@ -1398,6 +1496,8 @@ namespace locks {
}
t = next;
}
+
+ PlatformThread::unlockRWMutex(this->impl->rwLock);
}
/**