You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain text version.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/29 06:25:44 UTC
[pulsar] 04/17: Reduce unnecessary expansions for ConcurrentLong map and set (#14562)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e38d75a0950de0dd5ab5dcc620a20cb332ee5fb8
Author: LinChen <15...@qq.com>
AuthorDate: Tue Mar 15 11:41:08 2022 +0800
Reduce unnecessary expansions for ConcurrentLong map and set (#14562)
(cherry picked from commit 8e7006f899bd2b9ed9482ab2ce1ee35233957d03)
---
.../util/collections/ConcurrentLongHashMap.java | 10 ++++++
.../util/collections/ConcurrentLongPairSet.java | 11 ++++---
.../util/collections/ConcurrentOpenHashMap.java | 19 ++++++++++++
.../util/collections/ConcurrentOpenHashSet.java | 18 +++++++++++
.../collections/ConcurrentLongHashMapTest.java | 19 ++++++++++++
.../collections/ConcurrentLongPairSetTest.java | 19 ++++++++++++
.../collections/ConcurrentOpenHashMapTest.java | 19 ++++++++++++
.../collections/ConcurrentOpenHashSetTest.java | 36 +++++++++++++++++-----
8 files changed, 138 insertions(+), 13 deletions(-)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
index a4779357a44..6f2794468c4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
@@ -451,6 +451,16 @@ public class ConcurrentLongHashMap<V> {
if (nextValueInArray == EmptyValue) {
values[bucket] = (V) EmptyValue;
--usedBuckets;
+
+ // Cleanup all the buckets that were in `DeletedValue` state,
+ // so that we can reduce unnecessary expansions
+ int lastBucket = signSafeMod(bucket - 1, capacity);
+ while (values[lastBucket] == DeletedValue) {
+ values[lastBucket] = (V) EmptyValue;
+ --usedBuckets;
+
+ lastBucket = signSafeMod(lastBucket - 1, capacity);
+ }
} else {
values[bucket] = (V) DeletedValue;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
index abbe11576a9..66ecaee4bfa 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
@@ -493,14 +493,15 @@ public class ConcurrentLongPairSet implements LongPairSet {
table[bucket + 1] = EmptyItem;
--usedBuckets;
- // Cleanup all the buckets that were in `DeletedKey` state,
+ // Cleanup all the buckets that were in `DeletedItem` state,
// so that we can reduce unnecessary expansions
- bucket = (bucket - 1) & (table.length - 1);
- while (table[bucket] == DeletedItem) {
- table[bucket] = EmptyItem;
+ int lastBucket = (bucket - 2) & (table.length - 1);
+ while (table[lastBucket] == DeletedItem) {
+ table[lastBucket] = EmptyItem;
+ table[lastBucket + 1] = EmptyItem;
--usedBuckets;
- bucket = (bucket - 1) & (table.length - 1);
+ lastBucket = (lastBucket - 2) & (table.length - 1);
}
} else {
table[bucket] = DeletedItem;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
index 255844cf4ba..f82bf11a90e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
@@ -173,6 +173,14 @@ public class ConcurrentOpenHashMap<K, V> {
}
}
+ long getUsedBucketCount() {
+ long usedBucketCount = 0;
+ for (Section<K, V> s : sections) {
+ usedBucketCount += s.usedBuckets;
+ }
+ return usedBucketCount;
+ }
+
public long size() {
long size = 0;
for (Section<K, V> s : sections) {
@@ -441,6 +449,17 @@ public class ConcurrentOpenHashMap<K, V> {
table[bucket] = EmptyKey;
table[bucket + 1] = null;
--usedBuckets;
+
+ // Cleanup all the buckets that were in `DeletedKey` state,
+ // so that we can reduce unnecessary expansions
+ int lastBucket = (bucket - 2) & (table.length - 1);
+ while (table[lastBucket] == DeletedKey) {
+ table[lastBucket] = EmptyKey;
+ table[lastBucket + 1] = null;
+ --usedBuckets;
+
+ lastBucket = (lastBucket - 2) & (table.length - 1);
+ }
} else {
table[bucket] = DeletedKey;
table[bucket + 1] = null;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
index 6dc8552174e..cf5ed7ccdc8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
@@ -152,6 +152,14 @@ public class ConcurrentOpenHashSet<V> {
}
}
+ long getUsedBucketCount() {
+ long usedBucketCount = 0;
+ for (Section<V> s : sections) {
+ usedBucketCount += s.usedBuckets;
+ }
+ return usedBucketCount;
+ }
+
public long size() {
long size = 0;
for (int i = 0; i < sections.length; i++) {
@@ -477,6 +485,16 @@ public class ConcurrentOpenHashSet<V> {
if (values[nextInArray] == EmptyValue) {
values[bucket] = (V) EmptyValue;
--usedBuckets;
+
+ // Cleanup all the buckets that were in `DeletedValue` state,
+ // so that we can reduce unnecessary expansions
+ int lastBucket = signSafeMod(bucket - 1, capacity);
+ while (values[lastBucket] == DeletedValue) {
+ values[lastBucket] = (V) EmptyValue;
+ --usedBuckets;
+
+ lastBucket = signSafeMod(lastBucket - 1, capacity);
+ }
} else {
values[bucket] = (V) DeletedValue;
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
index 6cf126cf2ff..205cf91b47d 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
@@ -107,6 +107,25 @@ public class ConcurrentLongHashMapTest {
assertEquals(map.size(), 3);
}
+ @Test
+ public void testReduceUnnecessaryExpansions() {
+ ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .build();
+ assertNull(map.put(1, "v1"));
+ assertNull(map.put(2, "v2"));
+ assertNull(map.put(3, "v3"));
+ assertNull(map.put(4, "v4"));
+
+ assertTrue(map.remove(1, "v1"));
+ assertTrue(map.remove(2, "v2"));
+ assertTrue(map.remove(3, "v3"));
+ assertTrue(map.remove(4, "v4"));
+
+ assertEquals(0, map.getUsedBucketCount());
+ }
+
@Test
public void testClear() {
ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java
index a8d3e1d0603..86030f21619 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java
@@ -74,6 +74,25 @@ public class ConcurrentLongPairSetTest {
}
}
+ @Test
+ public void testReduceUnnecessaryExpansions() {
+ ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .build();
+ assertTrue(set.add(1, 1));
+ assertTrue(set.add(2, 2));
+ assertTrue(set.add(3, 3));
+ assertTrue(set.add(4, 4));
+
+ assertTrue(set.remove(1, 1));
+ assertTrue(set.remove(2, 2));
+ assertTrue(set.remove(3, 3));
+ assertTrue(set.remove(4, 4));
+
+ assertEquals(0, set.getUsedBucketCount());
+ }
+
@Test
public void simpleInsertions() {
ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
index 7919485d9b6..cec52ea3ded 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
@@ -109,6 +109,25 @@ public class ConcurrentOpenHashMapTest {
assertEquals(map.size(), 3);
}
+ @Test
+ public void testReduceUnnecessaryExpansions() {
+ ConcurrentOpenHashMap<String, String> map = ConcurrentOpenHashMap.<String, String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .build();
+ assertNull(map.put("1", "1"));
+ assertNull(map.put("2", "2"));
+ assertNull(map.put("3", "3"));
+ assertNull(map.put("4", "4"));
+
+ assertEquals(map.remove("1"), "1");
+ assertEquals(map.remove("2"), "2");
+ assertEquals(map.remove("3"), "3");
+ assertEquals(map.remove("4"), "4");
+
+ assertEquals(0, map.getUsedBucketCount());
+ }
+
@Test
public void testClear() {
ConcurrentOpenHashMap<String, String> map = ConcurrentOpenHashMap.<String, String>newBuilder()
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java
index af62948b64a..6c82293bec2 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java
@@ -91,24 +91,44 @@ public class ConcurrentOpenHashSetTest {
assertEquals(set.size(), 3);
}
+ @Test
+ public void testReduceUnnecessaryExpansions() {
+ ConcurrentOpenHashSet<String> set =
+ ConcurrentOpenHashSet.<String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .build();
+
+ assertTrue(set.add("1"));
+ assertTrue(set.add("2"));
+ assertTrue(set.add("3"));
+ assertTrue(set.add("4"));
+
+ assertTrue(set.remove("1"));
+ assertTrue(set.remove("2"));
+ assertTrue(set.remove("3"));
+ assertTrue(set.remove("4"));
+ assertEquals(0, set.getUsedBucketCount());
+ }
+
@Test
public void testClear() {
- ConcurrentOpenHashSet<String> map =
+ ConcurrentOpenHashSet<String> set =
ConcurrentOpenHashSet.<String>newBuilder()
.expectedItems(2)
.concurrencyLevel(1)
.autoShrink(true)
.mapIdleFactor(0.25f)
.build();
- assertTrue(map.capacity() == 4);
+ assertTrue(set.capacity() == 4);
- assertTrue(map.add("k1"));
- assertTrue(map.add("k2"));
- assertTrue(map.add("k3"));
+ assertTrue(set.add("k1"));
+ assertTrue(set.add("k2"));
+ assertTrue(set.add("k3"));
- assertTrue(map.capacity() == 8);
- map.clear();
- assertTrue(map.capacity() == 4);
+ assertTrue(set.capacity() == 8);
+ set.clear();
+ assertTrue(set.capacity() == 4);
}
@Test