You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/08/14 02:39:43 UTC

[rocketmq] branch litePullConsumer updated: Polish lite pull consumer (#1381)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/litePullConsumer by this push:
     new c6096a1  Polish lite pull consumer (#1381)
c6096a1 is described below

commit c6096a1443f4ee53472b5a93ec68b024fd61f311
Author: King <79...@qq.com>
AuthorDate: Wed Aug 14 10:39:36 2019 +0800

    Polish lite pull consumer (#1381)
    
    * fix unsubscribe code
    
    * fix commit consumed offset
    
    * fix commit consumed offset
    
    * fix commit consumed offset
    
    * fix commit consumed offset
    
    * polish commit consumed offset
    
    * pass checkstyle
    
    * pass checkstyle
    
    * polish LiteMQPullConsumer
    
    * add flow control and polish commit logic
    
    * fix bug
    
    * polish code
    
    * fix commit consumed offset back
    
    * refactor litePullConsumer
    
    * development save
    
    * development save
    
    * Refactor DefaultLitePullConsumer and DefaultLitePullConsumerImpl.
    
    * Polish lite pull consumer
    
    * polish lite pull consumer
    
    * polish lite pull consumer
    
    * fix seek
    
    * fix seek function
    
    * polish lite pull consumer
    
    * add apache header
    
    * add test
    
    * polish test
    
    * Make broadcast model work for litePullConsumer
    
    * Revert example/broadcast/PushConsumer.java
    
    * Add delay time when no new message
    
    * Enable long polling mode
    
    * Fix subscribe bug when rebalance
    
    * Delete useless consumeMessageHook
    
    * Implement TopicMessageQueueChangeListener interface
    
    * Lite pull consumer support namespace
    
    * Make sql92 filter work
---
 .../org/apache/rocketmq/client/ClientConfig.java   |  18 +-
 .../client/consumer/DefaultLitePullConsumer.java   |  25 ++-
 .../rocketmq/client/consumer/LitePullConsumer.java |   9 +-
 .../consumer/TopicMessageQueueChangeListener.java  |  30 +++
 .../client/impl/consumer/AssignedMessageQueue.java |   2 -
 .../impl/consumer/DefaultLitePullConsumerImpl.java | 232 +++++++++++++--------
 6 files changed, 212 insertions(+), 104 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 87c01a5..c3e4efa 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -16,7 +16,9 @@
  */
 package org.apache.rocketmq.client;
 
+import java.util.Collection;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.UtilAll;
@@ -95,7 +97,6 @@ public class ClientConfig {
         }
     }
 
-
     public String withNamespace(String resource) {
         return NamespaceUtil.wrapNamespace(this.getNamespace(), resource);
     }
@@ -124,9 +125,21 @@ public class ClientConfig {
         if (StringUtils.isEmpty(this.getNamespace())) {
             return queue;
         }
-
         return new MessageQueue(withNamespace(queue.getTopic()), queue.getBrokerName(), queue.getQueueId());
     }
+
+    /**
+     * Wraps the topic of every given queue with this client's namespace.
+     * <p>
+     * The queues are modified in place through {@code setTopic} and the very same collection instance is returned;
+     * no copies are made. When no namespace is configured the collection is returned untouched.
+     *
+     * @param queues message queues whose topics should carry the namespace
+     * @return the {@code queues} argument itself
+     */
+    public Collection<MessageQueue> queuesWithNamespace(Collection<MessageQueue> queues) {
+        if (!StringUtils.isEmpty(this.getNamespace())) {
+            Iterator<MessageQueue> cursor = queues.iterator();
+            while (cursor.hasNext()) {
+                MessageQueue current = cursor.next();
+                String namespacedTopic = withNamespace(current.getTopic());
+                current.setTopic(namespacedTopic);
+            }
+        }
+        return queues;
+    }
+
     public void resetClientConfig(final ClientConfig cc) {
         this.namesrvAddr = cc.namesrvAddr;
         this.clientIP = cc.clientIP;
@@ -170,6 +183,7 @@ public class ClientConfig {
 
     /**
      * Domain name mode access way does not support the delimiter(;), and only one domain name can be set.
+     *
      * @param namesrvAddr name server address
      */
     public void setNamesrvAddr(String namesrvAddr) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 1858fa1..543e9cf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -176,17 +176,22 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
 
     @Override
     public void subscribe(String topic, String subExpression) throws MQClientException {
-        this.defaultLitePullConsumerImpl.subscribe(topic, subExpression);
+        this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);
+    }
+
+    @Override
+    public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
+        this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector);
     }
 
     @Override
     public void unsubscribe(String topic) {
-        this.defaultLitePullConsumerImpl.unsubscribe(topic);
+        this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));
     }
 
     @Override
     public void assign(Collection<MessageQueue> messageQueues) {
-        defaultLitePullConsumerImpl.assign(messageQueues);
+        defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
     }
 
     @Override
@@ -201,17 +206,17 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
 
     @Override
     public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
-        this.defaultLitePullConsumerImpl.seek(messageQueue, offset);
+        this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset);
     }
 
     @Override
     public void pause(Collection<MessageQueue> messageQueues) {
-        this.defaultLitePullConsumerImpl.pause(messageQueues);
+        this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues));
     }
 
     @Override
     public void resume(Collection<MessageQueue> messageQueues) {
-        this.defaultLitePullConsumerImpl.resume(messageQueues);
+        this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues));
     }
 
     @Override
@@ -221,7 +226,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
 
     @Override
     public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException {
-        return this.defaultLitePullConsumerImpl.searchOffset(messageQueue, timestamp);
+        return this.defaultLitePullConsumerImpl.searchOffset(queueWithNamespace(messageQueue), timestamp);
+    }
+
+    public void registerTopicMessageQueueChangeListener(String topic,
+        TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException {
+        this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(withNamespace(topic), topicMessageQueueChangeListener);
     }
 
     @Override
@@ -390,5 +400,4 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
     public void setPullDelayTimeMills(long pullDelayTimeMills) {
         this.pullDelayTimeMills = pullDelayTimeMills;
     }
-
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
index ece08af..87b9dd3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -36,7 +36,7 @@ public interface LitePullConsumer {
     void shutdown();
 
     /**
-     * Subscribe some topic
+     * Subscribe some topic with subExpression
      *
      * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
      *                      null or * expression,meaning subscribe all
@@ -44,6 +44,13 @@ public interface LitePullConsumer {
     void subscribe(final String topic, final String subExpression) throws MQClientException;
 
     /**
+     * Subscribe some topic with selector.
+     *
+     * @param selector message selector({@link MessageSelector}), can be null.
+     */
+    void subscribe(final String topic, final MessageSelector selector) throws MQClientException;
+
+    /**
      * Unsubscribe consumption some topic
      *
      * @param topic message topic
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/TopicMessageQueueChangeListener.java b/client/src/main/java/org/apache/rocketmq/client/consumer/TopicMessageQueueChangeListener.java
new file mode 100644
index 0000000..fa6fd13
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/TopicMessageQueueChangeListener.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import java.util.Set;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public interface TopicMessageQueueChangeListener {
+    /**
+     * Invoked when the message queues of a topic have changed. These scenarios occur when the topic is expanded or
+     * shrunk.
+     *
+     * @param topic the topic whose message queues changed
+     * @param messageQueues the message queues now reported for the topic
+     */
+    void onChanged(String topic, Set<MessageQueue> messageQueues);
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index b21fd01..8dcaa30 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -138,7 +138,6 @@ public class AssignedMessageQueue {
                 Map.Entry<MessageQueue, MessageQueueStat> next = it.next();
                 if (next.getKey().getTopic().equals(topic)) {
                     if (!assigned.contains(next.getKey())) {
-                        System.out.printf("MessageQueue-%s is removed %n", next.getKey());
                         next.getValue().getProcessQueue().setDropped(true);
                         it.remove();
                     }
@@ -167,7 +166,6 @@ public class AssignedMessageQueue {
             if (!this.assignedMessageQueueState.containsKey(messageQueue)) {
                 MessageQueueStat messageQueueStat;
                 if (rebalanceImpl != null && rebalanceImpl.getProcessQueueTable().get(messageQueue) != null) {
-                    System.out.printf("MessageQueue-%s is added %n", messageQueue);
                     messageQueueStat = new MessageQueueStat(messageQueue, rebalanceImpl.getProcessQueueTable().get(messageQueue));
                 } else {
                     ProcessQueue processQueue = new ProcessQueue();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 07ef1cf..e17aae6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.client.impl.consumer;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -28,12 +29,16 @@ import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener;
 import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.MessageQueueListener;
 import org.apache.rocketmq.client.consumer.PullResult;
@@ -127,6 +132,12 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
 
     private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
 
+    private final ScheduledExecutorService scheduledExecutorService;
+
+    private Map<String, TopicMessageQueueChangeListener> topicMessageQueueChangeListenerMap = new HashMap<String, TopicMessageQueueChangeListener>();
+
+    private Map<String, Set<MessageQueue>> messageQueuesForTopic = new HashMap<String, Set<MessageQueue>>();
+
     private long consumeRequestFlowControlTimes = 0L;
 
     private long queueFlowControlTimes = 0L;
@@ -138,6 +149,16 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
         this.defaultLitePullConsumer = defaultLitePullConsumer;
         this.rpcHook = rpcHook;
+        this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
+            this.defaultLitePullConsumer.getPullThreadNumbers(),
+            new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
+        );
+        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(r, "MonitorMessageQueueChangeThread");
+            }
+        });
     }
 
     private void checkServiceState() {
@@ -266,12 +287,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
 
                 mQClientFactory.start();
 
-                final String group = this.defaultLitePullConsumer.getConsumerGroup();
-
-                this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
-                    this.defaultLitePullConsumer.getPullThreadNumbers(),
-                    new ThreadFactoryImpl("PullMsgThread-" + group)
-                );
                 if (subscriptionType == SubscriptionType.SUBSCRIBE) {
                     updateTopicSubscribeInfoWhenSubscriptionChanged();
                 }
@@ -279,8 +294,25 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                     updateAssignPullTask(assignedMessageQueue.messageQueues());
                 }
 
+                scheduledExecutorService.scheduleAtFixedRate(
+                    new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                fetchTopicMessageQueuesAndCompare();
+                            } catch (Exception e) {
+                                log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e);
+                            }
+                        }
+                    }, 1000 * 20, 1000 * 30, TimeUnit.MILLISECONDS);
+
                 log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());
                 this.serviceState = ServiceState.RUNNING;
+                for (String topic : topicMessageQueueChangeListenerMap.keySet()) {
+                    Set<MessageQueue> messageQueues = fetchMessageQueues(topic);
+                    messageQueuesForTopic.put(topic, messageQueues);
+                }
+                this.mQClientFactory.checkClientInBroker();
                 break;
             case RUNNING:
             case START_FAILED:
@@ -339,7 +371,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                     + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                 null);
         }
-
     }
 
     public PullAPIWrapper getPullAPIWrapper() {
@@ -353,7 +384,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                     break;
                 case CLUSTERING:
                     /*
-                     * Retry topic support in the future.
+                     * Retry topic will be supported in the future.
                      */
                     break;
                 default:
@@ -410,7 +441,28 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                 updateTopicSubscribeInfoWhenSubscriptionChanged();
             }
         } catch (Exception e) {
-            throw new MQClientException("subscription exception", e);
+            throw new MQClientException("subscribe exception", e);
+        }
+    }
+
+    public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
+        try {
+            setSubscriptionType(SubscriptionType.SUBSCRIBE);
+            if (messageSelector == null) {
+                subscribe(topic, SubscriptionData.SUB_ALL);
+                return;
+            }
+            SubscriptionData subscriptionData = FilterAPI.build(topic,
+                messageSelector.getExpression(), messageSelector.getExpressionType());
+            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
+            this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
+            assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
+            if (serviceState == ServiceState.RUNNING) {
+                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+                updateTopicSubscribeInfoWhenSubscriptionChanged();
+            }
+        } catch (Exception e) {
+            throw new MQClientException("subscribe exception", e);
         }
     }
 
@@ -421,6 +473,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     }
 
     public synchronized void assign(Collection<MessageQueue> messageQueues) {
+        if (messageQueues == null || messageQueues.isEmpty()) {
+            throw new IllegalArgumentException("Message queues can not be null or empty.");
+        }
         setSubscriptionType(SubscriptionType.ASSIGN);
         assignedMessageQueue.updateAssignedMessageQueue(messageQueues);
         if (serviceState == ServiceState.RUNNING) {
@@ -461,6 +516,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                 List<MessageExt> messages = consumeRequest.getMessageExts();
                 long offset = consumeRequest.getProcessQueue().removeMessage(messages);
                 assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
+                // If the namespace is not null, reset the topic to the one without the namespace.
+                this.resetTopic(messages);
                 return messages;
             }
         } catch (InterruptedException ignore) {
@@ -587,8 +644,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     private void submitConsumeRequest(ConsumeRequest consumeRequest) {
         try {
             consumeRequestCache.put(consumeRequest);
-        } catch (InterruptedException ex) {
-            log.error("Submit consumeRequest error", ex);
+        } catch (InterruptedException e) {
+            log.error("Submit consumeRequest error", e);
         }
     }
 
@@ -649,14 +706,14 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                 ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
 
                 if (processQueue == null && processQueue.isDropped()) {
-                    log.info("the message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
+                    log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
                     return;
                 }
 
                 if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchNums() > defaultLitePullConsumer.getPullThresholdForAll()) {
                     scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                     if ((consumeRequestFlowControlTimes++ % 1000) == 0)
-                        log.warn("the consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
+                        log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
                     return;
                 }
 
@@ -667,7 +724,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                     scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                     if ((queueFlowControlTimes++ % 1000) == 0) {
                         log.warn(
-                            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
+                            "The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
                             defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
                     }
                     return;
@@ -677,7 +734,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                     scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                     if ((queueFlowControlTimes++ % 1000) == 0) {
                         log.warn(
-                            "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
+                            "The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
                             defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
                     }
                     return;
@@ -687,21 +744,27 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                     scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                     if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                         log.warn(
-                            "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
+                            "The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
                             processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes);
                     }
                     return;
                 }
 
-                String subExpression = null;
-                if (subscriptionType == SubscriptionType.SUBSCRIBE) {
-                    String topic = this.messageQueue.getTopic();
-                    subExpression = rebalanceImpl.getSubscriptionInner().get(topic).getSubString();
-                }
                 long offset = nextPullOffset(messageQueue);
                 long pullDelayTimeMills = defaultLitePullConsumer.getPullDelayTimeMills();
                 try {
-                    PullResult pullResult = pull(messageQueue, subExpression, offset, nextPullBatchNums());
+
+                    SubscriptionData subscriptionData;
+                    if (subscriptionType == SubscriptionType.SUBSCRIBE) {
+                        String topic = this.messageQueue.getTopic();
+                        subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
+                    } else{
+                        String topic = this.messageQueue.getTopic();
+                        subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
+                            topic, SubscriptionData.SUB_ALL);
+                    }
+
+                    PullResult pullResult = pull(messageQueue, subscriptionData, offset, nextPullBatchNums());
                     switch (pullResult.getPullStatus()) {
                         case FOUND:
                             if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty()) {
@@ -710,7 +773,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                             }
                             break;
                         case OFFSET_ILLEGAL:
-                            log.warn("the pull request offset illegal, {}", pullResult.toString());
+                            log.warn("The pull request offset illegal, {}", pullResult.toString());
                             break;
                         case NO_NEW_MSG:
                             pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_NO_NEW_MSG;
@@ -721,7 +784,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                     updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
                 } catch (Throwable e) {
                     pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION;
-                    e.printStackTrace();
                     log.error("An error occurred in pull message process.", e);
                 }
 
@@ -746,58 +808,16 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
         }
     }
 
-    private PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums)
+    private PullResult pull(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return pull(mq, subExpression, offset, maxNums, this.defaultLitePullConsumer.getConsumerPullTimeoutMillis());
+        return pull(mq, subscriptionData, offset, maxNums, this.defaultLitePullConsumer.getConsumerPullTimeoutMillis());
     }
 
-    private PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
+    private PullResult pull(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, long timeout)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
         return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, timeout);
     }
 
-    private PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
-        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return pull(mq, messageSelector, offset, maxNums, this.defaultLitePullConsumer.getConsumerPullTimeoutMillis());
-    }
-
-    private PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout)
-        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
-        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, timeout);
-    }
-
-    private SubscriptionData getSubscriptionData(MessageQueue mq, String subExpression)
-        throws MQClientException {
-
-        if (null == mq) {
-            throw new MQClientException("mq is null", null);
-        }
-
-        try {
-            return FilterAPI.buildSubscriptionData(this.defaultLitePullConsumer.getConsumerGroup(),
-                mq.getTopic(), subExpression);
-        } catch (Exception e) {
-            throw new MQClientException("parse subscription error", e);
-        }
-    }
-
-    private SubscriptionData getSubscriptionData(MessageQueue mq, MessageSelector messageSelector)
-        throws MQClientException {
-
-        if (null == mq) {
-            throw new MQClientException("mq is null", null);
-        }
-
-        try {
-            return FilterAPI.build(mq.getTopic(),
-                messageSelector.getExpression(), messageSelector.getExpressionType());
-        } catch (Exception e) {
-            throw new MQClientException("parse subscription error", e);
-        }
-    }
-
     private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums,
         boolean block,
         long timeout)
@@ -815,8 +835,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
             throw new MQClientException("maxNums <= 0", null);
         }
 
-        this.subscriptionAutomatically(mq.getTopic());
-
         int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
 
         long timeoutMillis = block ? this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
@@ -837,13 +855,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
             null
         );
         this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
-        //If namespace not null , reset Topic without namespace.
-        this.resetTopic(pullResult.getMsgFoundList());
         return pullResult;
     }
 
-
-    public void resetTopic(List<MessageExt> msgList) {
+    private void resetTopic(List<MessageExt> msgList) {
         if (null == msgList || msgList.size() == 0) {
             return;
         }
@@ -857,17 +872,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
 
     }
 
-    public void subscriptionAutomatically(final String topic) {
-        if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
-            try {
-                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultLitePullConsumer.getConsumerGroup(),
-                    topic, SubscriptionData.SUB_ALL);
-                this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
-            } catch (Exception ignore) {
-            }
-        }
-    }
-
     public void updateConsumeOffset(MessageQueue mq, long offset) {
         checkServiceState();
         this.offsetStore.updateOffset(mq, offset, false);
@@ -981,13 +985,59 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
 
     public Set<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {
         checkServiceState();
-        // check if has info in memory, otherwise invoke api.
-        Set<MessageQueue> result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
-        if (null == result) {
-            result = this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
+        Set<MessageQueue> result = this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
+        return parseMessageQueues(result);
+    }
+
+    private synchronized void fetchTopicMessageQueuesAndCompare() throws MQClientException {
+        for (Map.Entry<String, TopicMessageQueueChangeListener> entry : topicMessageQueueChangeListenerMap.entrySet()) {
+            String topic = entry.getKey();
+            TopicMessageQueueChangeListener topicMessageQueueChangeListener = entry.getValue();
+            Set<MessageQueue> oldMessageQueues = messageQueuesForTopic.get(topic);
+            Set<MessageQueue> newMessageQueues = fetchMessageQueues(topic);
+            boolean isChanged = !isSetEqual(newMessageQueues, oldMessageQueues);
+            if (isChanged) {
+                messageQueuesForTopic.put(topic, newMessageQueues);
+                if (topicMessageQueueChangeListener != null) {
+                    topicMessageQueueChangeListener.onChanged(topic, newMessageQueues);
+                }
+            }
         }
+    }
 
-        return parseMessageQueues(result);
+    /**
+     * Checks whether two message-queue sets hold exactly the same elements.
+     * <p>
+     * Two {@code null} references are considered equal, and so are two empty sets. A {@code null} reference never
+     * equals a non-null set, even an empty one.
+     *
+     * @param set1 first set, may be {@code null}
+     * @param set2 second set, may be {@code null}
+     * @return {@code true} if both are {@code null} or both contain the same queues, {@code false} otherwise
+     */
+    private boolean isSetEqual(Set<MessageQueue> set1, Set<MessageQueue> set2) {
+        if (set1 == null || set2 == null) {
+            // Equal only when both references are null.
+            return set1 == set2;
+        }
+        // Same size plus containment of every element implies equality; this also covers two empty sets,
+        // which must not be reported as a change.
+        return set1.size() == set2.size() && set1.containsAll(set2);
+    }
+
+    public synchronized void registerTopicMessageQueueChangeListener(String topic,
+        TopicMessageQueueChangeListener listener) throws MQClientException {
+        if (topic == null || listener == null) {
+            throw new MQClientException("Topic or listener is null", null);
+        }
+        if (topicMessageQueueChangeListenerMap.containsKey(topic)) {
+            log.warn("Topic {} had been registered, new listener will overwrite the old one", topic);
+        }
+        topicMessageQueueChangeListenerMap.put(topic, listener);
+        if (this.serviceState == ServiceState.RUNNING) {
+            Set<MessageQueue> messageQueues = fetchMessageQueues(topic);
+            messageQueuesForTopic.put(topic, messageQueues);
+        }
     }
 
     private Set<MessageQueue> parseMessageQueues(Set<MessageQueue> queueSet) {