You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:46 UTC

[29/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java
new file mode 100644
index 0000000..efc5ab0
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.impl.consumer;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class PullRequest {
+    private String consumerGroup;
+    private MessageQueue messageQueue;
+    private ProcessQueue processQueue;
+    private long nextOffset;
+    private boolean lockedFirst = false;
+
+    public boolean isLockedFirst() {
+        return lockedFirst;
+    }
+
+    public void setLockedFirst(boolean lockedFirst) {
+        this.lockedFirst = lockedFirst;
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public MessageQueue getMessageQueue() {
+        return messageQueue;
+    }
+
+
+    public void setMessageQueue(MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+
+    public long getNextOffset() {
+        return nextOffset;
+    }
+
+
+    public void setNextOffset(long nextOffset) {
+        this.nextOffset = nextOffset;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((consumerGroup == null) ? 0 : consumerGroup.hashCode());
+        result = prime * result + ((messageQueue == null) ? 0 : messageQueue.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        PullRequest other = (PullRequest) obj;
+        if (consumerGroup == null) {
+            if (other.consumerGroup != null)
+                return false;
+        } else if (!consumerGroup.equals(other.consumerGroup))
+            return false;
+        if (messageQueue == null) {
+            if (other.messageQueue != null)
+                return false;
+        } else if (!messageQueue.equals(other.messageQueue))
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "PullRequest [consumerGroup=" + consumerGroup + ", messageQueue=" + messageQueue
+                + ", nextOffset=" + nextOffset + "]";
+    }
+
+    public ProcessQueue getProcessQueue() {
+        return processQueue;
+    }
+
+
+    public void setProcessQueue(ProcessQueue processQueue) {
+        this.processQueue = processQueue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java
new file mode 100644
index 0000000..e140b6a
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.impl.consumer;
+
+import com.alibaba.rocketmq.client.consumer.PullResult;
+import com.alibaba.rocketmq.client.consumer.PullStatus;
+import com.alibaba.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class PullResultExt extends PullResult {
+    private final long suggestWhichBrokerId;
+    private byte[] messageBinary;
+
+
+    public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
+                         List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) {
+        super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList);
+        this.suggestWhichBrokerId = suggestWhichBrokerId;
+        this.messageBinary = messageBinary;
+    }
+
+
+    public byte[] getMessageBinary() {
+        return messageBinary;
+    }
+
+
+    public void setMessageBinary(byte[] messageBinary) {
+        this.messageBinary = messageBinary;
+    }
+
+
+    public long getSuggestWhichBrokerId() {
+        return suggestWhichBrokerId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java
new file mode 100644
index 0000000..641bb75
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -0,0 +1,481 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.impl.consumer;
+
+import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import com.alibaba.rocketmq.client.impl.FindBrokerResult;
+import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
+import com.alibaba.rocketmq.client.log.ClientLogger;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.body.LockBatchRequestBody;
+import com.alibaba.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
+import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.slf4j.Logger;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Base class for rebalance algorithm
+ *
+ * @author shijia.wxr
+ */
+public abstract class RebalanceImpl {
+    protected static final Logger log = ClientLogger.getLog();
+    protected final ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
+    protected final ConcurrentHashMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
+            new ConcurrentHashMap<String, Set<MessageQueue>>();
+    protected final ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner =
+            new ConcurrentHashMap<String, SubscriptionData>();
+    protected String consumerGroup;
+    protected MessageModel messageModel;
+    protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
+    protected MQClientInstance mQClientFactory;
+
+
+    public RebalanceImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
+                         MQClientInstance mQClientFactory) {
+        this.consumerGroup = consumerGroup;
+        this.messageModel = messageModel;
+        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+        this.mQClientFactory = mQClientFactory;
+    }
+
+    public void unlock(final MessageQueue mq, final boolean oneway) {
+        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
+        if (findBrokerResult != null) {
+            UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
+            requestBody.setConsumerGroup(this.consumerGroup);
+            requestBody.setClientId(this.mQClientFactory.getClientId());
+            requestBody.getMqSet().add(mq);
+
+            try {
+                this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway);
+                log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}", //
+                        this.consumerGroup, //
+                        this.mQClientFactory.getClientId(), //
+                        mq);
+            } catch (Exception e) {
+                log.error("unlockBatchMQ exception, " + mq, e);
+            }
+        }
+    }
+
+    public void unlockAll(final boolean oneway) {
+        HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
+
+        for (final Map.Entry<String, Set<MessageQueue>> entry : brokerMqs.entrySet()) {
+            final String brokerName = entry.getKey();
+            final Set<MessageQueue> mqs = entry.getValue();
+
+            if (mqs.isEmpty())
+                continue;
+
+            FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+            if (findBrokerResult != null) {
+                UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
+                requestBody.setConsumerGroup(this.consumerGroup);
+                requestBody.setClientId(this.mQClientFactory.getClientId());
+                requestBody.setMqSet(mqs);
+
+                try {
+                    this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway);
+
+                    for (MessageQueue mq : mqs) {
+                        ProcessQueue processQueue = this.processQueueTable.get(mq);
+                        if (processQueue != null) {
+                            processQueue.setLocked(false);
+                            log.info("the message queue unlock OK, Group: {} {}", this.consumerGroup, mq);
+                        }
+                    }
+                } catch (Exception e) {
+                    log.error("unlockBatchMQ exception, " + mqs, e);
+                }
+            }
+        }
+    }
+
+    private HashMap<String/* brokerName */, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {
+        HashMap<String, Set<MessageQueue>> result = new HashMap<String, Set<MessageQueue>>();
+        for (MessageQueue mq : this.processQueueTable.keySet()) {
+            Set<MessageQueue> mqs = result.get(mq.getBrokerName());
+            if (null == mqs) {
+                mqs = new HashSet<MessageQueue>();
+                result.put(mq.getBrokerName(), mqs);
+            }
+
+            mqs.add(mq);
+        }
+
+        return result;
+    }
+
+    public boolean lock(final MessageQueue mq) {
+        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
+        if (findBrokerResult != null) {
+            LockBatchRequestBody requestBody = new LockBatchRequestBody();
+            requestBody.setConsumerGroup(this.consumerGroup);
+            requestBody.setClientId(this.mQClientFactory.getClientId());
+            requestBody.getMqSet().add(mq);
+
+            try {
+                Set<MessageQueue> lockedMq =
+                        this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
+                for (MessageQueue mmqq : lockedMq) {
+                    ProcessQueue processQueue = this.processQueueTable.get(mmqq);
+                    if (processQueue != null) {
+                        processQueue.setLocked(true);
+                        processQueue.setLastLockTimestamp(System.currentTimeMillis());
+                    }
+                }
+
+                boolean lockOK = lockedMq.contains(mq);
+                log.info("the message queue lock {}, {} {}",
+                        lockOK ? "OK" : "Failed",
+                        this.consumerGroup,
+                        mq);
+                return lockOK;
+            } catch (Exception e) {
+                log.error("lockBatchMQ exception, " + mq, e);
+            }
+        }
+
+        return false;
+    }
+
+    public void lockAll() {
+        HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
+
+        Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, Set<MessageQueue>> entry = it.next();
+            final String brokerName = entry.getKey();
+            final Set<MessageQueue> mqs = entry.getValue();
+
+            if (mqs.isEmpty())
+                continue;
+
+            FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+            if (findBrokerResult != null) {
+                LockBatchRequestBody requestBody = new LockBatchRequestBody();
+                requestBody.setConsumerGroup(this.consumerGroup);
+                requestBody.setClientId(this.mQClientFactory.getClientId());
+                requestBody.setMqSet(mqs);
+
+                try {
+                    Set<MessageQueue> lockOKMQSet =
+                            this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
+
+                    for (MessageQueue mq : lockOKMQSet) {
+                        ProcessQueue processQueue = this.processQueueTable.get(mq);
+                        if (processQueue != null) {
+                            if (!processQueue.isLocked()) {
+                                log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
+                            }
+
+                            processQueue.setLocked(true);
+                            processQueue.setLastLockTimestamp(System.currentTimeMillis());
+                        }
+                    }
+                    for (MessageQueue mq : mqs) {
+                        if (!lockOKMQSet.contains(mq)) {
+                            ProcessQueue processQueue = this.processQueueTable.get(mq);
+                            if (processQueue != null) {
+                                processQueue.setLocked(false);
+                                log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    log.error("lockBatchMQ exception, " + mqs, e);
+                }
+            }
+        }
+    }
+
+    public void doRebalance(final boolean isOrder) {
+        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
+        if (subTable != null) {
+            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
+                final String topic = entry.getKey();
+                try {
+                    this.rebalanceByTopic(topic, isOrder);
+                } catch (Throwable e) {
+                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                        log.warn("rebalanceByTopic Exception", e);
+                    }
+                }
+            }
+        }
+
+        this.truncateMessageQueueNotMyTopic();
+    }
+
+    public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() {
+        return subscriptionInner;
+    }
+
+    private void rebalanceByTopic(final String topic, final boolean isOrder) {
+        switch (messageModel) {
+            case BROADCASTING: {
+                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
+                if (mqSet != null) {
+                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
+                    if (changed) {
+                        this.messageQueueChanged(topic, mqSet, mqSet);
+                        log.info("messageQueueChanged {} {} {} {}", //
+                                consumerGroup, //
+                                topic, //
+                                mqSet, //
+                                mqSet);
+                    }
+                } else {
+                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
+                }
+                break;
+            }
+            case CLUSTERING: {
+                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
+                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
+                if (null == mqSet) {
+                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
+                    }
+                }
+
+                if (null == cidAll) {
+                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
+                }
+
+                if (mqSet != null && cidAll != null) {
+                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
+                    mqAll.addAll(mqSet);
+
+                    Collections.sort(mqAll);
+                    Collections.sort(cidAll);
+
+                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
+
+                    List<MessageQueue> allocateResult = null;
+                    try {
+                        allocateResult = strategy.allocate(//
+                                this.consumerGroup, //
+                                this.mQClientFactory.getClientId(), //
+                                mqAll, //
+                                cidAll);
+                    } catch (Throwable e) {
+                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
+                                e);
+                        return;
+                    }
+
+                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
+                    if (allocateResult != null) {
+                        allocateResultSet.addAll(allocateResult);
+                    }
+
+                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
+                    if (changed) {
+                        log.info(
+                                "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
+                                strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
+                                allocateResultSet.size(), allocateResultSet);
+                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
+                    }
+                }
+                break;
+            }
+            default:
+                break;
+        }
+    }
+
+    private void truncateMessageQueueNotMyTopic() {
+        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
+
+        for (MessageQueue mq : this.processQueueTable.keySet()) {
+            if (!subTable.containsKey(mq.getTopic())) {
+
+                ProcessQueue pq = this.processQueueTable.remove(mq);
+                if (pq != null) {
+                    pq.setDropped(true);
+                    log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
+                }
+            }
+        }
+    }
+
+    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
+        boolean changed = false;
+
+        Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<MessageQueue, ProcessQueue> next = it.next();
+            MessageQueue mq = next.getKey();
+            ProcessQueue pq = next.getValue();
+
+            if (mq.getTopic().equals(topic)) {
+                if (!mqSet.contains(mq)) {
+                    pq.setDropped(true);
+                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
+                        it.remove();
+                        changed = true;
+                        log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
+                    }
+                } else if (pq.isPullExpired()) {
+                    switch (this.consumeType()) {
+                        case CONSUME_ACTIVELY:
+                            break;
+                        case CONSUME_PASSIVELY:
+                            pq.setDropped(true);
+                            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
+                                it.remove();
+                                changed = true;
+                                log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
+                                        consumerGroup, mq);
+                            }
+                            break;
+                        default:
+                            break;
+                    }
+                }
+            }
+        }
+
+        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
+        for (MessageQueue mq : mqSet) {
+            if (!this.processQueueTable.containsKey(mq)) {
+                if (isOrder && !this.lock(mq)) {
+                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
+                    continue;
+                }
+
+                this.removeDirtyOffset(mq);
+                ProcessQueue pq = new ProcessQueue();
+                long nextOffset = this.computePullFromWhere(mq);
+                if (nextOffset >= 0) {
+                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
+                    if (pre != null) {
+                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
+                    } else {
+                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
+                        PullRequest pullRequest = new PullRequest();
+                        pullRequest.setConsumerGroup(consumerGroup);
+                        pullRequest.setNextOffset(nextOffset);
+                        pullRequest.setMessageQueue(mq);
+                        pullRequest.setProcessQueue(pq);
+                        pullRequestList.add(pullRequest);
+                        changed = true;
+                    }
+                } else {
+                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
+                }
+            }
+        }
+
+        this.dispatchPullRequest(pullRequestList);
+
+        return changed;
+    }
+
+    public abstract void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqDivided);
+
+    public abstract boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq);
+
+    public abstract ConsumeType consumeType();
+
+    public abstract void removeDirtyOffset(final MessageQueue mq);
+
+    public abstract long computePullFromWhere(final MessageQueue mq);
+
+    public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
+
+    public void removeProcessQueue(final MessageQueue mq) {
+        ProcessQueue prev = this.processQueueTable.remove(mq);
+        if (prev != null) {
+            boolean droped = prev.isDropped();
+            prev.setDropped(true);
+            this.removeUnnecessaryMessageQueue(mq, prev);
+            log.info("Fix Offset, {}, remove unnecessary mq, {} Droped: {}", consumerGroup, mq, droped);
+        }
+    }
+
+    public ConcurrentHashMap<MessageQueue, ProcessQueue> getProcessQueueTable() {
+        return processQueueTable;
+    }
+
+
+    public ConcurrentHashMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() {
+        return topicSubscribeInfoTable;
+    }
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public MessageModel getMessageModel() {
+        return messageModel;
+    }
+
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.messageModel = messageModel;
+    }
+
+
+    public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
+        return allocateMessageQueueStrategy;
+    }
+
+
+    public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
+        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+    }
+
+
+    public MQClientInstance getmQClientFactory() {
+        return mQClientFactory;
+    }
+
+
+    public void setmQClientFactory(MQClientInstance mQClientFactory) {
+        this.mQClientFactory = mQClientFactory;
+    }
+
+
+    public void destroy() {
+        Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<MessageQueue, ProcessQueue> next = it.next();
+            next.getValue().setDropped(true);
+        }
+
+        this.processQueueTable.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java
new file mode 100644
index 0000000..8d2b465
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.impl.consumer;
+
+import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import com.alibaba.rocketmq.client.consumer.MessageQueueListener;
+import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
+import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
+
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class RebalancePullImpl extends RebalanceImpl {
+    private final DefaultMQPullConsumerImpl defaultMQPullConsumerImpl;
+
+
+    public RebalancePullImpl(DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) {
+        this(null, null, null, null, defaultMQPullConsumerImpl);
+    }
+
+
+    public RebalancePullImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
+                             MQClientInstance mQClientFactory, DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) {
+        super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
+        this.defaultMQPullConsumerImpl = defaultMQPullConsumerImpl;
+    }
+
+    @Override
+    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+        MessageQueueListener messageQueueListener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener();
+        if (messageQueueListener != null) {
+            try {
+                messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
+            } catch (Throwable e) {
+                log.error("messageQueueChanged exception", e);
+            }
+        }
+    }
+
+    @Override
+    public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
+        this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
+        this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
+        return true;
+    }
+
+    @Override
+    public ConsumeType consumeType() {
+        return ConsumeType.CONSUME_ACTIVELY;
+    }
+
+    @Override
+    public void removeDirtyOffset(final MessageQueue mq) {
+        this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
+    }
+
+    @Override
+    public long computePullFromWhere(MessageQueue mq) {
+        return 0;
+    }
+
+    @Override
+    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java
new file mode 100644
index 0000000..2377d29
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.impl.consumer;
+
+import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
+import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
+import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class RebalancePushImpl extends RebalanceImpl {
+    private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000"));
+    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
+
+
+    public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
+        this(null, null, null, null, defaultMQPushConsumerImpl);
+    }
+
+
+    public RebalancePushImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
+                             MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
+        super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
+        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
+    }
+
+    @Override
+    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+    }
+
+    @Override
+    public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
+        this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
+        this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
+        if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
+                && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
+            try {
+                if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
+                    try {
+                        return this.unlockDelay(mq, pq);
+                    } finally {
+                        pq.getLockConsume().unlock();
+                    }
+                } else {
+                    log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
+                            mq, //
+                            pq.getTryUnlockTimes());
+
+                    pq.incTryUnlockTimes();
+                }
+            } catch (Exception e) {
+                log.error("removeUnnecessaryMessageQueue Exception", e);
+            }
+
+            return false;
+        }
+        return true;
+    }
+
+    private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
+
+        if (pq.hasTempMessage()) {
+            log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
+            this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
+                @Override
+                public void run() {
+                    log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);
+                    RebalancePushImpl.this.unlock(mq, true);
+                }
+            }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
+        } else {
+            this.unlock(mq, true);
+        }
+        return true;
+    }
+
+    @Override
+    public ConsumeType consumeType() {
+        return ConsumeType.CONSUME_PASSIVELY;
+    }
+
+    @Override
+    public void removeDirtyOffset(final MessageQueue mq) {
+        this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
+    }
+
+    @Override
+    public long computePullFromWhere(MessageQueue mq) {
+        long result = -1;
+        final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
+        final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
+        switch (consumeFromWhere) {
+            case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
+            case CONSUME_FROM_MIN_OFFSET:
+            case CONSUME_FROM_MAX_OFFSET:
+            case CONSUME_FROM_LAST_OFFSET: {
+                long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
+                if (lastOffset >= 0) {
+                    result = lastOffset;
+                }
+                // First start,no offset
+                else if (-1 == lastOffset) {
+                    if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                        result = 0L;
+                    } else {
+                        try {
+                            result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
+                        } catch (MQClientException e) {
+                            result = -1;
+                        }
+                    }
+                } else {
+                    result = -1;
+                }
+                break;
+            }
+            case CONSUME_FROM_FIRST_OFFSET: {
+                long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
+                if (lastOffset >= 0) {
+                    result = lastOffset;
+                } else if (-1 == lastOffset) {
+                    result = 0L;
+                } else {
+                    result = -1;
+                }
+                break;
+            }
+            case CONSUME_FROM_TIMESTAMP: {
+                long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
+                if (lastOffset >= 0) {
+                    result = lastOffset;
+                } else if (-1 == lastOffset) {
+                    if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                        try {
+                            result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
+                        } catch (MQClientException e) {
+                            result = -1;
+                        }
+                    } else {
+                        try {
+                            long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
+                                    UtilAll.YYYY_MMDD_HHMMSS).getTime();
+                            result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
+                        } catch (MQClientException e) {
+                            result = -1;
+                        }
+                    }
+                } else {
+                    result = -1;
+                }
+                break;
+            }
+
+            default:
+                break;
+        }
+
+        return result;
+    }
+
+    @Override
+    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
+        for (PullRequest pullRequest : pullRequestList) {
+            this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
+            log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java
new file mode 100644
index 0000000..47a9da5
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.impl.consumer;
+
+import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
+import com.alibaba.rocketmq.client.log.ClientLogger;
+import com.alibaba.rocketmq.common.ServiceThread;
+import org.slf4j.Logger;
+
+
+/**
+ * Rebalance Service
+ *
+ * @author shijia.wxr
+ */
+public class RebalanceService extends ServiceThread {
+    private static long waitInterval =
+            Long.parseLong(System.getProperty(
+                    "rocketmq.client.rebalance.waitInterval", "20000"));
+    private final Logger log = ClientLogger.getLog();
+    private final MQClientInstance mqClientFactory;
+
+    public RebalanceService(MQClientInstance mqClientFactory) {
+        this.mqClientFactory = mqClientFactory;
+    }
+
+    @Override
+    public void run() {
+        log.info(this.getServiceName() + " service started");
+
+        while (!this.isStopped()) {
+            this.waitForRunning(waitInterval);
+            this.mqClientFactory.doRebalance();
+        }
+
+        log.info(this.getServiceName() + " service end");
+    }
+
+
+    @Override
+    public String getServiceName() {
+        return RebalanceService.class.getSimpleName();
+    }
+}