You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/28 14:11:16 UTC

[pulsar] 05/15: Reduce unnecessary expansions for ConcurrentLong map and set (#14562)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7c29b1d5fa171c889925e1a82ac3e6d4f8b359a5
Author: LinChen <15...@qq.com>
AuthorDate: Tue Mar 15 11:41:08 2022 +0800

    Reduce unnecessary expansions for ConcurrentLong map and set (#14562)
    
    (cherry picked from commit 8e7006f899bd2b9ed9482ab2ce1ee35233957d03)
---
 .../util/collections/ConcurrentLongHashMap.java    | 10 ++++++
 .../util/collections/ConcurrentLongPairSet.java    | 11 ++++---
 .../util/collections/ConcurrentOpenHashMap.java    | 19 ++++++++++++
 .../util/collections/ConcurrentOpenHashSet.java    | 18 +++++++++++
 .../collections/ConcurrentLongHashMapTest.java     | 19 ++++++++++++
 .../collections/ConcurrentLongPairSetTest.java     | 19 ++++++++++++
 .../collections/ConcurrentOpenHashMapTest.java     | 19 ++++++++++++
 .../collections/ConcurrentOpenHashSetTest.java     | 36 +++++++++++++++++-----
 8 files changed, 138 insertions(+), 13 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
index a4779357a44..6f2794468c4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
@@ -451,6 +451,16 @@ public class ConcurrentLongHashMap<V> {
                             if (nextValueInArray == EmptyValue) {
                                 values[bucket] = (V) EmptyValue;
                                 --usedBuckets;
+
+                                // Clean up the run of `DeletedValue` buckets immediately preceding this one,
+                                // so that they stop counting as used and we reduce unnecessary expansions
+                                int lastBucket = signSafeMod(bucket - 1, capacity);
+                                while (values[lastBucket] == DeletedValue) {
+                                    values[lastBucket] = (V) EmptyValue;
+                                    --usedBuckets;
+
+                                    lastBucket = signSafeMod(lastBucket - 1, capacity);
+                                }
                             } else {
                                 values[bucket] = (V) DeletedValue;
                             }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
index abbe11576a9..66ecaee4bfa 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
@@ -493,14 +493,15 @@ public class ConcurrentLongPairSet implements LongPairSet {
                 table[bucket + 1] = EmptyItem;
                 --usedBuckets;
 
-                // Cleanup all the buckets that were in `DeletedKey` state,
+                // Clean up all the preceding buckets that were in `DeletedItem` state,
                 // so that we can reduce unnecessary expansions
-                bucket = (bucket - 1) & (table.length - 1);
-                while (table[bucket] == DeletedItem) {
-                    table[bucket] = EmptyItem;
+                int lastBucket = (bucket - 2) & (table.length - 1);
+                while (table[lastBucket] == DeletedItem) {
+                    table[lastBucket] = EmptyItem;
+                    table[lastBucket + 1] = EmptyItem;
                     --usedBuckets;
 
-                    bucket = (bucket - 1) & (table.length - 1);
+                    lastBucket = (lastBucket - 2) & (table.length - 1);
                 }
             } else {
                 table[bucket] = DeletedItem;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
index 255844cf4ba..f82bf11a90e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
@@ -173,6 +173,14 @@ public class ConcurrentOpenHashMap<K, V> {
         }
     }
 
+    long getUsedBucketCount() { // package-private; sums `usedBuckets` over all sections
+        long usedBucketCount = 0;
+        for (Section<K, V> s : sections) {
+            usedBucketCount += s.usedBuckets; // plain field read; no lock is taken in this method
+        }
+        return usedBucketCount;
+    }
+
     public long size() {
         long size = 0;
         for (Section<K, V> s : sections) {
@@ -441,6 +449,17 @@ public class ConcurrentOpenHashMap<K, V> {
                                 table[bucket] = EmptyKey;
                                 table[bucket + 1] = null;
                                 --usedBuckets;
+
+                                // Clean up the run of `DeletedKey` buckets immediately preceding this one,
+                                // so that they stop counting as used and we reduce unnecessary expansions
+                                int lastBucket = (bucket - 2) & (table.length - 1);
+                                while (table[lastBucket] == DeletedKey) {
+                                    table[lastBucket] = EmptyKey;
+                                    table[lastBucket + 1] = null;
+                                    --usedBuckets;
+
+                                    lastBucket = (lastBucket - 2) & (table.length - 1);
+                                }
                             } else {
                                 table[bucket] = DeletedKey;
                                 table[bucket + 1] = null;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
index 6dc8552174e..cf5ed7ccdc8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
@@ -152,6 +152,14 @@ public class ConcurrentOpenHashSet<V> {
         }
     }
 
+    long getUsedBucketCount() { // package-private; sums `usedBuckets` over all sections
+        long usedBucketCount = 0;
+        for (Section<V> s : sections) {
+            usedBucketCount += s.usedBuckets; // plain field read; no lock is taken in this method
+        }
+        return usedBucketCount;
+    }
+
     public long size() {
         long size = 0;
         for (int i = 0; i < sections.length; i++) {
@@ -477,6 +485,16 @@ public class ConcurrentOpenHashSet<V> {
             if (values[nextInArray] == EmptyValue) {
                 values[bucket] = (V) EmptyValue;
                 --usedBuckets;
+
+                // Clean up the run of `DeletedValue` buckets immediately preceding this one,
+                // so that they stop counting as used and we reduce unnecessary expansions
+                int lastBucket = signSafeMod(bucket - 1, capacity);
+                while (values[lastBucket] == DeletedValue) {
+                    values[lastBucket] = (V) EmptyValue;
+                    --usedBuckets;
+
+                    lastBucket = signSafeMod(lastBucket - 1, capacity);
+                }
             } else {
                 values[bucket] = (V) DeletedValue;
             }
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
index 6cf126cf2ff..205cf91b47d 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
@@ -107,6 +107,25 @@ public class ConcurrentLongHashMapTest {
         assertEquals(map.size(), 3);
     }
 
+    @Test
+    public void testReduceUnnecessaryExpansions() { // emptying the map must leave no bucket counted as used
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .build();
+        assertNull(map.put(1, "v1")); // four fresh keys: every put must return null
+        assertNull(map.put(2, "v2"));
+        assertNull(map.put(3, "v3"));
+        assertNull(map.put(4, "v4"));
+        // Remove every entry again, in insertion order.
+        assertTrue(map.remove(1, "v1"));
+        assertTrue(map.remove(2, "v2"));
+        assertTrue(map.remove(3, "v3"));
+        assertTrue(map.remove(4, "v4"));
+        // Arguments are (actual, expected), matching the other assertEquals calls in this test class.
+        assertEquals(map.getUsedBucketCount(), 0);
+    }
+
     @Test
     public void testClear() {
         ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java
index a8d3e1d0603..86030f21619 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java
@@ -74,6 +74,25 @@ public class ConcurrentLongPairSetTest {
         }
     }
 
+    @Test
+    public void testReduceUnnecessaryExpansions() { // emptying the set must leave no bucket counted as used
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .build();
+        assertTrue(set.add(1, 1)); // four distinct pairs: every add must return true
+        assertTrue(set.add(2, 2));
+        assertTrue(set.add(3, 3));
+        assertTrue(set.add(4, 4));
+        // Remove every pair again, in insertion order.
+        assertTrue(set.remove(1, 1));
+        assertTrue(set.remove(2, 2));
+        assertTrue(set.remove(3, 3));
+        assertTrue(set.remove(4, 4));
+        // NOTE(review): expected value is passed first here; confirm that matches this file's Assert convention.
+        assertEquals(0, set.getUsedBucketCount());
+    }
+
     @Test
     public void simpleInsertions() {
         ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
index 7919485d9b6..cec52ea3ded 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
@@ -109,6 +109,25 @@ public class ConcurrentOpenHashMapTest {
         assertEquals(map.size(), 3);
     }
 
+    @Test
+    public void testReduceUnnecessaryExpansions() { // emptying the map must leave no bucket counted as used
+        ConcurrentOpenHashMap<String, String> map = ConcurrentOpenHashMap.<String, String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .build();
+        assertNull(map.put("1", "1")); // four fresh keys: every put must return null
+        assertNull(map.put("2", "2"));
+        assertNull(map.put("3", "3"));
+        assertNull(map.put("4", "4"));
+        // Remove every entry again, in insertion order; each remove must hand back the stored value.
+        assertEquals(map.remove("1"), "1");
+        assertEquals(map.remove("2"), "2");
+        assertEquals(map.remove("3"), "3");
+        assertEquals(map.remove("4"), "4");
+        // Arguments are (actual, expected), matching the assertEquals calls just above.
+        assertEquals(map.getUsedBucketCount(), 0);
+    }
+
     @Test
     public void testClear() {
         ConcurrentOpenHashMap<String, String> map = ConcurrentOpenHashMap.<String, String>newBuilder()
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java
index af62948b64a..6c82293bec2 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java
@@ -91,24 +91,44 @@ public class ConcurrentOpenHashSetTest {
         assertEquals(set.size(), 3);
     }
 
+    @Test
+    public void testReduceUnnecessaryExpansions() { // emptying the set must leave no bucket counted as used
+        ConcurrentOpenHashSet<String> set =
+                ConcurrentOpenHashSet.<String>newBuilder()
+                        .expectedItems(2)
+                        .concurrencyLevel(1)
+                        .build();
+        // Four distinct values: every add must return true.
+        assertTrue(set.add("1"));
+        assertTrue(set.add("2"));
+        assertTrue(set.add("3"));
+        assertTrue(set.add("4"));
+        // Remove them all again, in insertion order.
+        assertTrue(set.remove("1"));
+        assertTrue(set.remove("2"));
+        assertTrue(set.remove("3"));
+        assertTrue(set.remove("4"));
+        assertEquals(set.getUsedBucketCount(), 0); // (actual, expected), as elsewhere in this test class
+    }
+
     @Test
     public void testClear() {
-        ConcurrentOpenHashSet<String> map =
+        ConcurrentOpenHashSet<String> set =
                 ConcurrentOpenHashSet.<String>newBuilder()
                 .expectedItems(2)
                 .concurrencyLevel(1)
                 .autoShrink(true)
                 .mapIdleFactor(0.25f)
                 .build();
-        assertTrue(map.capacity() == 4);
+        assertTrue(set.capacity() == 4); // capacity right after build()
 
-        assertTrue(map.add("k1"));
-        assertTrue(map.add("k2"));
-        assertTrue(map.add("k3"));
+        assertTrue(set.add("k1"));
+        assertTrue(set.add("k2"));
+        assertTrue(set.add("k3")); // three distinct values: every add must return true
 
-        assertTrue(map.capacity() == 8);
-        map.clear();
-        assertTrue(map.capacity() == 4);
+        assertTrue(set.capacity() == 8); // capacity has doubled once the three values are in
+        set.clear(); // built with autoShrink(true); the next line checks the capacity afterwards
+        assertTrue(set.capacity() == 4); // same capacity as right after build()
     }
 
     @Test