You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/18 09:31:31 UTC
[pulsar] 08/12: [Issue 12723] Fix race condition in PersistentTopic#addReplicationCluster (#12729)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 230e1ac9f97e292a01766766705d531f3eb8bf34
Author: JiangHaiting <ja...@qq.com>
AuthorDate: Mon Nov 15 08:52:31 2021 +0800
[Issue 12723] Fix race condition in PersistentTopic#addReplicationCluster (#12729)
See #12723
Add a method org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap#removeNullValue to remove null value in a thread safe way.
(cherry picked from commit a3fe00efc4ccba55a0f28fd02b535c6624e3ed0a)
---
.../broker/service/persistent/PersistentTopic.java | 6 ++--
.../util/collections/ConcurrentOpenHashMap.java | 26 ++++++++++++++++++
.../collections/ConcurrentOpenHashMapTest.java | 32 ++++++++++++++++++++++
3 files changed, 61 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index cdc0e00..3e7d733 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1544,7 +1544,7 @@ public class PersistentTopic extends AbstractTopic
protected boolean addReplicationCluster(String remoteCluster, ManagedCursor cursor, String localCluster) {
AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
- replicators.computeIfAbsent(remoteCluster, r -> {
+ Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
brokerService);
@@ -1555,8 +1555,8 @@ public class PersistentTopic extends AbstractTopic
return null;
});
// clean up replicator if startup is failed
- if (!isReplicatorStarted.get()) {
- replicators.remove(remoteCluster);
+ if (replicator == null) {
+ replicators.removeNullValue(remoteCluster);
}
return isReplicatorStarted.get();
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
index 47927a9..2c7eed1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.common.util.collections;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -42,6 +43,27 @@ public class ConcurrentOpenHashMap<K, V> {
private static final Object EmptyKey = null;
private static final Object DeletedKey = new Object();
+ /**
+ * This object is used to delete empty value in this map.
+ * EmptyValue.equals(null) = true.
+ */
+ private static final Object EmptyValue = new Object() {
+
+ @SuppressFBWarnings
+ @Override
+ public boolean equals(Object obj) {
+ return obj == null;
+ }
+
+ /**
+ * This is just for avoiding spotbugs errors
+ */
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+ };
+
private static final float MapFillFactor = 0.66f;
private static final int DefaultExpectedItems = 256;
@@ -142,6 +164,10 @@ public class ConcurrentOpenHashMap<K, V> {
return getSection(h).remove(key, value, (int) h) != null;
}
+ public void removeNullValue(K key) {
+ remove(key, EmptyValue);
+ }
+
private Section<K, V> getSection(long hash) {
// Use 32 msb out of long to get the section
final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
index e18012c..254be51 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -369,6 +370,37 @@ public class ConcurrentOpenHashMapTest {
assertNull(map.get(t1_b));
}
+ @Test
+ public void testNullValue() {
+ ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>(16, 1);
+ String key = "a";
+ assertThrows(NullPointerException.class, () -> map.put(key, null));
+
+ //put a null value.
+ assertNull(map.computeIfAbsent(key, k -> null));
+ assertEquals(1, map.size());
+ assertEquals(1, map.keys().size());
+ assertEquals(1, map.values().size());
+ assertNull(map.get(key));
+ assertFalse(map.containsKey(key));
+
+ //test remove null value
+ map.removeNullValue(key);
+ assertTrue(map.isEmpty());
+ assertEquals(0, map.keys().size());
+ assertEquals(0, map.values().size());
+ assertNull(map.get(key));
+ assertFalse(map.containsKey(key));
+
+
+ //test not remove non-null value
+ map.put(key, "V");
+ assertEquals(1, map.size());
+ map.removeNullValue(key);
+ assertEquals(1, map.size());
+
+ }
+
static final int Iterations = 1;
static final int ReadIterations = 1000;
static final int N = 1_000_000;