You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/01/04 10:39:49 UTC

[rocketmq] branch develop updated: [ISSUE #5812] Fix the issue that static topic cannot be consumed normally (#5811)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new fe59775e3 [ISSUE #5812] Fix the issue that static topic cannot be consumed normally (#5811)
fe59775e3 is described below

commit fe59775e377defcc515594d0ea69b56cf568f0c0
Author: rongtong <ji...@163.com>
AuthorDate: Wed Jan 4 18:39:29 2023 +0800

    [ISSUE #5812] Fix the issue that static topic cannot be consumed normally (#5811)
    
    * Fix the issue that static topic cannot be consumed normally
    
    * Fix the issue that static topic cannot be consumed normally
---
 .../rocketmq/broker/plugin/PullMessageResultHandler.java    |  4 +++-
 .../broker/processor/DefaultPullMessageResultHandler.java   |  4 ++--
 .../rocketmq/broker/processor/PullMessageProcessor.java     | 13 +++++++++----
 .../java/org/apache/rocketmq/tools/admin/MQAdminUtils.java  |  4 ++--
 4 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java
index d75642381..0b9f4295c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/PullMessageResultHandler.java
@@ -21,6 +21,7 @@ import io.netty.channel.Channel;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingContext;
 import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.MessageFilter;
@@ -49,5 +50,6 @@ public interface PullMessageResultHandler {
                            final SubscriptionGroupConfig subscriptionGroupConfig,
                            final boolean brokerAllowSuspend,
                            final MessageFilter messageFilter,
-                           final RemotingCommand response);
+                           final RemotingCommand response,
+                           final TopicQueueMappingContext mappingContext);
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
index da99cf2a4..591b22d23 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
@@ -83,7 +83,8 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
         final SubscriptionGroupConfig subscriptionGroupConfig,
         final boolean brokerAllowSuspend,
         final MessageFilter messageFilter,
-        RemotingCommand response) {
+        RemotingCommand response,
+        TopicQueueMappingContext mappingContext) {
         PullMessageProcessor processor = brokerController.getPullMessageProcessor();
         final String clientAddress = RemotingHelper.parseChannelRemoteAddr(channel);
         TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
@@ -98,7 +99,6 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
         }
 
         //rewrite the response for the static topic
-        TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
         final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
         RemotingCommand rewriteResult = processor.rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, response.getCode());
         if (rewriteResult != null) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index a300c5b7a..c15f8b323 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -145,7 +145,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
             }
             return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
         } catch (Throwable t) {
-            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+            LOGGER.warn("", t);
+            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.toString());
         }
     }
 
@@ -274,7 +275,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                 return null;
             }
         } catch (Throwable t) {
-            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+            LOGGER.warn("", t);
+            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.toString());
         }
     }
 
@@ -523,7 +525,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                             subscriptionGroupConfig,
                             brokerAllowSuspend,
                             messageFilter,
-                            finalResponse
+                            finalResponse,
+                            mappingContext
                         );
                     })
                     .thenAccept(result -> NettyRemotingAbstract.writeResponse(channel, request, result));
@@ -531,6 +534,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
         }
 
         if (getMessageResult != null) {
+
             return this.pullMessageResultHandler.handle(
                 getMessageResult,
                 request,
@@ -540,7 +544,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                 subscriptionGroupConfig,
                 brokerAllowSuspend,
                 messageFilter,
-                response
+                response,
+                mappingContext
             );
         }
         return null;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
index c70c45d1b..a5162cddb 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
@@ -148,13 +148,13 @@ public class MQAdminUtils {
         ClientMetadata clientMetadata = MQAdminUtils.getBrokerMetadata(defaultMQAdminExt);
         MQAdminUtils.checkIfMasterAlive(brokerConfigMap.keySet(), defaultMQAdminExt, clientMetadata);
         // now do the remapping
-        //Step1: let the new leader can be write without the logicOffset
+        //Step1: let the new leader be writable without the logicOffset
         for (String broker: brokersToMapIn) {
             String addr = clientMetadata.findMasterBrokerAddr(broker);
             TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
             defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
         }
-        //Step2: forbid the write of old leader
+        //Step2: forbid writes to the old leader
         for (String broker: brokersToMapOut) {
             String addr = clientMetadata.findMasterBrokerAddr(broker);
             TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);