You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2023/03/08 09:23:28 UTC

[rocketmq] branch develop updated: [ISSUE #6169] Fix NPE when timerWheel disabled (#6184)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9e0fb1b02 [ISSUE #6169] Fix NPE when timerWheel disabled (#6184)
9e0fb1b02 is described below

commit 9e0fb1b0293aebbe5019e55c0fdbd356611b3d16
Author: rongtong <ji...@163.com>
AuthorDate: Wed Mar 8 17:23:06 2023 +0800

    [ISSUE #6169] Fix NPE when timerWheel disabled (#6184)
    
    * Fix some NPE when timerWheel disabled
    
    * Add UT
---
 .../broker/processor/PopMessageProcessor.java      |  13 ++-
 .../broker/processor/PopReviveService.java         |   5 +
 .../broker/processor/PopMessageProcessorTest.java  |  16 +++-
 .../store/metrics/DefaultStoreMetricsManager.java  | 104 +++++++++++----------
 4 files changed, 83 insertions(+), 55 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index cd4595326..d63fbe621 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -278,19 +278,26 @@ public class PopMessageProcessor implements NettyRequestProcessor {
 
         if (requestHeader.isTimeoutTooMuch()) {
             response.setCode(ResponseCode.POLLING_TIMEOUT);
-            response.setRemark(String.format("the broker[%s] poping message is timeout too much",
+            response.setRemark(String.format("the broker[%s] pop message is timeout too much",
                 this.brokerController.getBrokerConfig().getBrokerIP1()));
             return response;
         }
         if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
             response.setCode(ResponseCode.NO_PERMISSION);
-            response.setRemark(String.format("the broker[%s] poping message is forbidden",
+            response.setRemark(String.format("the broker[%s] pop message is forbidden",
                 this.brokerController.getBrokerConfig().getBrokerIP1()));
             return response;
         }
         if (requestHeader.getMaxMsgNums() > 32) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark(String.format("the broker[%s] poping message's num is greater than 32",
+            response.setRemark(String.format("the broker[%s] pop message's num is greater than 32",
+                this.brokerController.getBrokerConfig().getBrokerIP1()));
+            return response;
+        }
+
+        if (!brokerController.getMessageStore().getMessageStoreConfig().isTimerWheelEnable()) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(String.format("the broker[%s] pop message is forbidden because timerWheelEnable is false",
                 this.brokerController.getBrokerConfig().getBrokerIP1()));
             return response;
         }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index a63cd2930..116cb0f82 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -600,6 +600,11 @@ public class PopReviveService extends ServiceThread {
                     continue;
                 }
 
+                if (!brokerController.getMessageStore().getMessageStoreConfig().isTimerWheelEnable()) {
+                    POP_LOGGER.warn("skip revive topic because timerWheelEnable is false");
+                    continue;
+                }
+
                 POP_LOGGER.info("start revive topic={}, reviveQueueId={}", reviveTopic, queueId);
                 ConsumeReviveObj consumeReviveObj = new ConsumeReviveObj();
                 consumeReviveMessage(consumeReviveObj);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
index aabc68220..44f04066c 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
@@ -91,6 +91,7 @@ public class PopMessageProcessorTest {
 
     @Test
     public void testProcessRequest_TopicNotExist() throws RemotingCommandException {
+        when(messageStore.getMessageStoreConfig()).thenReturn(new MessageStoreConfig());
         brokerController.getTopicConfigManager().getTopicConfigTable().remove(topic);
         final RemotingCommand request = createPopMsgCommand();
         RemotingCommand response = popMessageProcessor.processRequest(handlerContext, request);
@@ -102,6 +103,7 @@ public class PopMessageProcessorTest {
     @Test
     public void testProcessRequest_Found() throws RemotingCommandException, InterruptedException {
         GetMessageResult getMessageResult = createGetMessageResult(1);
+        when(messageStore.getMessageStoreConfig()).thenReturn(new MessageStoreConfig());
         when(messageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())).thenReturn(CompletableFuture.completedFuture(getMessageResult));
 
         final RemotingCommand request = createPopMsgCommand();
@@ -115,6 +117,7 @@ public class PopMessageProcessorTest {
     public void testProcessRequest_MsgWasRemoving() throws RemotingCommandException {
         GetMessageResult getMessageResult = createGetMessageResult(1);
         getMessageResult.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING);
+        when(messageStore.getMessageStoreConfig()).thenReturn(new MessageStoreConfig());
         when(messageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())).thenReturn(CompletableFuture.completedFuture(getMessageResult));
 
         final RemotingCommand request = createPopMsgCommand();
@@ -128,6 +131,7 @@ public class PopMessageProcessorTest {
     public void testProcessRequest_NoMsgInQueue() throws RemotingCommandException {
         GetMessageResult getMessageResult = createGetMessageResult(0);
         getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
+        when(messageStore.getMessageStoreConfig()).thenReturn(new MessageStoreConfig());
         when(messageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())).thenReturn(CompletableFuture.completedFuture(getMessageResult));
 
         final RemotingCommand request = createPopMsgCommand();
@@ -135,6 +139,17 @@ public class PopMessageProcessorTest {
         assertThat(response).isNull();
     }
 
+    @Test
+    public void testProcessRequest_whenTimerWheelIsFalse() throws RemotingCommandException {
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setTimerWheelEnable(false);
+        when(messageStore.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+        final RemotingCommand request = createPopMsgCommand();
+        RemotingCommand response = popMessageProcessor.processRequest(handlerContext, request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+        assertThat(response.getRemark()).contains("pop message is forbidden because timerWheelEnable is false");
+    }
 
     private RemotingCommand createPopMsgCommand() {
         PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
@@ -152,7 +167,6 @@ public class PopMessageProcessorTest {
         return request;
     }
 
-
     private GetMessageResult createGetMessageResult(int msgCnt) {
         GetMessageResult getMessageResult = new GetMessageResult();
         getMessageResult.setStatus(GetMessageStatus.FOUND);
diff --git a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
index 686265292..9132761a6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
@@ -116,57 +116,59 @@ public class DefaultStoreMetricsManager {
                 measurement.record(System.currentTimeMillis() - earliestMessageTime, newAttributesBuilder().build());
             });
 
-        timerEnqueueLag = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LAG)
-            .setDescription("Timer enqueue messages lag")
-            .ofLongs()
-            .buildWithCallback(measurement -> {
-                TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
-                measurement.record(timerMessageStore.getEnqueueBehindMessages(), newAttributesBuilder().build());
-            });
-
-        timerEnqueueLatency = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LATENCY)
-            .setDescription("Timer enqueue latency")
-            .setUnit("milliseconds")
-            .ofLongs()
-            .buildWithCallback(measurement -> {
-                TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
-                measurement.record(timerMessageStore.getEnqueueBehindMillis(), newAttributesBuilder().build());
-            });
-        timerDequeueLag = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LAG)
-            .setDescription("Timer dequeue messages lag")
-            .ofLongs()
-            .buildWithCallback(measurement -> {
-                TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
-                measurement.record(timerMessageStore.getDequeueBehindMessages(), newAttributesBuilder().build());
-            });
-        timerDequeueLatency = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LATENCY)
-            .setDescription("Timer dequeue latency")
-            .setUnit("milliseconds")
-            .ofLongs()
-            .buildWithCallback(measurement -> {
-                TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
-                measurement.record(timerMessageStore.getDequeueBehind(), newAttributesBuilder().build());
-            });
-        timingMessages = meter.gaugeBuilder(GAUGE_TIMING_MESSAGES)
-            .setDescription("Current message number in timing")
-            .ofLongs()
-            .buildWithCallback(measurement -> {
-                TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
-                timerMessageStore.getTimerMetrics()
-                    .getTimingCount()
-                    .forEach((topic, metric) -> {
-                        measurement.record(
-                            metric.getCount().get(),
-                            newAttributesBuilder().put(LABEL_TOPIC, topic).build()
-                        );
-                    });
-            });
-        timerDequeueTotal = meter.counterBuilder(COUNTER_TIMER_DEQUEUE_TOTAL)
-            .setDescription("Total number of timer dequeue")
-            .build();
-        timerEnqueueTotal = meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL)
-            .setDescription("Total number of timer enqueue")
-            .build();
+        if (messageStore.getMessageStoreConfig().isTimerWheelEnable()) {
+            timerEnqueueLag = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LAG)
+                .setDescription("Timer enqueue messages lag")
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
+                    measurement.record(timerMessageStore.getEnqueueBehindMessages(), newAttributesBuilder().build());
+                });
+
+            timerEnqueueLatency = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LATENCY)
+                .setDescription("Timer enqueue latency")
+                .setUnit("milliseconds")
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
+                    measurement.record(timerMessageStore.getEnqueueBehindMillis(), newAttributesBuilder().build());
+                });
+            timerDequeueLag = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LAG)
+                .setDescription("Timer dequeue messages lag")
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
+                    measurement.record(timerMessageStore.getDequeueBehindMessages(), newAttributesBuilder().build());
+                });
+            timerDequeueLatency = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LATENCY)
+                .setDescription("Timer dequeue latency")
+                .setUnit("milliseconds")
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
+                    measurement.record(timerMessageStore.getDequeueBehind(), newAttributesBuilder().build());
+                });
+            timingMessages = meter.gaugeBuilder(GAUGE_TIMING_MESSAGES)
+                .setDescription("Current message number in timing")
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
+                    timerMessageStore.getTimerMetrics()
+                        .getTimingCount()
+                        .forEach((topic, metric) -> {
+                            measurement.record(
+                                metric.getCount().get(),
+                                newAttributesBuilder().put(LABEL_TOPIC, topic).build()
+                            );
+                        });
+                });
+            timerDequeueTotal = meter.counterBuilder(COUNTER_TIMER_DEQUEUE_TOTAL)
+                .setDescription("Total number of timer dequeue")
+                .build();
+            timerEnqueueTotal = meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL)
+                .setDescription("Total number of timer enqueue")
+                .build();
+        }
     }
 
     public static void incTimerDequeueCount(String topic) {