You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/01/04 10:39:49 UTC
[rocketmq] branch develop updated: [ISSUE #5812] Fix the issue that static topic cannot be consumed normally (#5811)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new fe59775e3 [ISSUE #5812] Fix the issue that static topic cannot be consumed normally (#5811)
fe59775e3 is described below
commit fe59775e377defcc515594d0ea69b56cf568f0c0
Author: rongtong <ji...@163.com>
AuthorDate: Wed Jan 4 18:39:29 2023 +0800
[ISSUE #5812] Fix the issue that static topic cannot be consumed normally (#5811)
* Fix the issue that static topic cannot be consumed normally
* Fix the issue that static topic cannot be consumed normally
---
.../rocketmq/broker/plugin/PullMessageResultHandler.java | 4 +++-
.../broker/processor/DefaultPullMessageResultHandler.java | 4 ++--
.../rocketmq/broker/processor/PullMessageProcessor.java | 13 +++++++++----
.../java/org/apache/rocketmq/tools/admin/MQAdminUtils.java | 4 ++--
4 files changed, 16 insertions(+), 9 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java
index d75642381..0b9f4295c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java
@@ -21,6 +21,7 @@ import io.netty.channel.Channel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageFilter;
@@ -49,5 +50,6 @@ public interface PullMessageResultHandler {
final SubscriptionGroupConfig subscriptionGroupConfig,
final boolean brokerAllowSuspend,
final MessageFilter messageFilter,
- final RemotingCommand response);
+ final RemotingCommand response,
+ final TopicQueueMappingContext mappingContext);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
index da99cf2a4..591b22d23 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
@@ -83,7 +83,8 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
final SubscriptionGroupConfig subscriptionGroupConfig,
final boolean brokerAllowSuspend,
final MessageFilter messageFilter,
- RemotingCommand response) {
+ RemotingCommand response,
+ TopicQueueMappingContext mappingContext) {
PullMessageProcessor processor = brokerController.getPullMessageProcessor();
final String clientAddress = RemotingHelper.parseChannelRemoteAddr(channel);
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
@@ -98,7 +99,6 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
}
//rewrite the response for the static topic
- TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
RemotingCommand rewriteResult = processor.rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, response.getCode());
if (rewriteResult != null) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index a300c5b7a..c15f8b323 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -145,7 +145,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
}
return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
} catch (Throwable t) {
- return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+ LOGGER.warn("", t);
+ return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.toString());
}
}
@@ -274,7 +275,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
return null;
}
} catch (Throwable t) {
- return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+ LOGGER.warn("", t);
+ return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.toString());
}
}
@@ -523,7 +525,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
subscriptionGroupConfig,
brokerAllowSuspend,
messageFilter,
- finalResponse
+ finalResponse,
+ mappingContext
);
})
.thenAccept(result -> NettyRemotingAbstract.writeResponse(channel, request, result));
@@ -531,6 +534,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
}
if (getMessageResult != null) {
+
return this.pullMessageResultHandler.handle(
getMessageResult,
request,
@@ -540,7 +544,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
subscriptionGroupConfig,
brokerAllowSuspend,
messageFilter,
- response
+ response,
+ mappingContext
);
}
return null;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
index c70c45d1b..a5162cddb 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
@@ -148,13 +148,13 @@ public class MQAdminUtils {
ClientMetadata clientMetadata = MQAdminUtils.getBrokerMetadata(defaultMQAdminExt);
MQAdminUtils.checkIfMasterAlive(brokerConfigMap.keySet(), defaultMQAdminExt, clientMetadata);
// now do the remapping
- //Step1: let the new leader can be write without the logicOffset
+ //Step1: let the new leader be writable without the logicOffset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
- //Step2: forbid the write of old leader
+ //Step2: forbid writes to the old leader
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);