You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/07/17 06:35:20 UTC

[rocketmq] branch litePullConsumer created (now 11b686e)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a change to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git.


      at 11b686e  Add lite pull consumer example

This branch includes the following new commits:

     new 2aae40f  Add pull schedual service
     new af04557  Add pull task logic
     new 11b686e  Add lite pull consumer example

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[rocketmq] 01/03: Add pull schedual service

Posted by du...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 2aae40f9256cb0a6a2b628f00df7dbe9718a963d
Author: duhenglucky <du...@gmail.com>
AuthorDate: Thu Jun 20 23:32:33 2019 +0800

    Add pull schedual service
---
 .../consumer/MQPullConsumerScheduleService.java    |   2 +-
 .../client/impl/consumer/AssignedMessageQueue.java | 157 +++++++++++++
 .../impl/consumer/DefaultMQPullConsumerImpl.java   |   6 +-
 .../impl/consumer/LiteMQPullConsumerImpl.java      | 255 +++++++++++++++++++++
 4 files changed, 416 insertions(+), 4 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
index 44b864e..685f4c8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
@@ -151,7 +151,7 @@ public class MQPullConsumerScheduleService {
         }
     }
 
-    class PullTaskImpl implements Runnable {
+    public class PullTaskImpl implements Runnable {
         private final MessageQueue messageQueue;
         private volatile boolean cancelled = false;
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
new file mode 100644
index 0000000..e9623a8
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+/**
+ * Holds the message queues currently assigned to a consumer together with a small amount of
+ * per-queue state: a paused flag and the next offset to pull from.
+ *
+ * <p>The state map is a {@link ConcurrentHashMap}; bulk membership changes additionally
+ * synchronize on the map instance.
+ */
+public class AssignedMessageQueue {
+
+    private final ConcurrentHashMap<MessageQueue, MessageQueueStat> assignedMessageQueueState;
+
+    public AssignedMessageQueue() {
+        assignedMessageQueueState = new ConcurrentHashMap<MessageQueue, MessageQueueStat>();
+    }
+
+    /**
+     * @param messageQueue queue to look up
+     * @return the paused flag of the queue, or {@code false} when the queue is not tracked
+     */
+    public boolean isPaused(MessageQueue messageQueue) {
+        MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
+        if (messageQueueStat != null) {
+            return messageQueueStat.isPaused();
+        }
+        return false;
+    }
+
+    /**
+     * Marks every tracked queue in the collection as paused; untracked queues are ignored.
+     *
+     * @param messageQueues queues to pause
+     */
+    public void pause(Collection<MessageQueue> messageQueues) {
+        setPaused(messageQueues, true);
+    }
+
+    /**
+     * Clears the paused flag of every tracked queue in the collection; untracked queues are ignored.
+     *
+     * @param messageQueueCollection queues to resume
+     */
+    public void resume(Collection<MessageQueue> messageQueueCollection) {
+        setPaused(messageQueueCollection, false);
+    }
+
+    /**
+     * Applies the paused flag to each tracked queue. The entry is looked up once and that same
+     * reference is null-checked and used, so a concurrent change of the map between two lookups
+     * cannot cause a NullPointerException.
+     */
+    private void setPaused(Collection<MessageQueue> messageQueues, boolean paused) {
+        for (MessageQueue messageQueue : messageQueues) {
+            MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
+            if (messageQueueStat != null) {
+                messageQueueStat.setPaused(paused);
+            }
+        }
+    }
+
+    /**
+     * @param messageQueue queue to look up
+     * @return the next offset recorded for the queue, or {@code -1} when the queue is not tracked
+     *         (a tracked queue whose offset was never set also reports {@code -1})
+     * @throws MQClientException declared for callers; this implementation does not throw it
+     */
+    public long getNextOffset(MessageQueue messageQueue) throws MQClientException {
+        MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
+        if (messageQueueStat != null) {
+            return messageQueueStat.getNextOffset();
+        }
+        return -1;
+    }
+
+    /**
+     * Records the next offset for the queue, starting to track the queue if it is not tracked yet.
+     *
+     * @param messageQueue queue whose offset changes; must not be {@code null} because the
+     *                     underlying {@link ConcurrentHashMap} rejects null keys
+     * @param offset       next offset to pull from
+     * @throws MQClientException declared for callers; this implementation does not throw it
+     */
+    public void updateNextOffset(MessageQueue messageQueue, long offset) throws MQClientException {
+        MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
+        // The absent-entry check must test the looked-up state, not the key: testing the key left
+        // untracked queues to fail with a NullPointerException on the setter call below.
+        if (messageQueueStat == null) {
+            messageQueueStat = new MessageQueueStat(messageQueue, offset);
+            MessageQueueStat existing = assignedMessageQueueState.putIfAbsent(messageQueue, messageQueueStat);
+            if (existing != null) {
+                // Another thread registered the queue first; update that entry instead.
+                messageQueueStat = existing;
+            }
+        }
+        messageQueueStat.setNextOffset(offset);
+    }
+
+    /**
+     * Replaces the tracked set with {@code assigned}: every tracked queue that is not contained in
+     * {@code assigned} is dropped (regardless of its topic) and every new queue is added with
+     * default state. State of queues that stay assigned is kept.
+     *
+     * @param assigned the complete set of queues that should be tracked afterwards
+     */
+    public void updateAssignedMessageQueue(Set<MessageQueue> assigned) {
+        synchronized (this.assignedMessageQueueState) {
+            this.assignedMessageQueueState.keySet().retainAll(assigned);
+
+            for (MessageQueue messageQueue : assigned) {
+                // putIfAbsent keeps state that an unsynchronized updateNextOffset may have
+                // registered concurrently instead of overwriting it.
+                this.assignedMessageQueueState.putIfAbsent(messageQueue, new MessageQueueStat(messageQueue));
+            }
+        }
+    }
+
+    /**
+     * Stops tracking every queue that belongs to the given topic.
+     *
+     * @param topic topic whose queues are removed
+     */
+    public void removeAssignedMessageQueue(String topic) {
+        synchronized (this.assignedMessageQueueState) {
+            Iterator<MessageQueue> it = this.assignedMessageQueueState.keySet().iterator();
+            while (it.hasNext()) {
+                if (it.next().getTopic().equals(topic)) {
+                    it.remove();
+                }
+            }
+        }
+    }
+
+    /**
+     * @return a new map from each tracked queue to its currently recorded next offset
+     */
+    public Map<MessageQueue, Long> getNeedCommitOffsets() {
+        Map<MessageQueue, Long> map = new HashMap<MessageQueue, Long>();
+        for (Map.Entry<MessageQueue, MessageQueueStat> entry : this.assignedMessageQueueState.entrySet()) {
+            map.put(entry.getKey(), entry.getValue().getNextOffset());
+        }
+        return map;
+    }
+
+    /**
+     * Mutable per-queue state. {@code paused} and {@code nextOffset} are volatile because they are
+     * written through the consumer API and read by the pull tasks, which run on other threads.
+     */
+    public class MessageQueueStat {
+        private MessageQueue messageQueue;
+        private volatile boolean paused = false;
+        private volatile long nextOffset = -1;
+
+        public MessageQueueStat(MessageQueue messageQueue) {
+            this.messageQueue = messageQueue;
+        }
+
+        public MessageQueueStat(MessageQueue messageQueue, long nextOffset) {
+            this.messageQueue = messageQueue;
+            this.nextOffset = nextOffset;
+        }
+
+        public MessageQueue getMessageQueue() {
+            return messageQueue;
+        }
+
+        public void setMessageQueue(MessageQueue messageQueue) {
+            this.messageQueue = messageQueue;
+        }
+
+        public boolean isPaused() {
+            return paused;
+        }
+
+        public void setPaused(boolean paused) {
+            this.paused = paused;
+        }
+
+        public long getNextOffset() {
+            return nextOffset;
+        }
+
+        public void setNextOffset(long nextOffset) {
+            this.nextOffset = nextOffset;
+        }
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 8aff14b..bc0884a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -68,16 +68,16 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 
 public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     private final InternalLogger log = ClientLogger.getLog();
-    private final DefaultMQPullConsumer defaultMQPullConsumer;
+    protected final DefaultMQPullConsumer defaultMQPullConsumer;
     private final long consumerStartTimestamp = System.currentTimeMillis();
     private final RPCHook rpcHook;
     private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
     private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
     private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
-    private MQClientInstance mQClientFactory;
+    protected MQClientInstance mQClientFactory;
     private PullAPIWrapper pullAPIWrapper;
     private OffsetStore offsetStore;
-    private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
+    protected RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
 
     public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
         this.defaultMQPullConsumer = defaultMQPullConsumer;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
new file mode 100644
index 0000000..7332818
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.RPCHook;
+
+/**
+ * Pull consumer implementation that keeps one {@link PullTaskImpl} per assigned message queue,
+ * runs those tasks on a scheduled thread pool and hands every fetched batch over as a
+ * {@link ConsumeRequest} through an in-memory blocking queue.
+ */
+public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
+    /** Core size of the pull thread pool. */
+    private static final int PULL_THREAD_NUMS = 10;
+
+    private final InternalLogger log = ClientLogger.getLog();
+
+    private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
+        new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
+
+    private final AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();
+
+    // Also used as the monitor in addToConsumed, hence final.
+    // NOTE(review): nothing in this class removes entries from this list - confirm it is trimmed elsewhere.
+    private final List<ConsumeRequest> allConsumed = new ArrayList<ConsumeRequest>(256);
+
+    private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
+
+    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
+
+    public LiteMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
+        super(defaultMQPullConsumer, rpcHook);
+    }
+
+    /**
+     * Applies a new assignment: updates the tracked queue state, then the pull tasks.
+     *
+     * @param topic                topic the assignment belongs to
+     * @param assignedMessageQueue queues now assigned to this consumer
+     */
+    public void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
+        this.assignedMessageQueue.updateAssignedMessageQueue(assignedMessageQueue);
+        updatePullTask(topic, assignedMessageQueue);
+    }
+
+    /**
+     * Cancels and forgets tasks of {@code topic} whose queue is no longer in {@code mqNewSet}, and
+     * schedules a new task (zero delay) for every queue in {@code mqNewSet} that has none yet.
+     *
+     * @param topic    topic whose tasks are reconciled
+     * @param mqNewSet queues that should have a pull task
+     */
+    public void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
+        Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
+            if (next.getKey().getTopic().equals(topic) && !mqNewSet.contains(next.getKey())) {
+                next.getValue().setCancelled(true);
+                it.remove();
+            }
+        }
+
+        for (MessageQueue messageQueue : mqNewSet) {
+            if (!this.taskTable.containsKey(messageQueue)) {
+                PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
+                this.taskTable.put(messageQueue, pullTask);
+                this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
+    /**
+     * Forwards queue changes to {@link #updateAssignedMessageQueue}: all queues of the topic in
+     * broadcasting mode, only this consumer's share in clustering mode.
+     */
+    class MessageQueueListenerImpl implements MessageQueueListener {
+        @Override
+        public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+            MessageModel messageModel = defaultMQPullConsumer.getMessageModel();
+            switch (messageModel) {
+                case BROADCASTING:
+                    updateAssignedMessageQueue(topic, mqAll);
+                    break;
+                case CLUSTERING:
+                    updateAssignedMessageQueue(topic, mqDivided);
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    /**
+     * @return the batch size for the next pull: at most 10, bounded by the remaining capacity of
+     *         the consume request cache
+     */
+    int nextPullBatchNums() {
+        return Math.min(10, consumeRequestCache.remainingCapacity());
+    }
+
+    /**
+     * Creates the pull thread pool, registers the queue listener and starts the underlying consumer.
+     *
+     * @throws MQClientException if the underlying consumer fails to start
+     */
+    @Override
+    public synchronized void start() throws MQClientException {
+        final String group = this.defaultMQPullConsumer.getConsumerGroup();
+        // The executor must exist before the listener can fire, because updatePullTask schedules
+        // on it. ThreadPoolExecutor creates its threads on demand, so nothing is started here.
+        this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
+            PULL_THREAD_NUMS,
+            new ThreadFactoryImpl("PullMsgThread-" + group)
+        );
+        // Registered before super.start() so that a queue change reported during startup is not
+        // lost. NOTE(review): presumes super.start() can trigger messageQueueChanged - confirm.
+        this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
+        super.start();
+    }
+
+    /**
+     * Registers a subscription for the topic and, when the client factory is available, sends a
+     * heartbeat to all brokers.
+     *
+     * @param topic         topic to subscribe to
+     * @param subExpression subscription expression passed to {@code FilterAPI.buildSubscriptionData}
+     * @throws MQClientException wrapping any failure raised while subscribing
+     */
+    public void subscribe(String topic, String subExpression) throws MQClientException {
+        try {
+            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultMQPullConsumer.getConsumerGroup(),
+                topic, subExpression);
+            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
+            if (this.mQClientFactory != null) {
+                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+            }
+        } catch (Exception e) {
+            throw new MQClientException("subscription exception", e);
+        }
+    }
+
+    /**
+     * Records the next pull offset of the queue; a failure is logged and not propagated.
+     */
+    void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
+        try {
+            assignedMessageQueue.updateNextOffset(remoteQueue, nextPullOffset);
+        } catch (MQClientException e) {
+            log.error("A error occurred in update consume: {} offset process.", remoteQueue, e);
+        }
+    }
+
+    private void addToConsumed(ConsumeRequest consumeRequest) {
+        synchronized (this.allConsumed) {
+            allConsumed.add(consumeRequest);
+        }
+    }
+
+    /**
+     * Puts the request into the consume request cache and remembers it in {@code allConsumed}.
+     * If the calling thread is interrupted while waiting, the error is logged and the interrupt
+     * status is restored so that code further up the stack can still observe it.
+     */
+    void submitConsumeRequest(ConsumeRequest consumeRequest) {
+        try {
+            consumeRequestCache.put(consumeRequest);
+            addToConsumed(consumeRequest);
+        } catch (InterruptedException ex) {
+            log.error("Submit consumeRequest error", ex);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Returns the offset to pull from next. When no offset is recorded locally ({@code -1}), the
+     * consume offset is fetched through the consumer and stored as the next offset.
+     *
+     * @return the next pull offset; {@code -1} may be returned when fetching it failed
+     */
+    long nextPullOffset(MessageQueue remoteQueue) {
+        long offset = -1;
+        try {
+            offset = assignedMessageQueue.getNextOffset(remoteQueue);
+            if (offset == -1) {
+                offset = this.defaultMQPullConsumer.fetchConsumeOffset(remoteQueue, false);
+                assignedMessageQueue.updateNextOffset(remoteQueue, offset);
+            }
+        } catch (MQClientException e) {
+            log.error("An error occurred in fetch consume offset process.", e);
+        }
+        return offset;
+    }
+
+    /**
+     * Performs a single pull for one message queue. A cancelled or paused task does nothing.
+     * NOTE(review): the task does not re-schedule itself in this class - confirm that repeated
+     * pulling is arranged elsewhere.
+     */
+    public class PullTaskImpl implements Runnable {
+        private final MessageQueue messageQueue;
+        private volatile boolean cancelled = false;
+
+        public PullTaskImpl(final MessageQueue messageQueue) {
+            this.messageQueue = messageQueue;
+        }
+
+        @Override
+        public void run() {
+            String topic = this.messageQueue.getTopic();
+            if (this.isCancelled()) {
+                return;
+            }
+            if (assignedMessageQueue.isPaused(messageQueue)) {
+                log.debug("Message Queue: {} has been paused!", messageQueue);
+                return;
+            }
+            SubscriptionData subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
+            long offset = nextPullOffset(messageQueue);
+            try {
+                PullResult pullResult = defaultMQPullConsumer.pull(messageQueue, subscriptionData.getSubString(), offset, nextPullBatchNums());
+                ProcessQueue processQueue = rebalanceImpl.getProcessQueueTable().get(messageQueue);
+                switch (pullResult.getPullStatus()) {
+                    case FOUND:
+                        // Messages are only handed over when a process queue exists for this queue.
+                        if (processQueue != null) {
+                            processQueue.putMessage(pullResult.getMsgFoundList());
+                            submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
+                        }
+                        break;
+                    default:
+                        break;
+                }
+                // The next offset is advanced for every pull status.
+                updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
+            } catch (Exception e) {
+                log.error("An error occurred in pull message process.", e);
+            }
+        }
+
+        public boolean isCancelled() {
+            return cancelled;
+        }
+
+        public void setCancelled(boolean cancelled) {
+            this.cancelled = cancelled;
+        }
+
+        public MessageQueue getMessageQueue() {
+            return messageQueue;
+        }
+    }
+
+    /**
+     * A batch of pulled messages together with the message queue and process queue it came from.
+     */
+    public class ConsumeRequest {
+        private final List<MessageExt> messageExts;
+        private final MessageQueue messageQueue;
+        private final ProcessQueue processQueue;
+        private long startConsumeTimeMillis;
+
+        public ConsumeRequest(final List<MessageExt> messageExts, final MessageQueue messageQueue,
+            final ProcessQueue processQueue) {
+            this.messageExts = messageExts;
+            this.messageQueue = messageQueue;
+            this.processQueue = processQueue;
+        }
+
+        public List<MessageExt> getMessageExts() {
+            return messageExts;
+        }
+
+        public MessageQueue getMessageQueue() {
+            return messageQueue;
+        }
+
+        public ProcessQueue getProcessQueue() {
+            return processQueue;
+        }
+
+        public long getStartConsumeTimeMillis() {
+            return startConsumeTimeMillis;
+        }
+
+        public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) {
+            this.startConsumeTimeMillis = startConsumeTimeMillis;
+        }
+    }
+}


[rocketmq] 03/03: Add lite pull consumer example

Posted by du...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 11b686e47797be15f187f906e4064264dfa2c993
Author: duhenglucky <du...@gmail.com>
AuthorDate: Wed Jul 17 14:33:29 2019 +0800

    Add lite pull consumer example
---
 .../client/consumer/DefaultLiteMQPullConsumer.java | 82 ++++++++++++++++++++--
 .../client/consumer/LiteMQPullConsumer.java        |  3 -
 .../impl/consumer/LiteMQPullConsumerImpl.java      | 77 +++++++++++++++++---
 .../example/simple/LitePullConsumerTest.java       | 49 +++++++++++++
 4 files changed, 192 insertions(+), 19 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
index 99fd0d9..96d4f5a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
@@ -27,18 +27,50 @@ import org.apache.rocketmq.remoting.RPCHook;
 public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements LiteMQPullConsumer {
     private LiteMQPullConsumerImpl liteMQPullConsumer;
 
+    /**
+     * Maximum amount of time in minutes a message may block the consuming thread.
+     */
+    private long consumeTimeout = 15;
+
+    /**
+     * Is auto commit offset
+     */
+    private boolean autoCommit = true;
+
+    private int pullThreadNumbers = 20;
+
+    /**
+     * Maximum commit offset interval time in seconds.
+     */
+    private long autoCommitInterval = 20;
+
     public DefaultLiteMQPullConsumer(String consumerGroup, RPCHook rpcHook) {
+        this.setConsumerGroup(consumerGroup);
         this.liteMQPullConsumer = new LiteMQPullConsumerImpl(this, rpcHook);
     }
 
-    @Override public void subscribe(String topic, String subExpression) throws MQClientException{
+    public DefaultLiteMQPullConsumer(String consumerGroup) {
+        this.setConsumerGroup(consumerGroup);
+        this.liteMQPullConsumer = new LiteMQPullConsumerImpl(this, null);
+    }
+
+    @Override
+    public void start() throws MQClientException{
+        this.liteMQPullConsumer.start();
+    }
+
+    @Override
+    public void subscribe(String topic, String subExpression) throws MQClientException {
         this.liteMQPullConsumer.subscribe(topic, subExpression);
     }
 
-    @Override public void unsubscribe(String topic) {
+    @Override
+    public void unsubscribe(String topic) {
+        this.liteMQPullConsumer.unsubscribe(topic);
     }
 
-    @Override public List<MessageExt> poll() {
+    @Override
+    public List<MessageExt> poll() {
         return poll(this.getConsumerPullTimeoutMillis());
     }
 
@@ -46,19 +78,55 @@ public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements
         return liteMQPullConsumer.poll(timeout);
     }
 
-    @Override public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
+    @Override
+    public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
+        this.liteMQPullConsumer.seek(messageQueue, offset);
+    }
+
+    @Override
+    public void pause(Collection<MessageQueue> messageQueues) {
+        this.liteMQPullConsumer.pause(messageQueues);
+    }
+
+    @Override
+    public void resume(Collection<MessageQueue> messageQueues) {
+        this.liteMQPullConsumer.resume(messageQueues);
+    }
+
+    @Override
+    public void commitSync() {
+        this.liteMQPullConsumer.commit();
+    }
 
+    public long getConsumeTimeout() {
+        return consumeTimeout;
     }
 
-    @Override public void pause(Collection<MessageQueue> messageQueueCollection) {
+    public void setConsumeTimeout(long consumeTimeout) {
+        this.consumeTimeout = consumeTimeout;
+    }
 
+    public boolean isAutoCommit() {
+        return autoCommit;
     }
 
-    @Override public void resume(Collection<MessageQueue> partitions) {
+    public void setAutoCommit(boolean autoCommit) {
+        this.autoCommit = autoCommit;
+    }
 
+    public int getPullThreadNumbers() {
+        return pullThreadNumbers;
     }
 
-    @Override public void commitSync() {
+    public void setPullThreadNumbers(int pullThreadNumbers) {
+        this.pullThreadNumbers = pullThreadNumbers;
+    }
+
+    public long getAutoCommitInterval() {
+        return autoCommitInterval;
+    }
 
+    public void setAutoCommitInterval(long autoCommitInterval) {
+        this.autoCommitInterval = autoCommitInterval;
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
index 223cca0..da8d1cf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
@@ -38,9 +38,6 @@ public interface LiteMQPullConsumer {
      */
     void unsubscribe(final String topic);
 
-    /**
-     * @return
-     */
     List<MessageExt> poll();
 
     List<MessageExt> poll(long timeout);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
index abf5f47..d612286 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
@@ -27,12 +27,14 @@ import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultLiteMQPullConsumer;
 import org.apache.rocketmq.client.consumer.MessageQueueListener;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -50,6 +52,8 @@ import org.apache.rocketmq.remoting.RPCHook;
 public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
     private final InternalLogger log = ClientLogger.getLog();
 
+    private DefaultLiteMQPullConsumer defaultLiteMQPullConsumer;
+
     private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
         new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
 
@@ -58,12 +62,21 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
     private List<ConsumeRequest> allConsumed = new ArrayList<ConsumeRequest>(256);
 
     private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
-    ;
+
+    private final ScheduledExecutorService cleanExpireMsgExecutors;
 
     private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
 
-    public LiteMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
+    private ScheduledExecutorService autoCommitExecutors;
+
+    public LiteMQPullConsumerImpl(final DefaultLiteMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
         super(defaultMQPullConsumer, rpcHook);
+        this.defaultLiteMQPullConsumer = defaultMQPullConsumer;
+        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+            "Lite_CleanExpireMsgScheduledThread_"));
+        this.autoCommitExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+            "Lite_AutoCommitScheduledThread_"));
+
     }
 
     public void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
@@ -115,18 +128,43 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
 
     @Override
     public synchronized void start() throws MQClientException {
+        this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
         super.start();
         final String group = this.defaultMQPullConsumer.getConsumerGroup();
         this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
-            10, //this.pullThreadNums,
+            this.defaultLiteMQPullConsumer.getPullThreadNumbers(),
             new ThreadFactoryImpl("PullMsgThread-" + group)
         );
-        this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
+        this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                cleanExpireMsg();
+            }
+        }, this.defaultLiteMQPullConsumer.getConsumeTimeout(), this.defaultLiteMQPullConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
+        this.autoCommitExecutors.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                if (defaultLiteMQPullConsumer.isAutoCommit()) {
+                    commit();
+                }
+            }
+        }, this.defaultLiteMQPullConsumer.getAutoCommitInterval(), this.defaultLiteMQPullConsumer.getAutoCommitInterval(), TimeUnit.SECONDS);
+        updateTopicSubscribeInfoWhenSubscriptionChanged();
+    }
+
+    private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
+        Map<String, SubscriptionData> subTable = rebalanceImpl.getSubscriptionInner();
+        if (subTable != null) {
+            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
+                final String topic = entry.getKey();
+                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+            }
+        }
     }
 
     public List<MessageExt> poll(long timeout) {
         try {
-            ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS);
+            ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.SECONDS);
             if (consumeRequest != null) {
                 List<MessageExt> messages = consumeRequest.getMessageExts();
                 for (MessageExt messageExt : messages) {
@@ -148,6 +186,16 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
         assignedMessageQueue.resume(messageQueues);
     }
 
+    /**
+     * Repositions a queue by calling {@code updatePullOffset} and then {@code updateConsumeOffset}
+     * with the same offset.
+     *
+     * @param messageQueue queue to reposition
+     * @param offset offset handed to both update calls
+     * @throws MQClientException rethrown unchanged, after being logged, when {@code updateConsumeOffset}
+     * fails; {@code updatePullOffset} has already run by then
+     */
+    public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
+        this.updatePullOffset(messageQueue, offset);
+        try {
+            this.updateConsumeOffset(messageQueue, offset);
+        } catch (MQClientException seekFailure) {
+            log.error("Seek offset to remote message queue error!", seekFailure);
+            throw seekFailure;
+        }
+    }
+
     public void unsubscribe(final String topic) {
         unsubscribe(topic);
         removePullTaskCallback(topic);
@@ -270,7 +318,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
                         if (!msgTreeMap.isEmpty()) {
                             msg = msgTreeMap.firstEntry().getValue();
                             if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
-                                > 10 * 60 * 1000) {
+                                > this.defaultLiteMQPullConsumer.getConsumeTimeout() * 60 * 1000) {
                                 //Expired, ack and remove it.
                             } else {
                                 break;
@@ -316,16 +364,19 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
 
         @Override
         public void run() {
+            System.out.println("begin pull message");
             String topic = this.messageQueue.getTopic();
             if (!this.isCancelled()) {
                 if (assignedMessageQueue.isPaused(messageQueue)) {
+                    scheduledThreadPoolExecutor.schedule(this, 1000, TimeUnit.MILLISECONDS);
                     log.debug("Message Queue: {} has been paused!", messageQueue);
                     return;
                 }
                 SubscriptionData subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
                 long offset = nextPullOffset(messageQueue);
+                long pullDelayTimeMills = 0;
                 try {
-                    PullResult pullResult = defaultMQPullConsumer.pull(messageQueue, subscriptionData.getSubString(), offset, nextPullBatchNums());
+                    PullResult pullResult = pull(messageQueue, subscriptionData.getSubString(), offset, nextPullBatchNums());
                     ProcessQueue processQueue = rebalanceImpl.getProcessQueueTable().get(messageQueue);
                     switch (pullResult.getPullStatus()) {
                         case FOUND:
@@ -338,9 +389,17 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
                             break;
                     }
                     updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
-                } catch (Exception e) {
+                } catch (Throwable e) {
+                    pullDelayTimeMills = 1000;
+                    e.printStackTrace();
                     log.error("An error occurred in pull message process.", e);
                 }
+
+                if (!this.isCancelled()) {
+                    scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
+                } else {
+                    log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
+                }
             }
         }
 
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
new file mode 100644
index 0000000..4297e4f
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.simple;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.rocketmq.client.consumer.DefaultLiteMQPullConsumer;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+/**
+ * Example driver for the lite pull consumer: polls in an endless loop, pauses one queue on the 3rd
+ * iteration, resumes it on the 10th, and calls {@code commitSync} after every poll.
+ */
+public class LitePullConsumerTest {
+    public static void main(String[] args) throws Exception {
+        DefaultLiteMQPullConsumer consumer = new DefaultLiteMQPullConsumer("test", null);
+        consumer.subscribe("test", null);
+        consumer.start();
+        // NOTE(review): the broker name looks like the author's local machine; adjust before running elsewhere.
+        MessageQueue pausedQueue = new MessageQueue("test", "duhengdeMacBook-Pro.local", 1);
+        for (int round = 1; ; round++) {
+            List<MessageExt> polled = consumer.poll();
+            System.out.println("-----------");
+            System.out.println(polled);
+            System.out.println("-----------");
+            if (round == 3) {
+                System.out.println("pause");
+                consumer.pause(Arrays.asList(pausedQueue));
+            }
+            if (round == 10) {
+                System.out.println("resume");
+                consumer.resume(Arrays.asList(pausedQueue));
+            }
+            consumer.commitSync();
+        }
+    }
+}


[rocketmq] 02/03: Add pull task logic

Posted by du...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit af04557abf12ce36245d7e362925125cd25e0d3c
Author: duhenglucky <du...@gmail.com>
AuthorDate: Fri Jun 21 09:47:42 2019 +0800

    Add pull task logic
---
 .../client/consumer/DefaultLiteMQPullConsumer.java |  64 ++++++++++
 .../client/consumer/DefaultMQPullConsumer.java     |   2 +
 .../client/consumer/LiteMQPullConsumer.java        |  55 ++++++++
 .../rocketmq/client/consumer/MQPullConsumer.java   |   3 +
 .../impl/consumer/LiteMQPullConsumerImpl.java      | 140 ++++++++++++++++++++-
 5 files changed, 263 insertions(+), 1 deletion(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
new file mode 100644
index 0000000..99fd0d9
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.LiteMQPullConsumerImpl;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
+
+/**
+ * Lite pull consumer facade: implements {@link LiteMQPullConsumer} by forwarding to a
+ * {@link LiteMQPullConsumerImpl} created in the constructor.
+ */
+public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements LiteMQPullConsumer {
+    private LiteMQPullConsumerImpl liteMQPullConsumer;
+
+    /**
+     * @param consumerGroup consumer group of this instance
+     * @param rpcHook hook passed on to the implementation object
+     */
+    public DefaultLiteMQPullConsumer(String consumerGroup, RPCHook rpcHook) {
+        // Pass the group to the parent class; without this call the argument is never used.
+        super(consumerGroup, rpcHook);
+        this.liteMQPullConsumer = new LiteMQPullConsumerImpl(this, rpcHook);
+    }
+
+    @Override public void subscribe(String topic, String subExpression) throws MQClientException {
+        this.liteMQPullConsumer.subscribe(topic, subExpression);
+    }
+
+    // NOTE(review): not wired to the implementation object yet; calling this has no effect.
+    @Override public void unsubscribe(String topic) {
+    }
+
+    @Override public List<MessageExt> poll() {
+        return poll(this.getConsumerPullTimeoutMillis());
+    }
+
+    @Override public List<MessageExt> poll(long timeout) {
+        return liteMQPullConsumer.poll(timeout);
+    }
+
+    // NOTE(review): not implemented yet; the requested offset is ignored.
+    @Override public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
+
+    }
+
+    @Override public void pause(Collection<MessageQueue> messageQueueCollection) {
+        this.liteMQPullConsumer.pause(messageQueueCollection);
+    }
+
+    @Override public void resume(Collection<MessageQueue> partitions) {
+        this.liteMQPullConsumer.resume(partitions);
+    }
+
+    @Override public void commitSync() {
+        this.liteMQPullConsumer.commit();
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index f3b6caa..dbf37d2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -16,7 +16,9 @@
  */
 package org.apache.rocketmq.client.consumer;
 
+import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.QueryResult;
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
new file mode 100644
index 0000000..223cca0
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+/**
+ * Operations of the lite pull consumer: subscribing to topics, polling messages, seeking, pausing and
+ * resuming queues, and committing.
+ */
+public interface LiteMQPullConsumer {
+    /**
+     * Subscribes to a topic.
+     *
+     * @param topic topic to subscribe to
+     * @param subExpression subscription expression. It only supports the "or" operation, such as
+     * "tag1 || tag2 || tag3". <br> A null or "*" expression means subscribe to all.
+     * @throws MQClientException NOTE(review): failure conditions are implementation-specific; confirm
+     */
+    void subscribe(final String topic, final String subExpression) throws MQClientException;
+
+    /**
+     * Unsubscribes from a topic.
+     *
+     * @param topic message topic
+     */
+    void unsubscribe(final String topic);
+
+    /**
+     * Polls for messages without an explicit timeout.
+     *
+     * @return the polled messages. NOTE(review): the nothing-available result (null vs. empty list) is
+     * not specified here; confirm the intended contract
+     */
+    List<MessageExt> poll();
+
+    /**
+     * Polls for messages, waiting at most {@code timeout}.
+     *
+     * @param timeout maximum wait. NOTE(review): the time unit is not specified here; confirm
+     * @return the polled messages; see the note on {@link #poll()} about the nothing-available result
+     */
+    List<MessageExt> poll(long timeout);
+
+    /**
+     * Moves the given queue to the given offset.
+     *
+     * @param messageQueue queue to reposition
+     * @param offset offset to seek to
+     * @throws MQClientException NOTE(review): failure conditions are implementation-specific; confirm
+     */
+    void seek(MessageQueue messageQueue, long offset) throws MQClientException;
+
+    /**
+     * Pauses the given queues.
+     *
+     * @param messageQueueCollection queues to pause
+     */
+    void pause(Collection<MessageQueue> messageQueueCollection);
+
+    /**
+     * Resumes the given queues.
+     *
+     * @param partitions queues to resume
+     */
+    void resume(Collection<MessageQueue> partitions);
+
+    /**
+     * Commits synchronously. NOTE(review): what exactly is committed is implementation-specific; confirm.
+     */
+    void commitSync();
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
index 28b807c..9c7cb36 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.client.consumer;
 
+import java.util.Collection;
+import java.util.List;
 import java.util.Set;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -169,4 +171,5 @@ public interface MQPullConsumer extends MQConsumer {
      */
     void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)
         throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
index 7332818..abf5f47 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
@@ -17,16 +17,21 @@
 package org.apache.rocketmq.client.impl.consumer;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.MessageQueueListener;
 import org.apache.rocketmq.client.consumer.PullResult;
@@ -34,6 +39,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@@ -118,6 +124,81 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
         this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
     }
 
+    /**
+     * Takes the next consume request from the cache, waiting up to {@code timeout} milliseconds, stamps
+     * each of its messages with the request's start-consume time and returns them.
+     *
+     * @param timeout maximum wait in milliseconds
+     * @return the messages of the polled request, or {@code null} when no request arrived in time or the
+     * wait was interrupted (the thread's interrupt flag is then set again)
+     */
+    public List<MessageExt> poll(long timeout) {
+        try {
+            ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS);
+            if (consumeRequest != null) {
+                List<MessageExt> messages = consumeRequest.getMessageExts();
+                // NOTE(review): messages are stamped with the request's previous start time; the request
+                // itself is only set to "now" afterwards. Confirm this order is intended.
+                for (MessageExt messageExt : messages) {
+                    MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis()));
+                }
+                consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
+                return messages;
+            }
+        } catch (InterruptedException e) {
+            // Catching InterruptedException clears the flag; set it again so callers can still see the interrupt.
+            Thread.currentThread().interrupt();
+        }
+        return null;
+    }
+
+    /**
+     * Forwards the given queues to {@code assignedMessageQueue.pause}.
+     *
+     * @param messageQueues queues to pause
+     */
+    public void pause(Collection<MessageQueue> messageQueues) {
+        this.assignedMessageQueue.pause(messageQueues);
+    }
+
+    /**
+     * Forwards the given queues to {@code assignedMessageQueue.resume}.
+     *
+     * @param messageQueues queues to resume
+     */
+    public void resume(Collection<MessageQueue> messageQueues) {
+        this.assignedMessageQueue.resume(messageQueues);
+    }
+
+    /**
+     * Stops consuming {@code topic}: removes its entry from the rebalance subscription table, drops its
+     * pull tasks and removes its assigned message queues.
+     *
+     * @param topic topic to unsubscribe from
+     */
+    public void unsubscribe(final String topic) {
+        // Remove the subscription entry directly; calling unsubscribe(topic) here would recurse without
+        // end and fail with StackOverflowError.
+        Map<String, SubscriptionData> subTable = rebalanceImpl.getSubscriptionInner();
+        if (subTable != null) {
+            subTable.remove(topic);
+        }
+        removePullTaskCallback(topic);
+        assignedMessageQueue.removeAssignedMessageQueue(topic);
+    }
+
+    /**
+     * Alias for {@link #removePullTask(String)}.
+     *
+     * @param topic topic whose pull tasks are removed
+     */
+    public void removePullTaskCallback(final String topic) {
+        this.removePullTask(topic);
+    }
+
+    /**
+     * Removes every task-table entry whose message queue belongs to {@code topic}, holding the table's
+     * monitor while doing so.
+     *
+     * @param topic topic whose pull tasks are removed
+     */
+    public void removePullTask(final String topic) {
+        synchronized (this.taskTable) {
+            // Removing through the key-set iterator removes the whole mapping from the table.
+            final Iterator<MessageQueue> queues = this.taskTable.keySet().iterator();
+            while (queues.hasNext()) {
+                final MessageQueue queue = queues.next();
+                if (queue.getTopic().equals(topic)) {
+                    queues.remove();
+                }
+            }
+        }
+    }
+
+    /**
+     * Commits everything consumed so far: removes the messages of all collected consume requests from
+     * their process queues, updates the consume offset of every queue reported by
+     * {@code assignedMessageQueue.getNeedCommitOffsets()}, and finally persists those queues through the
+     * offset store. A failure to update one queue's offset is logged and does not stop the others.
+     */
+    public void commit() {
+        List<ConsumeRequest> consumeRequests;
+        // Swap the shared list for a fresh one so the removal below runs outside the lock.
+        // NOTE(review): the monitor is the list object itself, which is replaced inside this block, so
+        // later callers lock on a different object than threads that are still waiting on the old one.
+        synchronized (this.allConsumed) {
+            consumeRequests = this.allConsumed;
+            this.allConsumed = new ArrayList<ConsumeRequest>();
+        }
+        for (ConsumeRequest consumeRequest : consumeRequests) {
+            consumeRequest.getProcessQueue().removeMessage(consumeRequest.messageExts);
+        }
+        Set<Map.Entry<MessageQueue, Long>> entrySet = assignedMessageQueue.getNeedCommitOffsets().entrySet();
+        for (Map.Entry<MessageQueue, Long> entry : entrySet) {
+            try {
+                updateConsumeOffset(entry.getKey(), entry.getValue());
+            } catch (MQClientException e) {
+                log.error("A error occurred in update consume offset process.", e);
+            }
+        }
+        // getNeedCommitOffsets() is called a second time here rather than reusing the map read above.
+        this.getOffsetStore().persistAll(assignedMessageQueue.getNeedCommitOffsets().keySet());
+    }
+
+    /**
+     * Removes a single message from its process queue and updates the queue's consume offset with the
+     * value returned by that removal. A failed offset update is logged, not rethrown.
+     *
+     * @param messageQueue queue whose consume offset is updated
+     * @param processQueue process queue the message is removed from
+     * @param messageExt message to remove
+     */
+    private void commit(final MessageQueue messageQueue, final ProcessQueue processQueue, final MessageExt messageExt) {
+        final List<MessageExt> single = Collections.singletonList(messageExt);
+        final long offset = processQueue.removeMessage(single);
+        try {
+            this.updateConsumeOffset(messageQueue, offset);
+        } catch (MQClientException updateFailure) {
+            log.error("An error occurred in update consume offset process.", updateFailure);
+        }
+    }
+
     public void subscribe(String topic, String subExpression) throws MQClientException {
         try {
             SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultMQPullConsumer.getConsumerGroup(),
@@ -159,7 +240,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
         try {
             offset = assignedMessageQueue.getNextOffset(remoteQueue);
             if (offset == -1) {
-                offset = this.defaultMQPullConsumer.fetchConsumeOffset(remoteQueue, false);
+                offset = fetchConsumeOffset(remoteQueue, false);
                 assignedMessageQueue.updateNextOffset(remoteQueue, offset);
             }
         } catch (MQClientException e) {
@@ -168,6 +249,63 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
         return offset;
     }
 
+    /**
+     * Sweeps every process queue in the rebalance table. While the first message of a queue has a
+     * consume-start timestamp older than ten minutes, the message is sent back with delay level 3 and
+     * committed. The sweep of a queue stops at the first message that is not expired or when the queue
+     * is empty.
+     *
+     * If the queue's tree-map lock cannot be obtained, or the thread is interrupted while waiting for
+     * it, the whole sweep is abandoned; on interruption the interrupt flag is set again.
+     */
+    private void cleanExpireMsg() {
+        for (final Map.Entry<MessageQueue, ProcessQueue> next : rebalanceImpl.getProcessQueueTable().entrySet()) {
+            ProcessQueue pq = next.getValue();
+            MessageQueue mq = next.getKey();
+            ReadWriteLock lockTreeMap = getLockInProcessQueue(pq);
+            if (lockTreeMap == null) {
+                // The "{}" placeholder is required for the queue to actually show up in the log line.
+                log.error("Gets tree map lock in process queue error of message queue: {}", mq);
+                return;
+            }
+
+            TreeMap<Long, MessageExt> msgTreeMap = pq.getMsgTreeMap();
+
+            // Bound the number of rounds by the size seen at the start of the sweep.
+            int loop = msgTreeMap.size();
+            for (int i = 0; i < loop; i++) {
+                MessageExt msg = null;
+                try {
+                    lockTreeMap.readLock().lockInterruptibly();
+                    try {
+                        if (!msgTreeMap.isEmpty()) {
+                            msg = msgTreeMap.firstEntry().getValue();
+                            if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
+                                > 10 * 60 * 1000) {
+                                //Expired, ack and remove it.
+                            } else {
+                                break;
+                            }
+                        } else {
+                            break;
+                        }
+                    } finally {
+                        lockTreeMap.readLock().unlock();
+                    }
+                } catch (InterruptedException e) {
+                    log.error("Gets expired message exception", e);
+                    // No message was selected, so there is nothing to send back; restore the flag and stop
+                    // instead of falling through with a null message.
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+
+                try {
+                    this.defaultMQPullConsumer.sendMessageBack(msg, 3);
+                    log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}",
+                        msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
+                    commit(mq, pq, msg);
+                } catch (Exception e) {
+                    log.error("Send back expired msg exception", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Reads the {@code lockTreeMap} field declared on the given process queue via reflection, forcing
+     * access to it.
+     *
+     * @param pq process queue to read the lock from
+     * @return the lock, or {@code null} when the field cannot be accessed
+     */
+    private ReadWriteLock getLockInProcessQueue(ProcessQueue pq) {
+        ReadWriteLock treeMapLock;
+        try {
+            treeMapLock = (ReadWriteLock) FieldUtils.readDeclaredField(pq, "lockTreeMap", true);
+        } catch (IllegalAccessException e) {
+            treeMapLock = null;
+        }
+        return treeMapLock;
+    }
+
     public class PullTaskImpl implements Runnable {
         private final MessageQueue messageQueue;
         private volatile boolean cancelled = false;