You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/27 13:37:05 UTC

[GitHub] [pulsar] Technoboy- opened a new pull request, #15354: [improve][broker] Support shrink for ConcurrentSortedLongPairSet

Technoboy- opened a new pull request, #15354:
URL: https://github.com/apache/pulsar/pull/15354

   ### Motivation
   Sometimes the messages sent to consumers are delayed over 100ms and we find that the CPU is wasted on :
   ```
   "BookKeeperClientWorker-OrderedExecutor-4-0" #64 prio=5 os_prio=0 cpu=12748552.31ms elapsed=42037.79s tid=0x00007f7b4dc05720 nid=0xaf runnable  [0x00007f7a376f4000]
      java.lang.Thread.State: RUNNABLE
   	at org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.forEach(ConcurrentLongPairSet.java:157)
   	at org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet.items(ConcurrentSortedLongPairSet.java:136)
   	at org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet.items(ConcurrentSortedLongPairSet.java:128)
   	at org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet.items(ConcurrentSortedLongPairSet.java:113)
   	at org.apache.pulsar.broker.service.persistent.MessageRedeliveryController.getMessagesToReplayNow(MessageRedeliveryController.java:109)
   	at 
   ```
   
   Then we find there are many empty datas in the map from dump :
   <img width="1129" alt="image" src="https://user-images.githubusercontent.com/6297296/165530721-e2da535f-f2a7-4d13-839f-f7bf157a1b5d.png">
   
   ConcurrentSortedLongPairSet implements based on `ConcurrentLongPairSet`, and `ConcurrentLongPairSet` supports shrink, so support shrink for ConcurrentSortedLongPairSet to avoid iterator the empty data.
   
   ### Documentation
   
     
   - [x] `no-need-doc` 
   (Please explain why)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- merged pull request #15354: [improve][broker] Support shrink for ConcurrentSortedLongPairSet

Posted by GitBox <gi...@apache.org>.
Technoboy- merged PR #15354:
URL: https://github.com/apache/pulsar/pull/15354


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on pull request #15354: [improve][broker] Support shrink for ConcurrentSortedLongPairSet

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #15354:
URL: https://github.com/apache/pulsar/pull/15354#issuecomment-1111906744

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] gaoran10 commented on a diff in pull request #15354: [improve][broker] Support shrink for ConcurrentSortedLongPairSet

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on code in PR #15354:
URL: https://github.com/apache/pulsar/pull/15354#discussion_r860710232


##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java:
##########
@@ -469,20 +470,35 @@ private int removeIf(LongPairPredicate filter) {
             int removedItems = 0;
 
             // Go through all the buckets for this section
-            for (int bucket = 0; bucket < table.length; bucket += 2) {
-                long storedItem1 = table[bucket];
-                long storedItem2 = table[bucket + 1];
-
-                if (storedItem1 != DeletedItem && storedItem1 != EmptyItem) {
-                    if (filter.test(storedItem1, storedItem2)) {
-                        long h = hash(storedItem1, storedItem2);
-                        if (remove(storedItem1, storedItem2, (int) h)) {
+            long stamp = writeLock();
+            try {
+                for (int bucket = 0; bucket < table.length; bucket += 2) {
+                    long storedItem1 = table[bucket];
+                    long storedItem2 = table[bucket + 1];
+                    if (storedItem1 != DeletedItem && storedItem1 != EmptyItem) {
+                        if (filter.test(storedItem1, storedItem2)) {
+                            SIZE_UPDATER.decrementAndGet(this);
+                            cleanBucket(bucket);
                             removedItems++;
                         }
                     }
                 }
+            } finally {
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }

Review Comment:
   Could we reuse this block?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15354: [improve][broker] Support shrink for ConcurrentSortedLongPairSet

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15354:
URL: https://github.com/apache/pulsar/pull/15354#discussion_r860764493


##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java:
##########
@@ -469,20 +470,35 @@ private int removeIf(LongPairPredicate filter) {
             int removedItems = 0;
 
             // Go through all the buckets for this section
-            for (int bucket = 0; bucket < table.length; bucket += 2) {
-                long storedItem1 = table[bucket];
-                long storedItem2 = table[bucket + 1];
-
-                if (storedItem1 != DeletedItem && storedItem1 != EmptyItem) {
-                    if (filter.test(storedItem1, storedItem2)) {
-                        long h = hash(storedItem1, storedItem2);
-                        if (remove(storedItem1, storedItem2, (int) h)) {
+            long stamp = writeLock();
+            try {
+                for (int bucket = 0; bucket < table.length; bucket += 2) {
+                    long storedItem1 = table[bucket];
+                    long storedItem2 = table[bucket + 1];
+                    if (storedItem1 != DeletedItem && storedItem1 != EmptyItem) {
+                        if (filter.test(storedItem1, storedItem2)) {
+                            SIZE_UPDATER.decrementAndGet(this);
+                            cleanBucket(bucket);
                             removedItems++;
                         }
                     }
                 }
+            } finally {
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }

Review Comment:
   ok, fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org