You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text version.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/20 17:50:09 UTC

[pulsar] branch master updated: Use non-reentrant lock for GrowableArrayBlockingQueue tail (#16125)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 159ead34d3f Use non-reentrant lock for GrowableArrayBlockingQueue tail (#16125)
159ead34d3f is described below

commit 159ead34d3fc5b456e90b66573495b7748f2bf7d
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Jun 20 10:50:02 2022 -0700

    Use non-reentrant lock for GrowableArrayBlockingQueue tail (#16125)
---
 .../util/collections/GrowableArrayBlockingQueue.java  | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
index fd32df75dc5..d4afe6ecf63 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.StampedLock;
 import java.util.function.Consumer;
 
 /**
@@ -42,7 +43,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
     private final ReentrantLock headLock = new ReentrantLock();
     private final PaddedInt headIndex = new PaddedInt();
     private final PaddedInt tailIndex = new PaddedInt();
-    private final ReentrantLock tailLock = new ReentrantLock();
+    private final StampedLock tailLock = new StampedLock();
     private final Condition isNotEmpty = headLock.newCondition();
 
     private T[] data;
@@ -126,7 +127,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
 
     @Override
     public void put(T e) {
-        tailLock.lock();
+        long stamp = tailLock.writeLock();
 
         boolean wasEmpty = false;
 
@@ -141,7 +142,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
                 wasEmpty = true;
             }
         } finally {
-            tailLock.unlock();
+            tailLock.unlockWrite(stamp);
         }
 
         if (wasEmpty) {
@@ -278,7 +279,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
 
     @Override
     public boolean remove(Object o) {
-        tailLock.lock();
+        long stamp = tailLock.writeLock();
         headLock.lock();
 
         try {
@@ -297,7 +298,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
             }
         } finally {
             headLock.unlock();
-            tailLock.unlock();
+            tailLock.unlockWrite(stamp);
         }
 
         return false;
@@ -347,7 +348,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
 
     @Override
     public void forEach(Consumer<? super T> action) {
-        tailLock.lock();
+        long stamp = tailLock.writeLock();
         headLock.lock();
 
         try {
@@ -364,7 +365,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
 
         } finally {
             headLock.unlock();
-            tailLock.unlock();
+            tailLock.unlockWrite(stamp);
         }
     }
 
@@ -372,7 +373,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
     public String toString() {
         StringBuilder sb = new StringBuilder();
 
-        tailLock.lock();
+        long stamp = tailLock.writeLock();
         headLock.lock();
 
         try {
@@ -395,7 +396,7 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
             sb.append(']');
         } finally {
             headLock.unlock();
-            tailLock.unlock();
+            tailLock.unlockWrite(stamp);
         }
         return sb.toString();
     }