You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/07 05:04:36 UTC

[pulsar] 06/17: [fix][ML]Fix NPE when put value to `RangeCache`. (#15707)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2e5c984b201f99759c71fff2e2757860c69d4aae
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Tue May 24 12:15:25 2022 +0800

    [fix][ML]Fix NPE when put value to `RangeCache`. (#15707)
    
    ### Motivation
    
    When `ReferenceCounted` object overrides the method `deallocate` to make the `getLength` value equal null will cause NPE because the `RangeCache#put` method is not thread-safe. (The process of describing this abstraction is not very clear, please refer to the code modification :)
    
    Pulsar implementation may throw an exception to make `OpAddEntry` fail abnormal and fence the topic. relative code as below:
    
    https://github.com/apache/pulsar/blob/defeec0e84a63ea865f3a2790bc61b66a02254c5/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java#L211-L217
    
    **Exception screenshot:**
    
    ```java
    java.lang.NullPointerException: Cannot invoke "String.length()" because "value.s" is null
    
            at org.apache.bookkeeper.mledger.util.RangeCacheTest.lambda$testInParallel$6(RangeCacheTest.java:279)
            at org.apache.bookkeeper.mledger.util.RangeCache.put(RangeCache.java:77)
            at org.apache.bookkeeper.mledger.util.RangeCacheTest.testInParallel(RangeCacheTest.java:283)
            at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
            at java.base/java.lang.reflect.Method.invoke(Method.java:577)
            at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
            at org.testng.internal.TestInvoker.invokeMethod(TestInvoker.java:599)
            at org.testng.internal.TestInvoker.invokeTestMethod(TestInvoker.java:174)
            at org.testng.internal.MethodRunner.runInSequence(MethodRunner.java:46)
            at org.testng.internal.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:822)
            at org.testng.internal.TestInvoker.invokeTestMethods(TestInvoker.java:147)
            at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
            at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:128)
            at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
            at org.testng.TestRunner.privateRun(TestRunner.java:764)
            at org.testng.TestRunner.run(TestRunner.java:585)
            at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
            at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
            at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
            at org.testng.SuiteRunner.run(SuiteRunner.java:286)
            at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
            at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
            at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
            at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
            at org.testng.TestNG.runSuites(TestNG.java:1069)
            at org.testng.TestNG.run(TestNG.java:1037)
            at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
            at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
    ```
    
    ### Modifications
    
    - Make the `RangeCache#put` method to thread-safe.
    
    (cherry picked from commit b155d84c2ee397fe8003f452f04ae6cedf229b5c)
---
 .../apache/bookkeeper/mledger/util/RangeCache.java | 12 +++++----
 .../bookkeeper/mledger/util/RangeCacheTest.java    | 29 ++++++++++++++++++++--
 2 files changed, 34 insertions(+), 7 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
index a5b31335f35..1de9429d7c0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.tuple.Pair;
 
 /**
@@ -73,12 +74,13 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
      * @return whether the entry was inserted in the cache
      */
     public boolean put(Key key, Value value) {
-        if (entries.putIfAbsent(key, value) == null) {
+        MutableBoolean flag = new MutableBoolean();
+        entries.computeIfAbsent(key, (k) -> {
             size.addAndGet(weighter.getSize(value));
-            return true;
-        } else {
-            return false;
-        }
+            flag.setValue(true);
+            return value;
+        });
+        return flag.booleanValue();
     }
 
     public Value get(Key key) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
index 95896d24f35..f31aa4a74f9 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
@@ -29,11 +29,15 @@ import io.netty.util.AbstractReferenceCounted;
 import io.netty.util.ReferenceCounted;
 import org.apache.commons.lang3.tuple.Pair;
 import org.testng.annotations.Test;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 public class RangeCacheTest {
 
     class RefString extends AbstractReferenceCounted implements ReferenceCounted {
-        final String s;
+        String s;
 
         RefString(String s) {
             super();
@@ -43,7 +47,7 @@ public class RangeCacheTest {
 
         @Override
         protected void deallocate() {
-            // no-op
+            s = null;
         }
 
         @Override
@@ -122,6 +126,7 @@ public class RangeCacheTest {
         assertEquals(cache.getNumberOfEntries(), 2);
     }
 
+
     @Test
     public void customTimeExtraction() {
         RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> x.s.length());
@@ -268,4 +273,24 @@ public class RangeCacheTest {
         assertEquals((long) res.getRight(), 10);
         assertEquals(cache.getSize(), 90);
     }
+
+    @Test
+    public void testInParallel() {
+        RangeCache<String, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> 0);
+        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+        executor.scheduleWithFixedDelay(cache::clear, 10, 10, TimeUnit.MILLISECONDS);
+        for (int i = 0; i < 1000; i++) {
+            cache.put(UUID.randomUUID().toString(), new RefString("zero"));
+        }
+        executor.shutdown();
+    }
+
+    @Test
+    public void testPutSameObj() {
+        RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> 0);
+        RefString s0 = new RefString("zero");
+        assertEquals(s0.refCnt(), 1);
+        assertTrue(cache.put(0, s0));
+        assertFalse(cache.put(0, s0));
+    }
 }