You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/28 14:11:26 UTC

[pulsar] 15/15: [improve][broker] Support shrink for ConcurrentSortedLongPairSet (#15354)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9e8c3242f70947d7137f858a8e28ff18df5dffed
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Apr 28 20:30:30 2022 +0800

    [improve][broker] Support shrink for ConcurrentSortedLongPairSet  (#15354)
    
    (cherry picked from commit 24d4d76bb9e39010bae3f4cbd8ddba6422570b4e)
---
 .../persistent/MessageRedeliveryController.java    |  2 +-
 .../util/collections/ConcurrentLongPairSet.java    | 53 ++++++++++++----------
 .../collections/ConcurrentSortedLongPairSet.java   | 27 +++++++++--
 .../common/util/collections/LongPairSet.java       |  7 +++
 .../ConcurrentSortedLongPairSetTest.java           | 43 ++++++++++++++++++
 5 files changed, 105 insertions(+), 27 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index c7f96fffcef..46fa1b2b050 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -36,7 +36,7 @@ public class MessageRedeliveryController {
     private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
 
     public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
-        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2, true);
         this.hashesToBeBlocked = allowOutOfOrderDelivery
                 ? null
                 : ConcurrentLongLongPairHashMap
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
index 66ecaee4bfa..7b5e75813fa 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
@@ -175,6 +175,7 @@ public class ConcurrentLongPairSet implements LongPairSet {
         return size;
     }
 
+    @Override
     public long capacity() {
         long capacity = 0;
         for (int i = 0; i < sections.length; i++) {
@@ -447,20 +448,7 @@ public class ConcurrentLongPairSet implements LongPairSet {
                     bucket = (bucket + 2) & (table.length - 1);
                 }
             } finally {
-                if (autoShrink && size < resizeThresholdBelow) {
-                    try {
-                        int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
-                        int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
-                        if (newCapacity < capacity && newResizeThresholdUp > size) {
-                            // shrink the hashmap
-                            rehash(newCapacity);
-                        }
-                    } finally {
-                        unlockWrite(stamp);
-                    }
-                } else {
-                    unlockWrite(stamp);
-                }
+                tryShrinkThenUnlock(stamp);
             }
         }
 
@@ -469,23 +457,42 @@ public class ConcurrentLongPairSet implements LongPairSet {
             int removedItems = 0;
 
             // Go through all the buckets for this section
-            for (int bucket = 0; bucket < table.length; bucket += 2) {
-                long storedItem1 = table[bucket];
-                long storedItem2 = table[bucket + 1];
-
-                if (storedItem1 != DeletedItem && storedItem1 != EmptyItem) {
-                    if (filter.test(storedItem1, storedItem2)) {
-                        long h = hash(storedItem1, storedItem2);
-                        if (remove(storedItem1, storedItem2, (int) h)) {
+            long stamp = writeLock();
+            try {
+                for (int bucket = 0; bucket < table.length; bucket += 2) {
+                    long storedItem1 = table[bucket];
+                    long storedItem2 = table[bucket + 1];
+                    if (storedItem1 != DeletedItem && storedItem1 != EmptyItem) {
+                        if (filter.test(storedItem1, storedItem2)) {
+                            SIZE_UPDATER.decrementAndGet(this);
+                            cleanBucket(bucket);
                             removedItems++;
                         }
                     }
                 }
+            } finally {
+                tryShrinkThenUnlock(stamp);
             }
-
             return removedItems;
         }
 
+        // Shrinks the table when auto-shrink applies, then releases the write lock;
+        // the lock is released even if the rehash throws.
+        private void tryShrinkThenUnlock(long stamp) {
+            try {
+                if (autoShrink && size < resizeThresholdBelow) {
+                    int shrunkCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+                    int shrunkThresholdUp = (int) (shrunkCapacity * mapFillFactor);
+                    if (shrunkCapacity < capacity && shrunkThresholdUp > size) {
+                        // shrink the hashmap
+                        rehash(shrunkCapacity);
+                    }
+                }
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
         private void cleanBucket(int bucket) {
             int nextInArray = (bucket + 2) & (table.length - 1);
             if (table[nextInArray] == EmptyItem) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
index e4cb668fc92..06efd0490d1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
@@ -48,14 +48,15 @@ import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPairC
 public class ConcurrentSortedLongPairSet implements LongPairSet {
 
     protected final NavigableMap<Long, ConcurrentLongPairSet> longPairSets = new ConcurrentSkipListMap<>();
-    private int expectedItems;
-    private int concurrencyLevel;
+    private final int expectedItems;
+    private final int concurrencyLevel;
     /**
      * If {@link #longPairSets} adds and removes the item-set frequently then it allocates and removes
      * {@link ConcurrentLongPairSet} for the same item multiple times which can lead to gc-puases. To avoid such
      * situation, avoid removing empty LogPairSet until it reaches max limit.
      */
-    private int maxAllowedSetOnRemove;
+    private final int maxAllowedSetOnRemove;
+    private final boolean autoShrink;
     private static final int DEFAULT_MAX_ALLOWED_SET_ON_REMOVE = 10;
 
     public ConcurrentSortedLongPairSet() {
@@ -70,10 +71,20 @@ public class ConcurrentSortedLongPairSet implements LongPairSet {
         this(expectedItems, concurrencyLevel, DEFAULT_MAX_ALLOWED_SET_ON_REMOVE);
     }
 
+    public ConcurrentSortedLongPairSet(int expectedItems, int concurrencyLevel, boolean autoShrink) {
+        this(expectedItems, concurrencyLevel, DEFAULT_MAX_ALLOWED_SET_ON_REMOVE, autoShrink);
+    }
+
     public ConcurrentSortedLongPairSet(int expectedItems, int concurrencyLevel, int maxAllowedSetOnRemove) {
+        this(expectedItems, concurrencyLevel, maxAllowedSetOnRemove, false);
+    }
+
+    public ConcurrentSortedLongPairSet(int expectedItems, int concurrencyLevel, int maxAllowedSetOnRemove,
+                                       boolean autoShrink) {
         this.expectedItems = expectedItems;
         this.concurrencyLevel = concurrencyLevel;
         this.maxAllowedSetOnRemove = maxAllowedSetOnRemove;
+        this.autoShrink = autoShrink;
     }
 
     @Override
@@ -82,6 +93,7 @@ public class ConcurrentSortedLongPairSet implements LongPairSet {
                 (key) -> ConcurrentLongPairSet.newBuilder()
                         .expectedItems(expectedItems)
                         .concurrencyLevel(concurrencyLevel)
+                        .autoShrink(autoShrink)
                         .build());
         return messagesToReplay.add(item1, item2);
     }
@@ -194,6 +206,15 @@ public class ConcurrentSortedLongPairSet implements LongPairSet {
         return size.get();
     }
 
+    @Override
+    public long capacity() {
+        long total = 0; // sum of the capacities of all inner sets
+        for (ConcurrentLongPairSet pairSet : longPairSets.values()) {
+            total += pairSet.capacity();
+        }
+        return total;
+    }
+
     @Override
     public boolean contains(long item1, long item2) {
         ConcurrentLongPairSet longPairSet = longPairSets.get(item1);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java
index 32de7e4c232..f27b994f777 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java
@@ -107,6 +107,13 @@ public interface LongPairSet {
      */
     long size();
 
+    /**
+     * Returns the capacity of the set.
+     *
+     * @return the current capacity of the set
+     */
+    long capacity();
+
     /**
      * Checks if given (item1,item2) composite value exists into set.
      *
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java
index fcb9884a795..62dfa21dc81 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
+import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.List;
@@ -181,6 +182,20 @@ public class ConcurrentSortedLongPairSetTest {
         values = new ArrayList<>(set.items());
         values.sort(null);
         assertEquals(values, Lists.newArrayList(new LongPair(6, 6), new LongPair(7, 7)));
+
+        set = new ConcurrentSortedLongPairSet(128, 2, true);
+        set.add(2, 2);
+        set.add(1, 3);
+        set.add(3, 1);
+        set.add(2, 1);
+        set.add(3, 2);
+        set.add(1, 2);
+        set.add(1, 1);
+        removeItems = set.removeIf((ledgerId, entryId) -> {
+            return ComparisonChain.start().compare(ledgerId, 1).compare(entryId, 3)
+                    .result() <= 0;
+        });
+        assertEquals(removeItems, 3);
     }
 
     @Test
@@ -245,4 +260,32 @@ public class ConcurrentSortedLongPairSetTest {
         set.add(1, 1);
         assertFalse(set.isEmpty());
     }
+
+    @Test
+    public void testShrink() {
+        LongPairSet set = new ConcurrentSortedLongPairSet(2, 1, true);
+        set.add(0, 0);
+        assertEquals(set.capacity(), 4);
+        set.add(0, 1);
+        assertEquals(set.capacity(), 4);
+        set.add(1, 1);
+        assertEquals(set.capacity(), 8);
+        set.add(1, 2);
+        assertEquals(set.capacity(), 8);
+        set.add(1, 3);
+        set.add(1, 4);
+        set.add(1, 5);
+        assertEquals(set.capacity(), 12);
+        set.remove(1, 5);
+        // no shrink yet
+        assertEquals(set.capacity(), 12);
+        set.remove(1, 4);
+        // the internal map does not keep shrinking at every remove() operation
+        assertEquals(set.capacity(), 12);
+        set.remove(1, 3);
+        set.remove(1, 2);
+        set.remove(1, 1);
+        // capacity shrinks once enough items have been removed
+        assertEquals(set.capacity(), 8);
+    }
 }