You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/01/16 08:50:08 UTC

(pulsar) branch branch-3.0 updated: [improve] [client] Prevent reserve memory with a negative memory size to avoid send task stuck (#21804)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c5348a46900 [improve] [client] Prevent reserve memory with a negative memory size to avoid send task stuck (#21804)
c5348a46900 is described below

commit c5348a4690034544005d239f22f8996c32a8dcca
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Wed Dec 27 00:13:29 2023 +0800

    [improve] [client] Prevent reserve memory with a negative memory size to avoid send task stuck (#21804)
---
 .../pulsar/client/impl/MemoryLimitController.java  | 27 ++++++++++++++++
 .../client/impl/MemoryLimitControllerTest.java     | 36 ++++++++++++++++++++++
 2 files changed, 63 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
index 935e3fad2b5..c15821c0543 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
@@ -22,7 +22,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import lombok.extern.slf4j.Slf4j;
 
+@Slf4j
 public class MemoryLimitController {
 
     private final long memoryLimit;
@@ -46,11 +48,19 @@ public class MemoryLimitController {
     }
 
     public void forceReserveMemory(long size) {
+        checkPositive(size);
+        if (size == 0) {
+            return;
+        }
         long newUsage = currentUsage.addAndGet(size);
         checkTrigger(newUsage - size, newUsage);
     }
 
     public boolean tryReserveMemory(long size) {
+        checkPositive(size);
+        if (size == 0) {
+            return true;
+        }
         while (true) {
             long current = currentUsage.get();
             long newUsage = current + size;
@@ -68,6 +78,15 @@ public class MemoryLimitController {
         }
     }
 
+    private static void checkPositive(long memorySize) {
+        if (memorySize < 0) {
+            String errorMsg = String.format("Try to reserve/release memory failed, the param memorySize"
+                    + " is a negative value: %s", memorySize);
+            log.error(errorMsg);
+            throw new IllegalArgumentException(errorMsg);
+        }
+    }
+
     private void checkTrigger(long prevUsage, long newUsage) {
         if (newUsage >= triggerThreshold && prevUsage < triggerThreshold && trigger != null) {
             if (triggerRunning.compareAndSet(false, true)) {
@@ -81,6 +100,10 @@ public class MemoryLimitController {
     }
 
     public void reserveMemory(long size) throws InterruptedException {
+        checkPositive(size);
+        if (size == 0) {
+            return;
+        }
         if (!tryReserveMemory(size)) {
             mutex.lock();
             try {
@@ -94,6 +117,10 @@ public class MemoryLimitController {
     }
 
     public void releaseMemory(long size) {
+        checkPositive(size);
+        if (size == 0) {
+            return;
+        }
         long newUsage = currentUsage.addAndGet(-size);
         if (newUsage + size > memoryLimit
                 && newUsage <= memoryLimit) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
index 78ffa247f7b..1aaf3f77da4 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -197,4 +198,39 @@ public class MemoryLimitControllerTest {
         assertTrue(l3.await(1, TimeUnit.SECONDS));
         assertEquals(mlc.currentUsage(), 101);
     }
+
+    @Test
+    public void testModifyMemoryFailedDueToNegativeParam() throws Exception {
+        MemoryLimitController mlc = new MemoryLimitController(100);
+
+        try {
+            mlc.tryReserveMemory(-1);
+            fail("The test should fail due to calling tryReserveMemory with a negative value.");
+        } catch (IllegalArgumentException e) {
+            // Expected ex.
+        }
+
+        try {
+            mlc.reserveMemory(-1);
+            fail("The test should fail due to calling reserveMemory with a negative value.");
+        } catch (IllegalArgumentException e) {
+            // Expected ex.
+        }
+
+        try {
+            mlc.forceReserveMemory(-1);
+            fail("The test should fail due to calling forceReserveMemory with a negative value.");
+        } catch (IllegalArgumentException e) {
+            // Expected ex.
+        }
+
+        try {
+            mlc.releaseMemory(-1);
+            fail("The test should fail due to calling releaseMemory with a negative value.");
+        } catch (IllegalArgumentException e) {
+            // Expected ex.
+        }
+
+        assertEquals(mlc.currentUsage(), 0);
+    }
 }