You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/10/13 04:20:22 UTC

[pulsar] branch master updated: [fix][broker] Fix the order of resource close in the InMemoryDelayedDeliveryTracker (#18000)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 44ae3487cc2 [fix][broker] Fix the order of resource close in the InMemoryDelayedDeliveryTracker (#18000)
44ae3487cc2 is described below

commit 44ae3487cc2aaaebdc5ce892d05eef76ca0384c6
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Thu Oct 13 12:20:12 2022 +0800

    [fix][broker] Fix the order of resource close in the InMemoryDelayedDeliveryTracker (#18000)
---
 .../delayed/InMemoryDelayedDeliveryTracker.java    |  9 ++--
 .../delayed/InMemoryDeliveryTrackerTest.java       | 49 +++++++++++++++++++---
 2 files changed, 49 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 11d663322be..ba8a9311817 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -33,7 +33,7 @@ import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
 @Slf4j
 public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, TimerTask {
 
-    private final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue();
+    protected final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue();
 
     private final PersistentDispatcherMultipleConsumers dispatcher;
 
@@ -41,7 +41,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
     private final Timer timer;
 
     // Current timeout or null if not set
-    private Timeout timeout;
+    protected Timeout timeout;
 
     // Timestamp at which the timeout is currently set
     private long currentTimeoutTarget;
@@ -265,7 +265,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
         if (log.isDebugEnabled()) {
             log.debug("[{}] Timer triggered", dispatcher.getName());
         }
-        if (timeout.isCancelled()) {
+        if (timeout == null || timeout.isCancelled()) {
             return;
         }
 
@@ -279,10 +279,11 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
 
     @Override
     public void close() {
-        priorityQueue.close();
         if (timeout != null) {
             timeout.cancel();
+            timeout = null;
         }
+        priorityQueue.close();
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index 1ff47a4ca50..11b681d80a6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -28,13 +28,13 @@ import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
-
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
 import io.netty.util.TimerTask;
-
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.time.Clock;
 import java.util.Collections;
 import java.util.NavigableMap;
@@ -42,10 +42,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
 import lombok.Cleanup;
-
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.awaitility.Awaitility;
@@ -433,4 +430,46 @@ public class InMemoryDeliveryTrackerTest {
         assertFalse(tracker.shouldPauseAllDeliveries());
     }
 
+    @Test
+    public void testClose() throws Exception {
+        Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"),
+                1, TimeUnit.MILLISECONDS);
+
+        PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        final Exception[] exceptions = new Exception[1];
+
+        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+                true, 0) {
+            @Override
+            public void run(Timeout timeout) throws Exception {
+                super.timeout = timer.newTimeout(this, 1, TimeUnit.MILLISECONDS);
+                if (timeout == null || timeout.isCancelled()) {
+                    return;
+                }
+                try {
+                    this.priorityQueue.peekN1();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    exceptions[0] = e;
+                }
+            }
+        };
+
+        tracker.addMessage(1, 1, 10);
+        clockTime.set(10);
+
+        Thread.sleep(300);
+
+        tracker.close();
+
+        assertNull(exceptions[0]);
+
+        timer.stop();
+    }
+
 }