You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/20 17:50:09 UTC
[pulsar] branch master updated: Use non-reentrant lock for GrowableArrayBlockingQueue tail (#16125)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 159ead34d3f Use non-reentrant lock for GrowableArrayBlockingQueue tail (#16125)
159ead34d3f is described below
commit 159ead34d3fc5b456e90b66573495b7748f2bf7d
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Jun 20 10:50:02 2022 -0700
Use non-reentrant lock for GrowableArrayBlockingQueue tail (#16125)
---
.../util/collections/GrowableArrayBlockingQueue.java | 19 ++++++++++---------
1 file changed, 10 insertions(+), 9 deletions(-)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
index fd32df75dc5..d4afe6ecf63 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.StampedLock;
import java.util.function.Consumer;
/**
@@ -42,7 +43,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
private final ReentrantLock headLock = new ReentrantLock();
private final PaddedInt headIndex = new PaddedInt();
private final PaddedInt tailIndex = new PaddedInt();
- private final ReentrantLock tailLock = new ReentrantLock();
+ private final StampedLock tailLock = new StampedLock();
private final Condition isNotEmpty = headLock.newCondition();
private T[] data;
@@ -126,7 +127,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
@Override
public void put(T e) {
- tailLock.lock();
+ long stamp = tailLock.writeLock();
boolean wasEmpty = false;
@@ -141,7 +142,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
wasEmpty = true;
}
} finally {
- tailLock.unlock();
+ tailLock.unlockWrite(stamp);
}
if (wasEmpty) {
@@ -278,7 +279,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
@Override
public boolean remove(Object o) {
- tailLock.lock();
+ long stamp = tailLock.writeLock();
headLock.lock();
try {
@@ -297,7 +298,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
}
} finally {
headLock.unlock();
- tailLock.unlock();
+ tailLock.unlockWrite(stamp);
}
return false;
@@ -347,7 +348,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
@Override
public void forEach(Consumer<? super T> action) {
- tailLock.lock();
+ long stamp = tailLock.writeLock();
headLock.lock();
try {
@@ -364,7 +365,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
} finally {
headLock.unlock();
- tailLock.unlock();
+ tailLock.unlockWrite(stamp);
}
}
@@ -372,7 +373,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
public String toString() {
StringBuilder sb = new StringBuilder();
- tailLock.lock();
+ long stamp = tailLock.writeLock();
headLock.lock();
try {
@@ -395,7 +396,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
sb.append(']');
} finally {
headLock.unlock();
- tailLock.unlock();
+ tailLock.unlockWrite(stamp);
}
return sb.toString();
}