You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/18 09:31:31 UTC

[pulsar] 08/12: [Issue 12723] Fix race condition in PersistentTopic#addReplicationCluster (#12729)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 230e1ac9f97e292a01766766705d531f3eb8bf34
Author: JiangHaiting <ja...@qq.com>
AuthorDate: Mon Nov 15 08:52:31 2021 +0800

    [Issue 12723] Fix race condition in PersistentTopic#addReplicationCluster (#12729)
    
    See #12723
    
    Add a method org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap#removeNullValue to remove a null value in a thread-safe way.
    
    (cherry picked from commit a3fe00efc4ccba55a0f28fd02b535c6624e3ed0a)
---
 .../broker/service/persistent/PersistentTopic.java |  6 ++--
 .../util/collections/ConcurrentOpenHashMap.java    | 26 ++++++++++++++++++
 .../collections/ConcurrentOpenHashMapTest.java     | 32 ++++++++++++++++++++++
 3 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index cdc0e00..3e7d733 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1544,7 +1544,7 @@ public class PersistentTopic extends AbstractTopic
 
     protected boolean addReplicationCluster(String remoteCluster, ManagedCursor cursor, String localCluster) {
         AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
-        replicators.computeIfAbsent(remoteCluster, r -> {
+        Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
             try {
                 return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
                         brokerService);
@@ -1555,8 +1555,8 @@ public class PersistentTopic extends AbstractTopic
             return null;
         });
         // clean up replicator if startup is failed
-        if (!isReplicatorStarted.get()) {
-            replicators.remove(remoteCluster);
+        if (replicator == null) {
+            replicators.removeNullValue(remoteCluster);
         }
         return isReplicatorStarted.get();
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
index 47927a9..2c7eed1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.common.util.collections;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -42,6 +43,27 @@ public class ConcurrentOpenHashMap<K, V> {
     private static final Object EmptyKey = null;
     private static final Object DeletedKey = new Object();
 
+    /**
+     * Sentinel that removeNullValue passes to remove(key, value) in order to delete a null value from this map.
+     * Its equals(obj) returns true only when obj is null.
+     */
+    private static final Object EmptyValue = new Object() {
+
+        @SuppressFBWarnings
+        @Override
+        public boolean equals(Object obj) {
+            return obj == null;
+        }
+
+        /**
+         * This is just for avoiding spotbugs errors
+         */
+        @Override
+        public int hashCode() {
+            return super.hashCode();
+        }
+    };
+
     private static final float MapFillFactor = 0.66f;
 
     private static final int DefaultExpectedItems = 256;
@@ -142,6 +164,10 @@ public class ConcurrentOpenHashMap<K, V> {
         return getSection(h).remove(key, value, (int) h) != null;
     }
 
+    public void removeNullValue(K key) {
+        remove(key, EmptyValue);
+    }
+
     private Section<K, V> getSection(long hash) {
         // Use 32 msb out of long to get the section
         final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
index e18012c..254be51 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -369,6 +370,37 @@ public class ConcurrentOpenHashMapTest {
         assertNull(map.get(t1_b));
     }
 
+    @Test
+    public void testNullValue() {
+        ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>(16, 1);
+        String key = "a";
+        assertThrows(NullPointerException.class, () -> map.put(key, null));
+
+        //put a null value.
+        assertNull(map.computeIfAbsent(key, k -> null));
+        assertEquals(1, map.size());
+        assertEquals(1, map.keys().size());
+        assertEquals(1, map.values().size());
+        assertNull(map.get(key));
+        assertFalse(map.containsKey(key));
+
+        //test remove null value
+        map.removeNullValue(key);
+        assertTrue(map.isEmpty());
+        assertEquals(0, map.keys().size());
+        assertEquals(0, map.values().size());
+        assertNull(map.get(key));
+        assertFalse(map.containsKey(key));
+
+
+        //test not remove non-null value
+        map.put(key, "V");
+        assertEquals(1, map.size());
+        map.removeNullValue(key);
+        assertEquals(1, map.size());
+
+    }
+
     static final int Iterations = 1;
     static final int ReadIterations = 1000;
     static final int N = 1_000_000;