You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/01/16 08:50:08 UTC
(pulsar) branch branch-3.0 updated: [improve] [client] Prevent reserve memory with a negative memory size to avoid send task stuck (#21804)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new c5348a46900 [improve] [client] Prevent reserve memory with a negative memory size to avoid send task stuck (#21804)
c5348a46900 is described below
commit c5348a4690034544005d239f22f8996c32a8dcca
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Wed Dec 27 00:13:29 2023 +0800
[improve] [client] Prevent reserve memory with a negative memory size to avoid send task stuck (#21804)
---
.../pulsar/client/impl/MemoryLimitController.java | 27 ++++++++++++++++
.../client/impl/MemoryLimitControllerTest.java | 36 ++++++++++++++++++++++
2 files changed, 63 insertions(+)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
index 935e3fad2b5..c15821c0543 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
@@ -22,7 +22,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import lombok.extern.slf4j.Slf4j;
+@Slf4j
public class MemoryLimitController {
private final long memoryLimit;
@@ -46,11 +48,19 @@ public class MemoryLimitController {
}
public void forceReserveMemory(long size) {
+ checkPositive(size);
+ if (size == 0) {
+ return;
+ }
long newUsage = currentUsage.addAndGet(size);
checkTrigger(newUsage - size, newUsage);
}
public boolean tryReserveMemory(long size) {
+ checkPositive(size);
+ if (size == 0) {
+ return true;
+ }
while (true) {
long current = currentUsage.get();
long newUsage = current + size;
@@ -68,6 +78,15 @@ public class MemoryLimitController {
}
}
+ private static void checkPositive(long memorySize) {
+ if (memorySize < 0) {
+ String errorMsg = String.format("Try to reserve/release memory failed, the param memorySize"
+ + " is a negative value: %s", memorySize);
+ log.error(errorMsg);
+ throw new IllegalArgumentException(errorMsg);
+ }
+ }
+
private void checkTrigger(long prevUsage, long newUsage) {
if (newUsage >= triggerThreshold && prevUsage < triggerThreshold && trigger != null) {
if (triggerRunning.compareAndSet(false, true)) {
@@ -81,6 +100,10 @@ public class MemoryLimitController {
}
public void reserveMemory(long size) throws InterruptedException {
+ checkPositive(size);
+ if (size == 0) {
+ return;
+ }
if (!tryReserveMemory(size)) {
mutex.lock();
try {
@@ -94,6 +117,10 @@ public class MemoryLimitController {
}
public void releaseMemory(long size) {
+ checkPositive(size);
+ if (size == 0) {
+ return;
+ }
long newUsage = currentUsage.addAndGet(-size);
if (newUsage + size > memoryLimit
&& newUsage <= memoryLimit) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
index 78ffa247f7b..1aaf3f77da4 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -197,4 +198,39 @@ public class MemoryLimitControllerTest {
assertTrue(l3.await(1, TimeUnit.SECONDS));
assertEquals(mlc.currentUsage(), 101);
}
+
+ @Test
+ public void testModifyMemoryFailedDueToNegativeParam() throws Exception {
+ MemoryLimitController mlc = new MemoryLimitController(100);
+
+ try {
+ mlc.tryReserveMemory(-1);
+ fail("The test should fail due to calling tryReserveMemory with a negative value.");
+ } catch (IllegalArgumentException e) {
+ // Expected ex.
+ }
+
+ try {
+ mlc.reserveMemory(-1);
+ fail("The test should fail due to calling reserveMemory with a negative value.");
+ } catch (IllegalArgumentException e) {
+ // Expected ex.
+ }
+
+ try {
+ mlc.forceReserveMemory(-1);
+ fail("The test should fail due to calling forceReserveMemory with a negative value.");
+ } catch (IllegalArgumentException e) {
+ // Expected ex.
+ }
+
+ try {
+ mlc.releaseMemory(-1);
+ fail("The test should fail due to calling releaseMemory with a negative value.");
+ } catch (IllegalArgumentException e) {
+ // Expected ex.
+ }
+
+ assertEquals(mlc.currentUsage(), 0);
+ }
}