You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2020/02/28 07:43:48 UTC

[rocketmq] branch develop updated: fix(pull_schedual) add the namespace wrapper in callback (#1804)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0e7d271  fix(pull_schedual) add the namespace wrapper in callback (#1804)
0e7d271 is described below

commit 0e7d27120c3262b51029eb0e049fa613eab0c8d0
Author: Heng Du <du...@apache.org>
AuthorDate: Fri Feb 28 15:43:36 2020 +0800

    fix(pull_schedual) add the namespace wrapper in callback (#1804)
---
 .../apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
index 4d57313..6a5e714 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
@@ -27,6 +27,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -95,7 +96,7 @@ public class MQPullConsumerScheduleService {
     }
 
     public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) {
-        this.callbackTable.put(topic, callback);
+        this.callbackTable.put(NamespaceUtil.wrapNamespace(this.defaultMQPullConsumer.getNamespace(), topic), callback);
         this.defaultMQPullConsumer.registerMessageQueueListener(topic, null);
     }