You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2023/03/08 09:23:28 UTC
[rocketmq] branch develop updated: [ISSUE #6169] Fix NPE when timerWheel disabled (#6184)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9e0fb1b02 [ISSUE #6169] Fix NPE when timerWheel disabled (#6184)
9e0fb1b02 is described below
commit 9e0fb1b0293aebbe5019e55c0fdbd356611b3d16
Author: rongtong <ji...@163.com>
AuthorDate: Wed Mar 8 17:23:06 2023 +0800
[ISSUE #6169] Fix NPE when timerWheel disabled (#6184)
* Fix several NPEs that occur when timerWheel is disabled
* Add a unit test (UT)
---
.../broker/processor/PopMessageProcessor.java | 13 ++-
.../broker/processor/PopReviveService.java | 5 +
.../broker/processor/PopMessageProcessorTest.java | 16 +++-
.../store/metrics/DefaultStoreMetricsManager.java | 104 +++++++++++----------
4 files changed, 83 insertions(+), 55 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index cd4595326..d63fbe621 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -278,19 +278,26 @@ public class PopMessageProcessor implements NettyRequestProcessor {
if (requestHeader.isTimeoutTooMuch()) {
response.setCode(ResponseCode.POLLING_TIMEOUT);
- response.setRemark(String.format("the broker[%s] poping message is timeout too much",
+ response.setRemark(String.format("the broker[%s] pop message is timeout too much",
this.brokerController.getBrokerConfig().getBrokerIP1()));
return response;
}
if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
- response.setRemark(String.format("the broker[%s] poping message is forbidden",
+ response.setRemark(String.format("the broker[%s] pop message is forbidden",
this.brokerController.getBrokerConfig().getBrokerIP1()));
return response;
}
if (requestHeader.getMaxMsgNums() > 32) {
response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark(String.format("the broker[%s] poping message's num is greater than 32",
+ response.setRemark(String.format("the broker[%s] pop message's num is greater than 32",
+ this.brokerController.getBrokerConfig().getBrokerIP1()));
+ return response;
+ }
+
+ if (!brokerController.getMessageStore().getMessageStoreConfig().isTimerWheelEnable()) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(String.format("the broker[%s] pop message is forbidden because timerWheelEnable is false",
this.brokerController.getBrokerConfig().getBrokerIP1()));
return response;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index a63cd2930..116cb0f82 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -600,6 +600,11 @@ public class PopReviveService extends ServiceThread {
continue;
}
+ if (!brokerController.getMessageStore().getMessageStoreConfig().isTimerWheelEnable()) {
+ POP_LOGGER.warn("skip revive topic because timerWheelEnable is false");
+ continue;
+ }
+
POP_LOGGER.info("start revive topic={}, reviveQueueId={}", reviveTopic, queueId);
ConsumeReviveObj consumeReviveObj = new ConsumeReviveObj();
consumeReviveMessage(consumeReviveObj);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
index aabc68220..44f04066c 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
@@ -91,6 +91,7 @@ public class PopMessageProcessorTest {
@Test
public void testProcessRequest_TopicNotExist() throws RemotingCommandException {
+ when(messageStore.getMessageStoreConfig()).thenReturn(new MessageStoreConfig());
brokerController.getTopicConfigManager().getTopicConfigTable().remove(topic);
final RemotingCommand request = createPopMsgCommand();
RemotingCommand response = popMessageProcessor.processRequest(handlerContext, request);
@@ -102,6 +103,7 @@ public class PopMessageProcessorTest {
@Test
public void testProcessRequest_Found() throws RemotingCommandException, InterruptedException {
GetMessageResult getMessageResult = createGetMessageResult(1);
+ when(messageStore.getMessageStoreConfig()).thenReturn(new MessageStoreConfig());
when(messageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())).thenReturn(CompletableFuture.completedFuture(getMessageResult));
final RemotingCommand request = createPopMsgCommand();
@@ -115,6 +117,7 @@ public class PopMessageProcessorTest {
public void testProcessRequest_MsgWasRemoving() throws RemotingCommandException {
GetMessageResult getMessageResult = createGetMessageResult(1);
getMessageResult.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING);
+ when(messageStore.getMessageStoreConfig()).thenReturn(new MessageStoreConfig());
when(messageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())).thenReturn(CompletableFuture.completedFuture(getMessageResult));
final RemotingCommand request = createPopMsgCommand();
@@ -128,6 +131,7 @@ public class PopMessageProcessorTest {
public void testProcessRequest_NoMsgInQueue() throws RemotingCommandException {
GetMessageResult getMessageResult = createGetMessageResult(0);
getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
+ when(messageStore.getMessageStoreConfig()).thenReturn(new MessageStoreConfig());
when(messageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())).thenReturn(CompletableFuture.completedFuture(getMessageResult));
final RemotingCommand request = createPopMsgCommand();
@@ -135,6 +139,17 @@ public class PopMessageProcessorTest {
assertThat(response).isNull();
}
+ @Test
+ public void testProcessRequest_whenTimerWheelIsFalse() throws RemotingCommandException {
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig.setTimerWheelEnable(false);
+ when(messageStore.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+ final RemotingCommand request = createPopMsgCommand();
+ RemotingCommand response = popMessageProcessor.processRequest(handlerContext, request);
+ assertThat(response).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+ assertThat(response.getRemark()).contains("pop message is forbidden because timerWheelEnable is false");
+ }
private RemotingCommand createPopMsgCommand() {
PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
@@ -152,7 +167,6 @@ public class PopMessageProcessorTest {
return request;
}
-
private GetMessageResult createGetMessageResult(int msgCnt) {
GetMessageResult getMessageResult = new GetMessageResult();
getMessageResult.setStatus(GetMessageStatus.FOUND);
diff --git a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
index 686265292..9132761a6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
@@ -116,57 +116,59 @@ public class DefaultStoreMetricsManager {
measurement.record(System.currentTimeMillis() - earliestMessageTime, newAttributesBuilder().build());
});
- timerEnqueueLag = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LAG)
- .setDescription("Timer enqueue messages lag")
- .ofLongs()
- .buildWithCallback(measurement -> {
- TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
- measurement.record(timerMessageStore.getEnqueueBehindMessages(), newAttributesBuilder().build());
- });
-
- timerEnqueueLatency = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LATENCY)
- .setDescription("Timer enqueue latency")
- .setUnit("milliseconds")
- .ofLongs()
- .buildWithCallback(measurement -> {
- TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
- measurement.record(timerMessageStore.getEnqueueBehindMillis(), newAttributesBuilder().build());
- });
- timerDequeueLag = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LAG)
- .setDescription("Timer dequeue messages lag")
- .ofLongs()
- .buildWithCallback(measurement -> {
- TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
- measurement.record(timerMessageStore.getDequeueBehindMessages(), newAttributesBuilder().build());
- });
- timerDequeueLatency = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LATENCY)
- .setDescription("Timer dequeue latency")
- .setUnit("milliseconds")
- .ofLongs()
- .buildWithCallback(measurement -> {
- TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
- measurement.record(timerMessageStore.getDequeueBehind(), newAttributesBuilder().build());
- });
- timingMessages = meter.gaugeBuilder(GAUGE_TIMING_MESSAGES)
- .setDescription("Current message number in timing")
- .ofLongs()
- .buildWithCallback(measurement -> {
- TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
- timerMessageStore.getTimerMetrics()
- .getTimingCount()
- .forEach((topic, metric) -> {
- measurement.record(
- metric.getCount().get(),
- newAttributesBuilder().put(LABEL_TOPIC, topic).build()
- );
- });
- });
- timerDequeueTotal = meter.counterBuilder(COUNTER_TIMER_DEQUEUE_TOTAL)
- .setDescription("Total number of timer dequeue")
- .build();
- timerEnqueueTotal = meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL)
- .setDescription("Total number of timer enqueue")
- .build();
+ if (messageStore.getMessageStoreConfig().isTimerWheelEnable()) {
+ timerEnqueueLag = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LAG)
+ .setDescription("Timer enqueue messages lag")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+ TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
+ measurement.record(timerMessageStore.getEnqueueBehindMessages(), newAttributesBuilder().build());
+ });
+
+ timerEnqueueLatency = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LATENCY)
+ .setDescription("Timer enqueue latency")
+ .setUnit("milliseconds")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+ TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
+ measurement.record(timerMessageStore.getEnqueueBehindMillis(), newAttributesBuilder().build());
+ });
+ timerDequeueLag = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LAG)
+ .setDescription("Timer dequeue messages lag")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+ TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
+ measurement.record(timerMessageStore.getDequeueBehindMessages(), newAttributesBuilder().build());
+ });
+ timerDequeueLatency = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LATENCY)
+ .setDescription("Timer dequeue latency")
+ .setUnit("milliseconds")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+ TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
+ measurement.record(timerMessageStore.getDequeueBehind(), newAttributesBuilder().build());
+ });
+ timingMessages = meter.gaugeBuilder(GAUGE_TIMING_MESSAGES)
+ .setDescription("Current message number in timing")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+ TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
+ timerMessageStore.getTimerMetrics()
+ .getTimingCount()
+ .forEach((topic, metric) -> {
+ measurement.record(
+ metric.getCount().get(),
+ newAttributesBuilder().put(LABEL_TOPIC, topic).build()
+ );
+ });
+ });
+ timerDequeueTotal = meter.counterBuilder(COUNTER_TIMER_DEQUEUE_TOTAL)
+ .setDescription("Total number of timer dequeue")
+ .build();
+ timerEnqueueTotal = meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL)
+ .setDescription("Total number of timer enqueue")
+ .build();
+ }
}
public static void incTimerDequeueCount(String topic) {