You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/07 05:04:36 UTC
[pulsar] 06/17: [fix][ML]Fix NPE when put value to `RangeCache`. (#15707)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2e5c984b201f99759c71fff2e2757860c69d4aae
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Tue May 24 12:15:25 2022 +0800
[fix][ML]Fix NPE when put value to `RangeCache`. (#15707)
### Motivation
When `ReferenceCounted` object overrides the method `deallocate` to make the `getLength` value equal null will cause NPE because the `RangeCache#put` method is not thread-safe. (The process of describing this abstraction is not very clear, please refer to the code modification :)
Pulsar implementation may throw an exception to make `OpAddEntry` fail abnormal and fence the topic. relative code as below:
https://github.com/apache/pulsar/blob/defeec0e84a63ea865f3a2790bc61b66a02254c5/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java#L211-L217
**Exception screenshot:**
```java
java.lang.NullPointerException: Cannot invoke "String.length()" because "value.s" is null
at org.apache.bookkeeper.mledger.util.RangeCacheTest.lambda$testInParallel$6(RangeCacheTest.java:279)
at org.apache.bookkeeper.mledger.util.RangeCache.put(RangeCache.java:77)
at org.apache.bookkeeper.mledger.util.RangeCacheTest.testInParallel(RangeCacheTest.java:283)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
at java.base/java.lang.reflect.Method.invoke(Method.java:577)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
at org.testng.internal.TestInvoker.invokeMethod(TestInvoker.java:599)
at org.testng.internal.TestInvoker.invokeTestMethod(TestInvoker.java:174)
at org.testng.internal.MethodRunner.runInSequence(MethodRunner.java:46)
at org.testng.internal.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:822)
at org.testng.internal.TestInvoker.invokeTestMethods(TestInvoker.java:147)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:128)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.testng.TestRunner.privateRun(TestRunner.java:764)
at org.testng.TestRunner.run(TestRunner.java:585)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
at org.testng.SuiteRunner.run(SuiteRunner.java:286)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
at org.testng.TestNG.runSuites(TestNG.java:1069)
at org.testng.TestNG.run(TestNG.java:1037)
at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
```
### Modifications
- Make the `RangeCache#put` method to thread-safe.
(cherry picked from commit b155d84c2ee397fe8003f452f04ae6cedf229b5c)
---
.../apache/bookkeeper/mledger/util/RangeCache.java | 12 +++++----
.../bookkeeper/mledger/util/RangeCacheTest.java | 29 ++++++++++++++++++++--
2 files changed, 34 insertions(+), 7 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
index a5b31335f35..1de9429d7c0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
/**
@@ -73,12 +74,13 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
* @return whether the entry was inserted in the cache
*/
public boolean put(Key key, Value value) {
- if (entries.putIfAbsent(key, value) == null) {
+ MutableBoolean flag = new MutableBoolean();
+ entries.computeIfAbsent(key, (k) -> {
size.addAndGet(weighter.getSize(value));
- return true;
- } else {
- return false;
- }
+ flag.setValue(true);
+ return value;
+ });
+ return flag.booleanValue();
}
public Value get(Key key) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
index 95896d24f35..f31aa4a74f9 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
@@ -29,11 +29,15 @@ import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;
import org.apache.commons.lang3.tuple.Pair;
import org.testng.annotations.Test;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
public class RangeCacheTest {
class RefString extends AbstractReferenceCounted implements ReferenceCounted {
- final String s;
+ String s;
RefString(String s) {
super();
@@ -43,7 +47,7 @@ public class RangeCacheTest {
@Override
protected void deallocate() {
- // no-op
+ s = null;
}
@Override
@@ -122,6 +126,7 @@ public class RangeCacheTest {
assertEquals(cache.getNumberOfEntries(), 2);
}
+
@Test
public void customTimeExtraction() {
RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> x.s.length());
@@ -268,4 +273,24 @@ public class RangeCacheTest {
assertEquals((long) res.getRight(), 10);
assertEquals(cache.getSize(), 90);
}
+
+ @Test
+ public void testInParallel() {
+ RangeCache<String, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> 0);
+ ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+ executor.scheduleWithFixedDelay(cache::clear, 10, 10, TimeUnit.MILLISECONDS);
+ for (int i = 0; i < 1000; i++) {
+ cache.put(UUID.randomUUID().toString(), new RefString("zero"));
+ }
+ executor.shutdown();
+ }
+
+ @Test
+ public void testPutSameObj() {
+ RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> 0);
+ RefString s0 = new RefString("zero");
+ assertEquals(s0.refCnt(), 1);
+ assertTrue(cache.put(0, s0));
+ assertFalse(cache.put(0, s0));
+ }
}