You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/10/13 04:20:22 UTC
[pulsar] branch master updated: [fix][broker] Fix the order of resource close in the InMemoryDelayedDeliveryTracker (#18000)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 44ae3487cc2 [fix][broker] Fix the order of resource close in the InMemoryDelayedDeliveryTracker (#18000)
44ae3487cc2 is described below
commit 44ae3487cc2aaaebdc5ce892d05eef76ca0384c6
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Thu Oct 13 12:20:12 2022 +0800
[fix][broker] Fix the order of resource close in the InMemoryDelayedDeliveryTracker (#18000)
---
.../delayed/InMemoryDelayedDeliveryTracker.java | 9 ++--
.../delayed/InMemoryDeliveryTrackerTest.java | 49 +++++++++++++++++++---
2 files changed, 49 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 11d663322be..ba8a9311817 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -33,7 +33,7 @@ import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
@Slf4j
public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, TimerTask {
- private final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue();
+ protected final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue();
private final PersistentDispatcherMultipleConsumers dispatcher;
@@ -41,7 +41,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
private final Timer timer;
// Current timeout or null if not set
- private Timeout timeout;
+ protected Timeout timeout;
// Timestamp at which the timeout is currently set
private long currentTimeoutTarget;
@@ -265,7 +265,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
if (log.isDebugEnabled()) {
log.debug("[{}] Timer triggered", dispatcher.getName());
}
- if (timeout.isCancelled()) {
+ if (timeout == null || timeout.isCancelled()) {
return;
}
@@ -279,10 +279,11 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
@Override
public void close() {
- priorityQueue.close();
if (timeout != null) {
timeout.cancel();
+ timeout = null;
}
+ priorityQueue.close();
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index 1ff47a4ca50..11b681d80a6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -28,13 +28,13 @@ import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
-
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
-
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Clock;
import java.util.Collections;
import java.util.NavigableMap;
@@ -42,10 +42,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.Cleanup;
-
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.awaitility.Awaitility;
@@ -433,4 +430,46 @@ public class InMemoryDeliveryTrackerTest {
assertFalse(tracker.shouldPauseAllDeliveries());
}
+ @Test
+ public void testClose() throws Exception {
+ Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"),
+ 1, TimeUnit.MILLISECONDS);
+
+ PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);
+
+ AtomicLong clockTime = new AtomicLong();
+ Clock clock = mock(Clock.class);
+ when(clock.millis()).then(x -> clockTime.get());
+
+ final Exception[] exceptions = new Exception[1];
+
+ InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+ true, 0) {
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ super.timeout = timer.newTimeout(this, 1, TimeUnit.MILLISECONDS);
+ if (timeout == null || timeout.isCancelled()) {
+ return;
+ }
+ try {
+ this.priorityQueue.peekN1();
+ } catch (Exception e) {
+ e.printStackTrace();
+ exceptions[0] = e;
+ }
+ }
+ };
+
+ tracker.addMessage(1, 1, 10);
+ clockTime.set(10);
+
+ Thread.sleep(300);
+
+ tracker.close();
+
+ assertNull(exceptions[0]);
+
+ timer.stop();
+ }
+
}