You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:29 UTC

[33/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18 Rename package name from com.alibaba to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java
deleted file mode 100644
index efc5ab0..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-
-/**
- * Mutable holder describing one pull task: the consumer group, the message
- * queue, the local {@link ProcessQueue} attached to it and the next offset.
- * <p>
- * Equality and hashing are based on the consumer group and the message queue
- * only; the process queue, the next offset and the {@code lockedFirst} flag do
- * not take part.
- *
- * @author shijia.wxr
- */
-public class PullRequest {
-    private String consumerGroup;
-    private MessageQueue messageQueue;
-    private ProcessQueue processQueue;
-    private long nextOffset;
-    private boolean lockedFirst = false;
-
-    /** @return the consumer group this request belongs to, possibly {@code null} */
-    public String getConsumerGroup() {
-        return consumerGroup;
-    }
-
-    /** @param consumerGroup the consumer group this request belongs to */
-    public void setConsumerGroup(String consumerGroup) {
-        this.consumerGroup = consumerGroup;
-    }
-
-    /** @return the message queue this request refers to, possibly {@code null} */
-    public MessageQueue getMessageQueue() {
-        return messageQueue;
-    }
-
-    /** @param messageQueue the message queue this request refers to */
-    public void setMessageQueue(MessageQueue messageQueue) {
-        this.messageQueue = messageQueue;
-    }
-
-    /** @return the process queue attached to this request, possibly {@code null} */
-    public ProcessQueue getProcessQueue() {
-        return processQueue;
-    }
-
-    /** @param processQueue the process queue to attach to this request */
-    public void setProcessQueue(ProcessQueue processQueue) {
-        this.processQueue = processQueue;
-    }
-
-    /** @return the next offset stored in this request */
-    public long getNextOffset() {
-        return nextOffset;
-    }
-
-    /** @param nextOffset the next offset to store in this request */
-    public void setNextOffset(long nextOffset) {
-        this.nextOffset = nextOffset;
-    }
-
-    /** @return the current value of the {@code lockedFirst} flag (initially {@code false}) */
-    public boolean isLockedFirst() {
-        return lockedFirst;
-    }
-
-    /** @param lockedFirst the new value of the {@code lockedFirst} flag */
-    public void setLockedFirst(boolean lockedFirst) {
-        this.lockedFirst = lockedFirst;
-    }
-
-    /**
-     * Combines the hash codes of the consumer group and the message queue
-     * (a {@code null} field contributes 0) using the multiplier 31.
-     */
-    @Override
-    public int hashCode() {
-        final int groupHash = (consumerGroup == null) ? 0 : consumerGroup.hashCode();
-        final int queueHash = (messageQueue == null) ? 0 : messageQueue.hashCode();
-        return 31 * (31 + groupHash) + queueHash;
-    }
-
-    /**
-     * Two requests are equal when they are of exactly the same class and both
-     * their consumer groups and their message queues are equal (two
-     * {@code null} values count as equal).
-     */
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-
-        final PullRequest that = (PullRequest) obj;
-
-        final boolean sameGroup = (consumerGroup == null)
-                ? that.consumerGroup == null
-                : consumerGroup.equals(that.consumerGroup);
-        if (!sameGroup) {
-            return false;
-        }
-
-        return (messageQueue == null)
-                ? that.messageQueue == null
-                : messageQueue.equals(that.messageQueue);
-    }
-
-    /** Renders the consumer group, the message queue and the next offset. */
-    @Override
-    public String toString() {
-        final StringBuilder text = new StringBuilder("PullRequest [consumerGroup=");
-        text.append(consumerGroup);
-        text.append(", messageQueue=").append(messageQueue);
-        text.append(", nextOffset=").append(nextOffset);
-        text.append("]");
-        return text.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java
deleted file mode 100644
index e140b6a..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.consumer.PullResult;
-import com.alibaba.rocketmq.client.consumer.PullStatus;
-import com.alibaba.rocketmq.common.message.MessageExt;
-
-import java.util.List;
-
-
-/**
- * A {@link PullResult} that additionally carries a {@code suggestWhichBrokerId}
- * value, fixed at construction time, and a replaceable raw byte array.
- *
- * @author shijia.wxr
- */
-public class PullResultExt extends PullResult {
-    private final long suggestWhichBrokerId;
-    private byte[] messageBinary;
-
-
-    /**
-     * Creates the result; the first five arguments are handed unchanged to the
-     * {@link PullResult} constructor.
-     *
-     * @param pullStatus           passed to the superclass
-     * @param nextBeginOffset      passed to the superclass
-     * @param minOffset            passed to the superclass
-     * @param maxOffset            passed to the superclass
-     * @param msgFoundList         passed to the superclass
-     * @param suggestWhichBrokerId value later returned by {@link #getSuggestWhichBrokerId()}
-     * @param messageBinary        byte array kept by reference (no copy is made)
-     */
-    public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
-                         List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) {
-        super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList);
-        this.messageBinary = messageBinary;
-        this.suggestWhichBrokerId = suggestWhichBrokerId;
-    }
-
-
-    /** @return the broker id value given to the constructor */
-    public long getSuggestWhichBrokerId() {
-        return suggestWhichBrokerId;
-    }
-
-
-    /** @return the byte array currently held, by reference; may be {@code null} */
-    public byte[] getMessageBinary() {
-        return messageBinary;
-    }
-
-
-    /** @param messageBinary the byte array to hold from now on (stored by reference) */
-    public void setMessageBinary(byte[] messageBinary) {
-        this.messageBinary = messageBinary;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java
deleted file mode 100644
index 641bb75..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ /dev/null
@@ -1,481 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import com.alibaba.rocketmq.client.impl.FindBrokerResult;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.body.LockBatchRequestBody;
-import com.alibaba.rocketmq.common.protocol.body.UnlockBatchRequestBody;
-import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.slf4j.Logger;
-
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * Base class for rebalance algorithm
- * <p>
- * Keeps, per message queue, the {@link ProcessQueue} currently held by this
- * client in {@link #processQueueTable} and updates that table on every
- * {@link #doRebalance(boolean)} pass. It also offers helpers that send batch
- * lock / unlock requests for those queues to the master broker. Subclasses
- * supply the consume-type specific hooks declared as abstract methods.
- *
- * @author shijia.wxr
- */
-public abstract class RebalanceImpl {
-    protected static final Logger log = ClientLogger.getLog();
-
-    /** Timeout, in milliseconds, passed to every lock / unlock batch call. */
-    private static final int LOCK_RPC_TIMEOUT_MILLIS = 1000;
-
-    protected final ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
-    protected final ConcurrentHashMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
-            new ConcurrentHashMap<String, Set<MessageQueue>>();
-    protected final ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner =
-            new ConcurrentHashMap<String, SubscriptionData>();
-    protected String consumerGroup;
-    protected MessageModel messageModel;
-    protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
-    protected MQClientInstance mQClientFactory;
-
-
-    /**
-     * Stores the four collaborators as given; none of them is validated here.
-     */
-    public RebalanceImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
-                         MQClientInstance mQClientFactory) {
-        this.consumerGroup = consumerGroup;
-        this.messageModel = messageModel;
-        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
-        this.mQClientFactory = mQClientFactory;
-    }
-
-    /** Looks up the master broker (id {@code MixAll.MASTER_ID}) for the given broker name. */
-    private FindBrokerResult findMasterBroker(final String brokerName) {
-        return this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
-    }
-
-    /** Creates an unlock request carrying this consumer group and client id; the queue set is left to the caller. */
-    private UnlockBatchRequestBody newUnlockRequestBody() {
-        final UnlockBatchRequestBody body = new UnlockBatchRequestBody();
-        body.setConsumerGroup(this.consumerGroup);
-        body.setClientId(this.mQClientFactory.getClientId());
-        return body;
-    }
-
-    /** Creates a lock request carrying this consumer group and client id; the queue set is left to the caller. */
-    private LockBatchRequestBody newLockRequestBody() {
-        final LockBatchRequestBody body = new LockBatchRequestBody();
-        body.setConsumerGroup(this.consumerGroup);
-        body.setClientId(this.mQClientFactory.getClientId());
-        return body;
-    }
-
-    /**
-     * Sends an unlock request for a single queue to its master broker.
-     * Nothing happens when the master broker cannot be found; an exception
-     * from the remote call is logged and not rethrown.
-     *
-     * @param mq     the queue to unlock
-     * @param oneway forwarded unchanged to {@code unlockBatchMQ}
-     */
-    public void unlock(final MessageQueue mq, final boolean oneway) {
-        final FindBrokerResult master = this.findMasterBroker(mq.getBrokerName());
-        if (master == null) {
-            return;
-        }
-
-        final UnlockBatchRequestBody body = this.newUnlockRequestBody();
-        body.getMqSet().add(mq);
-
-        try {
-            this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(master.getBrokerAddr(), body, LOCK_RPC_TIMEOUT_MILLIS, oneway);
-            log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}",
-                    this.consumerGroup,
-                    this.mQClientFactory.getClientId(),
-                    mq);
-        } catch (Exception e) {
-            log.error("unlockBatchMQ exception, " + mq, e);
-        }
-    }
-
-    /**
-     * Sends one unlock request per broker covering every queue currently in
-     * {@link #processQueueTable}, then marks the matching process queues as
-     * unlocked. Brokers whose master cannot be found are skipped; an exception
-     * from the remote call is logged and the remaining brokers are still handled.
-     *
-     * @param oneway forwarded unchanged to {@code unlockBatchMQ}
-     */
-    public void unlockAll(final boolean oneway) {
-        final HashMap<String, Set<MessageQueue>> queuesByBroker = this.buildProcessQueueTableByBrokerName();
-
-        for (final Map.Entry<String, Set<MessageQueue>> brokerEntry : queuesByBroker.entrySet()) {
-            final Set<MessageQueue> mqs = brokerEntry.getValue();
-            if (mqs.isEmpty()) {
-                continue;
-            }
-
-            final FindBrokerResult master = this.findMasterBroker(brokerEntry.getKey());
-            if (master == null) {
-                continue;
-            }
-
-            final UnlockBatchRequestBody body = this.newUnlockRequestBody();
-            body.setMqSet(mqs);
-
-            try {
-                this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(master.getBrokerAddr(), body, LOCK_RPC_TIMEOUT_MILLIS, oneway);
-
-                for (MessageQueue mq : mqs) {
-                    final ProcessQueue held = this.processQueueTable.get(mq);
-                    if (held != null) {
-                        held.setLocked(false);
-                        log.info("the message queue unlock OK, Group: {} {}", this.consumerGroup, mq);
-                    }
-                }
-            } catch (Exception e) {
-                log.error("unlockBatchMQ exception, " + mqs, e);
-            }
-        }
-    }
-
-    /** Groups the keys of {@link #processQueueTable} by their broker name. */
-    private HashMap<String/* brokerName */, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {
-        final HashMap<String, Set<MessageQueue>> grouped = new HashMap<String, Set<MessageQueue>>();
-        for (MessageQueue mq : this.processQueueTable.keySet()) {
-            final String brokerName = mq.getBrokerName();
-            Set<MessageQueue> bucket = grouped.get(brokerName);
-            if (bucket == null) {
-                bucket = new HashSet<MessageQueue>();
-                grouped.put(brokerName, bucket);
-            }
-            bucket.add(mq);
-        }
-
-        return grouped;
-    }
-
-    /**
-     * Asks the master broker to lock a single queue. Every queue reported back
-     * as locked that is present in {@link #processQueueTable} is flagged as
-     * locked and gets a fresh lock timestamp.
-     *
-     * @param mq the queue to lock
-     * @return {@code true} only when the broker's answer contains {@code mq};
-     * {@code false} when the master broker is unknown or the call failed
-     */
-    public boolean lock(final MessageQueue mq) {
-        final FindBrokerResult master = this.findMasterBroker(mq.getBrokerName());
-        if (master == null) {
-            return false;
-        }
-
-        final LockBatchRequestBody body = this.newLockRequestBody();
-        body.getMqSet().add(mq);
-
-        try {
-            final Set<MessageQueue> granted =
-                    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(master.getBrokerAddr(), body, LOCK_RPC_TIMEOUT_MILLIS);
-            for (MessageQueue grantedMq : granted) {
-                final ProcessQueue held = this.processQueueTable.get(grantedMq);
-                if (held != null) {
-                    held.setLocked(true);
-                    held.setLastLockTimestamp(System.currentTimeMillis());
-                }
-            }
-
-            final boolean lockOK = granted.contains(mq);
-            log.info("the message queue lock {}, {} {}",
-                    lockOK ? "OK" : "Failed",
-                    this.consumerGroup,
-                    mq);
-            return lockOK;
-        } catch (Exception e) {
-            log.error("lockBatchMQ exception, " + mq, e);
-            return false;
-        }
-    }
-
-    /**
-     * Sends one lock request per broker covering every queue currently in
-     * {@link #processQueueTable}. Queues the broker reports as locked are
-     * flagged locked with a fresh timestamp; requested queues missing from the
-     * answer are flagged unlocked. Failures are logged per broker.
-     */
-    public void lockAll() {
-        final HashMap<String, Set<MessageQueue>> queuesByBroker = this.buildProcessQueueTableByBrokerName();
-
-        for (final Entry<String, Set<MessageQueue>> brokerEntry : queuesByBroker.entrySet()) {
-            final Set<MessageQueue> mqs = brokerEntry.getValue();
-            if (mqs.isEmpty()) {
-                continue;
-            }
-
-            final FindBrokerResult master = this.findMasterBroker(brokerEntry.getKey());
-            if (master == null) {
-                continue;
-            }
-
-            final LockBatchRequestBody body = this.newLockRequestBody();
-            body.setMqSet(mqs);
-
-            try {
-                final Set<MessageQueue> lockOKMQSet =
-                        this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(master.getBrokerAddr(), body, LOCK_RPC_TIMEOUT_MILLIS);
-
-                for (MessageQueue mq : lockOKMQSet) {
-                    final ProcessQueue held = this.processQueueTable.get(mq);
-                    if (held == null) {
-                        continue;
-                    }
-                    // Only report the transition from unlocked to locked.
-                    if (!held.isLocked()) {
-                        log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
-                    }
-                    held.setLocked(true);
-                    held.setLastLockTimestamp(System.currentTimeMillis());
-                }
-
-                for (MessageQueue mq : mqs) {
-                    if (lockOKMQSet.contains(mq)) {
-                        continue;
-                    }
-                    final ProcessQueue held = this.processQueueTable.get(mq);
-                    if (held != null) {
-                        held.setLocked(false);
-                        log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);
-                    }
-                }
-            } catch (Exception e) {
-                log.error("lockBatchMQ exception, " + mqs, e);
-            }
-        }
-    }
-
-    /**
-     * Rebalances every subscribed topic and afterwards drops process queues
-     * whose topic is no longer subscribed. A failure for one topic does not
-     * stop the others; it is logged unless the topic is a retry topic.
-     *
-     * @param isOrder when {@code true}, a newly assigned queue is only added
-     *                after {@link #lock(MessageQueue)} succeeded for it
-     */
-    public void doRebalance(final boolean isOrder) {
-        final Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
-        if (subTable != null) {
-            for (final String topic : subTable.keySet()) {
-                try {
-                    this.rebalanceByTopic(topic, isOrder);
-                } catch (Throwable e) {
-                    final boolean retryTopic = topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
-                    if (!retryTopic) {
-                        log.warn("rebalanceByTopic Exception", e);
-                    }
-                }
-            }
-        }
-
-        this.truncateMessageQueueNotMyTopic();
-    }
-
-    /** @return the live subscription table keyed by topic (not a copy) */
-    public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() {
-        return subscriptionInner;
-    }
-
-    /** Dispatches to the rebalance routine matching {@link #messageModel}; other models are ignored. */
-    private void rebalanceByTopic(final String topic, final boolean isOrder) {
-        switch (messageModel) {
-            case BROADCASTING:
-                this.rebalanceBroadcasting(topic, isOrder);
-                break;
-            case CLUSTERING:
-                this.rebalanceClustering(topic, isOrder);
-                break;
-            default:
-                break;
-        }
-    }
-
-    /** Broadcasting: this client takes every queue known for the topic. */
-    private void rebalanceBroadcasting(final String topic, final boolean isOrder) {
-        final Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
-        if (mqSet == null) {
-            log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
-            return;
-        }
-
-        if (this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder)) {
-            this.messageQueueChanged(topic, mqSet, mqSet);
-            log.info("messageQueueChanged {} {} {} {}",
-                    consumerGroup,
-                    topic,
-                    mqSet,
-                    mqSet);
-        }
-    }
-
-    /**
-     * Clustering: sorts the topic's queues and the consumer ids, lets the
-     * allocation strategy pick this client's share and applies it.
-     */
-    private void rebalanceClustering(final String topic, final boolean isOrder) {
-        final Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
-        final List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
-
-        if (mqSet == null && !topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
-            log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
-        }
-        if (cidAll == null) {
-            log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
-        }
-        if (mqSet == null || cidAll == null) {
-            return;
-        }
-
-        final List<MessageQueue> mqAll = new ArrayList<MessageQueue>(mqSet);
-        Collections.sort(mqAll);
-        Collections.sort(cidAll);
-
-        // Read the field once so the same strategy is used for allocation and logging.
-        final AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
-
-        final List<MessageQueue> allocateResult;
-        try {
-            allocateResult = strategy.allocate(
-                    this.consumerGroup,
-                    this.mQClientFactory.getClientId(),
-                    mqAll,
-                    cidAll);
-        } catch (Throwable e) {
-            log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
-                    e);
-            return;
-        }
-
-        final Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
-        if (allocateResult != null) {
-            allocateResultSet.addAll(allocateResult);
-        }
-
-        if (this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder)) {
-            log.info(
-                    "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
-                    strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
-                    allocateResultSet.size(), allocateResultSet);
-            this.messageQueueChanged(topic, mqSet, allocateResultSet);
-        }
-    }
-
-    /** Removes and marks as dropped every process queue whose topic is absent from the subscription table. */
-    private void truncateMessageQueueNotMyTopic() {
-        final Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
-
-        for (MessageQueue mq : this.processQueueTable.keySet()) {
-            if (subTable.containsKey(mq.getTopic())) {
-                continue;
-            }
-
-            final ProcessQueue removed = this.processQueueTable.remove(mq);
-            if (removed != null) {
-                removed.setDropped(true);
-                log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
-            }
-        }
-    }
-
-    /**
-     * Brings {@link #processQueueTable} in line with {@code mqSet} for one
-     * topic: first removes queues that are no longer assigned (or whose pull
-     * has expired in passive consumption), then adds the newly assigned ones
-     * and hands the resulting pull requests to {@link #dispatchPullRequest(List)}.
-     *
-     * @return {@code true} when at least one queue was removed or added
-     */
-    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
-        boolean changed = this.removeStaleProcessQueues(topic, mqSet);
-
-        final List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
-        for (MessageQueue mq : mqSet) {
-            if (this.processQueueTable.containsKey(mq)) {
-                continue;
-            }
-
-            if (isOrder && !this.lock(mq)) {
-                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
-                continue;
-            }
-
-            this.removeDirtyOffset(mq);
-            final ProcessQueue created = new ProcessQueue();
-            final long nextOffset = this.computePullFromWhere(mq);
-            if (nextOffset < 0) {
-                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
-                continue;
-            }
-
-            final ProcessQueue existing = this.processQueueTable.putIfAbsent(mq, created);
-            if (existing != null) {
-                log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
-                continue;
-            }
-
-            log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
-            final PullRequest pullRequest = new PullRequest();
-            pullRequest.setConsumerGroup(consumerGroup);
-            pullRequest.setNextOffset(nextOffset);
-            pullRequest.setMessageQueue(mq);
-            pullRequest.setProcessQueue(created);
-            pullRequestList.add(pullRequest);
-            changed = true;
-        }
-
-        this.dispatchPullRequest(pullRequestList);
-
-        return changed;
-    }
-
-    /**
-     * Walks the process queues of {@code topic} and drops those that are not in
-     * {@code mqSet}, plus - for passive consumption only - those whose pull has
-     * expired. A queue is marked dropped before the subclass hook is asked, and
-     * is removed from the table only when the hook returns {@code true}.
-     *
-     * @return {@code true} when at least one entry was removed
-     */
-    private boolean removeStaleProcessQueues(final String topic, final Set<MessageQueue> mqSet) {
-        boolean removedAny = false;
-
-        final Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
-        while (it.hasNext()) {
-            final Entry<MessageQueue, ProcessQueue> current = it.next();
-            final MessageQueue mq = current.getKey();
-            final ProcessQueue pq = current.getValue();
-
-            if (!mq.getTopic().equals(topic)) {
-                continue;
-            }
-
-            if (!mqSet.contains(mq)) {
-                pq.setDropped(true);
-                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
-                    it.remove();
-                    removedAny = true;
-                    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
-                }
-                continue;
-            }
-
-            if (!pq.isPullExpired()) {
-                continue;
-            }
-
-            switch (this.consumeType()) {
-                case CONSUME_ACTIVELY:
-                    break;
-                case CONSUME_PASSIVELY:
-                    pq.setDropped(true);
-                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
-                        it.remove();
-                        removedAny = true;
-                        log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
-                                consumerGroup, mq);
-                    }
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        return removedAny;
-    }
-
-    /**
-     * Called after a rebalance pass changed the queues held for {@code topic}.
-     *
-     * @param mqAll     all queues known for the topic
-     * @param mqDivided the queues assigned to this client
-     */
-    public abstract void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqDivided);
-
-    /**
-     * Called for a queue that has just been marked dropped.
-     *
-     * @return {@code true} to let the rebalance pass remove the queue from
-     * {@link #processQueueTable}; {@code false} to keep the entry
-     */
-    public abstract boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq);
-
-    /** @return the consume type; {@code CONSUME_PASSIVELY} makes rebalance drop pull-expired queues */
-    public abstract ConsumeType consumeType();
-
-    /** Called for a newly assigned queue before its start offset is computed. */
-    public abstract void removeDirtyOffset(final MessageQueue mq);
-
-    /**
-     * @return the offset to start from for a newly assigned queue; a negative
-     * value makes the rebalance pass skip the queue
-     */
-    public abstract long computePullFromWhere(final MessageQueue mq);
-
-    /** Receives the pull requests built for the queues added in the current rebalance pass (possibly empty). */
-    public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
-
-    /**
-     * Removes one queue from {@link #processQueueTable}; when it was present it
-     * is marked dropped and passed to
-     * {@link #removeUnnecessaryMessageQueue(MessageQueue, ProcessQueue)}.
-     */
-    public void removeProcessQueue(final MessageQueue mq) {
-        final ProcessQueue removed = this.processQueueTable.remove(mq);
-        if (removed == null) {
-            return;
-        }
-
-        // Capture the flag before overwriting it so the log shows the previous state.
-        final boolean wasDropped = removed.isDropped();
-        removed.setDropped(true);
-        this.removeUnnecessaryMessageQueue(mq, removed);
-        log.info("Fix Offset, {}, remove unnecessary mq, {} Droped: {}", consumerGroup, mq, wasDropped);
-    }
-
-    /** @return the live table of held queues (not a copy) */
-    public ConcurrentHashMap<MessageQueue, ProcessQueue> getProcessQueueTable() {
-        return processQueueTable;
-    }
-
-
-    /** @return the live table of known queues per topic (not a copy) */
-    public ConcurrentHashMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() {
-        return topicSubscribeInfoTable;
-    }
-
-
-    /** @return the consumer group used in lock / unlock requests and allocation */
-    public String getConsumerGroup() {
-        return consumerGroup;
-    }
-
-
-    /** @param consumerGroup the consumer group to use from now on */
-    public void setConsumerGroup(String consumerGroup) {
-        this.consumerGroup = consumerGroup;
-    }
-
-
-    /** @return the message model that selects the rebalance routine */
-    public MessageModel getMessageModel() {
-        return messageModel;
-    }
-
-
-    /** @param messageModel the message model to use from now on */
-    public void setMessageModel(MessageModel messageModel) {
-        this.messageModel = messageModel;
-    }
-
-
-    /** @return the strategy used to pick this client's queues in clustering mode */
-    public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
-        return allocateMessageQueueStrategy;
-    }
-
-
-    /** @param allocateMessageQueueStrategy the allocation strategy to use from now on */
-    public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
-        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
-    }
-
-
-    /** @return the client instance used for broker lookups and remote calls */
-    public MQClientInstance getmQClientFactory() {
-        return mQClientFactory;
-    }
-
-
-    /** @param mQClientFactory the client instance to use from now on */
-    public void setmQClientFactory(MQClientInstance mQClientFactory) {
-        this.mQClientFactory = mQClientFactory;
-    }
-
-
-    /** Marks every held process queue as dropped and empties {@link #processQueueTable}. */
-    public void destroy() {
-        for (ProcessQueue held : this.processQueueTable.values()) {
-            held.setDropped(true);
-        }
-
-        this.processQueueTable.clear();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java
deleted file mode 100644
index 8d2b465..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import com.alibaba.rocketmq.client.consumer.MessageQueueListener;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-
-import java.util.List;
-import java.util.Set;
-
-
-/**
- * {@link RebalanceImpl} flavour backed by a {@link DefaultMQPullConsumerImpl}.
- * It reports {@code CONSUME_ACTIVELY}, forwards queue changes to the
- * consumer's {@link MessageQueueListener} and does nothing with the pull
- * requests produced by a rebalance pass.
- *
- * @author shijia.wxr
- */
-public class RebalancePullImpl extends RebalanceImpl {
-    private final DefaultMQPullConsumerImpl defaultMQPullConsumerImpl;
-
-
-    /**
-     * Creates an instance whose consumer group, message model, allocation
-     * strategy and client factory all start out as {@code null}.
-     */
-    public RebalancePullImpl(DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) {
-        this(null, null, null, null, defaultMQPullConsumerImpl);
-    }
-
-
-    /**
-     * Passes the first four arguments to {@link RebalanceImpl} and keeps the
-     * pull consumer implementation for the hook methods below.
-     */
-    public RebalancePullImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
-                             MQClientInstance mQClientFactory, DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) {
-        super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
-        this.defaultMQPullConsumerImpl = defaultMQPullConsumerImpl;
-    }
-
-    /** @return always {@code ConsumeType.CONSUME_ACTIVELY} */
-    @Override
-    public ConsumeType consumeType() {
-        return ConsumeType.CONSUME_ACTIVELY;
-    }
-
-    /**
-     * Forwards the change to the consumer's {@link MessageQueueListener}, if
-     * one is set. Anything the listener throws is logged and not rethrown.
-     */
-    @Override
-    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
-        final MessageQueueListener listener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener();
-        if (listener == null) {
-            return;
-        }
-
-        try {
-            listener.messageQueueChanged(topic, mqAll, mqDivided);
-        } catch (Throwable e) {
-            log.error("messageQueueChanged exception", e);
-        }
-    }
-
-    /**
-     * Persists the queue's offset through the offset store, then removes it
-     * from the store.
-     *
-     * @return always {@code true}
-     */
-    @Override
-    public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
-        this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
-        this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
-        return true;
-    }
-
-    /** Removes the queue's offset from the offset store. */
-    @Override
-    public void removeDirtyOffset(final MessageQueue mq) {
-        this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
-    }
-
-    /** @return always {@code 0} */
-    @Override
-    public long computePullFromWhere(MessageQueue mq) {
-        return 0;
-    }
-
-    /** Intentionally empty: the given pull requests are ignored. */
-    @Override
-    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java
deleted file mode 100644
index 2377d29..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
-import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-
-/**
- * {@link RebalanceImpl} specialisation that works on behalf of a
- * {@link DefaultMQPushConsumerImpl}: it releases queues that are no longer
- * assigned, decides the offset a newly assigned queue starts pulling from,
- * and hands new pull requests to the push consumer.
- *
- * @author shijia.wxr
- */
-public class RebalancePushImpl extends RebalanceImpl {
-    /**
-     * Delay, in milliseconds, applied before unlocking a queue whose process
-     * queue still reports temp messages. Read once from the system property
-     * {@code rocketmq.client.unlockDelayTimeMills}; defaults to 20000.
-     */
-    private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000"));
-    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
-
-
-    /**
-     * Creates an instance bound to the given push consumer, passing {@code null}
-     * for the consumer group, message model, allocation strategy and client factory.
-     *
-     * @param defaultMQPushConsumerImpl the push consumer this rebalancer serves
-     */
-    public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
-        this(null, null, null, null, defaultMQPushConsumerImpl);
-    }
-
-
-    /**
-     * Creates a fully specified instance. The first four arguments are passed
-     * to the {@link RebalanceImpl} constructor unchanged.
-     *
-     * @param consumerGroup                forwarded to the superclass
-     * @param messageModel                 forwarded to the superclass
-     * @param allocateMessageQueueStrategy forwarded to the superclass
-     * @param mQClientFactory              forwarded to the superclass
-     * @param defaultMQPushConsumerImpl    the push consumer this rebalancer serves
-     */
-    public RebalancePushImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
-                             MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
-        super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
-        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
-    }
-
-    /**
-     * Does nothing in this implementation; all arguments are ignored.
-     */
-    @Override
-    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
-    }
-
-    /**
-     * Persists and then removes the stored offset of {@code mq}. When the
-     * consumer is orderly and uses the {@code CLUSTERING} message model, the
-     * queue is additionally unlocked, which requires the consume lock of
-     * {@code pq}; the lock is awaited for at most 1000 ms.
-     *
-     * @param mq the queue being released
-     * @param pq the process queue belonging to {@code mq}
-     * @return {@code true} when no unlock is needed or the unlock was
-     *         performed or scheduled; {@code false} when the consume lock
-     *         could not be taken in time or an exception occurred
-     */
-    @Override
-    public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
-        this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
-        this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
-
-        final boolean orderlyClustering = this.defaultMQPushConsumerImpl.isConsumeOrderly()
-                && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel());
-        if (!orderlyClustering) {
-            return true;
-        }
-
-        try {
-            if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
-                try {
-                    return this.unlockDelay(mq, pq);
-                } finally {
-                    pq.getLockConsume().unlock();
-                }
-            } else {
-                log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
-                        mq,
-                        pq.getTryUnlockTimes());
-
-                pq.incTryUnlockTimes();
-            }
-        } catch (Exception e) {
-            if (e instanceof InterruptedException) {
-                // The broad catch would otherwise clear the interrupt request;
-                // restore it so the owning thread can still observe it.
-                Thread.currentThread().interrupt();
-            }
-            log.error("removeUnnecessaryMessageQueue Exception", e);
-        }
-
-        return false;
-    }
-
-    /**
-     * Unlocks {@code mq}: immediately when {@code pq} has no temp message,
-     * otherwise after {@link #UNLOCK_DELAY_TIME_MILLS} on the client factory's
-     * scheduled executor.
-     *
-     * @return always {@code true}
-     */
-    private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
-
-        if (pq.hasTempMessage()) {
-            log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
-            this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
-                @Override
-                public void run() {
-                    log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);
-                    RebalancePushImpl.this.unlock(mq, true);
-                }
-            }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
-        } else {
-            this.unlock(mq, true);
-        }
-        return true;
-    }
-
-    /**
-     * Reports the consume type of this rebalance implementation.
-     *
-     * @return always {@link ConsumeType#CONSUME_PASSIVELY}
-     */
-    @Override
-    public ConsumeType consumeType() {
-        return ConsumeType.CONSUME_PASSIVELY;
-    }
-
-    /**
-     * Calls {@code removeOffset(mq)} on the push consumer's offset store.
-     *
-     * @param mq the queue whose offset entry is removed
-     */
-    @Override
-    public void removeDirtyOffset(final MessageQueue mq) {
-        this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
-    }
-
-    /**
-     * Determines the offset from which {@code mq} should start being pulled,
-     * based on the consumer's {@link ConsumeFromWhere} setting.
-     * <p>
-     * In every handled mode a stored offset {@code >= 0} wins. A stored offset
-     * of {@code -1} (first start, no offset) triggers the mode-specific
-     * fallback; any other negative stored offset yields {@code -1}.
-     *
-     * @param mq the queue to compute the start offset for
-     * @return the start offset, or {@code -1} when none could be determined
-     */
-    @Override
-    public long computePullFromWhere(MessageQueue mq) {
-        final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
-        final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
-        switch (consumeFromWhere) {
-            case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
-            case CONSUME_FROM_MIN_OFFSET:
-            case CONSUME_FROM_MAX_OFFSET:
-            case CONSUME_FROM_LAST_OFFSET:
-                return this.pullFromLastOffset(mq, offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE));
-            case CONSUME_FROM_FIRST_OFFSET:
-                return this.pullFromFirstOffset(offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE));
-            case CONSUME_FROM_TIMESTAMP:
-                return this.pullFromTimestamp(mq, offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE));
-            default:
-                return -1;
-        }
-    }
-
-    /** "Last offset" family: with no stored offset, retry topics start at 0 and other topics at the queue's max offset. */
-    private long pullFromLastOffset(final MessageQueue mq, final long storedOffset) {
-        if (storedOffset >= 0) {
-            return storedOffset;
-        }
-        if (-1 != storedOffset) {
-            return -1;
-        }
-        // First start, no offset
-        if (this.isRetryTopic(mq)) {
-            return 0L;
-        }
-        return this.queryMaxOffset(mq);
-    }
-
-    /** "First offset" mode: with no stored offset, start at 0. */
-    private long pullFromFirstOffset(final long storedOffset) {
-        if (storedOffset >= 0) {
-            return storedOffset;
-        }
-        return -1 == storedOffset ? 0L : -1;
-    }
-
-    /** "Timestamp" mode: with no stored offset, retry topics start at the max offset and other topics at the offset found for the configured consume timestamp. */
-    private long pullFromTimestamp(final MessageQueue mq, final long storedOffset) {
-        if (storedOffset >= 0) {
-            return storedOffset;
-        }
-        if (-1 != storedOffset) {
-            return -1;
-        }
-        if (this.isRetryTopic(mq)) {
-            return this.queryMaxOffset(mq);
-        }
-        try {
-            long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
-                    UtilAll.YYYY_MMDD_HHMMSS).getTime();
-            return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
-        } catch (MQClientException e) {
-            // Previously swallowed silently; keep the -1 result but leave a trace.
-            log.warn("computePullFromWhere, search offset by timestamp failed, {}", mq, e);
-            return -1;
-        }
-    }
-
-    /** Tells whether the queue's topic starts with the retry-group topic prefix. */
-    private boolean isRetryTopic(final MessageQueue mq) {
-        return mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
-    }
-
-    /** Asks the admin API for the queue's max offset; logs and returns -1 when the query fails. */
-    private long queryMaxOffset(final MessageQueue mq) {
-        try {
-            return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
-        } catch (MQClientException e) {
-            // Previously swallowed silently; keep the -1 result but leave a trace.
-            log.warn("computePullFromWhere, query max offset failed, {}", mq, e);
-            return -1;
-        }
-    }
-
-    /**
-     * Submits every pull request to the push consumer for immediate execution
-     * and logs each one.
-     *
-     * @param pullRequestList the pull requests to dispatch
-     */
-    @Override
-    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
-        for (PullRequest pullRequest : pullRequestList) {
-            this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
-            log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java
deleted file mode 100644
index 47a9da5..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.ServiceThread;
-import org.slf4j.Logger;
-
-
-/**
- * Rebalance Service
- * <p>
- * Service thread that, until stopped, alternates between waiting via
- * {@code waitForRunning} and calling {@code doRebalance()} on the client
- * instance it was created with.
- *
- * @author shijia.wxr
- */
-public class RebalanceService extends ServiceThread {
-    /**
-     * Value handed to {@code waitForRunning} between rebalance rounds. Read
-     * once from the system property {@code rocketmq.client.rebalance.waitInterval};
-     * defaults to 20000 (presumably milliseconds - confirm against
-     * {@code ServiceThread.waitForRunning}).
-     */
-    private static final long WAIT_INTERVAL =
-            Long.parseLong(System.getProperty(
-                    "rocketmq.client.rebalance.waitInterval", "20000"));
-    private final Logger log = ClientLogger.getLog();
-    private final MQClientInstance mqClientFactory;
-
-    /**
-     * @param mqClientFactory the client instance whose {@code doRebalance()} is invoked by this service
-     */
-    public RebalanceService(MQClientInstance mqClientFactory) {
-        this.mqClientFactory = mqClientFactory;
-    }
-
-    /**
-     * Runs the wait-then-rebalance loop until {@code isStopped()} reports
-     * {@code true}, logging when the service starts and ends.
-     */
-    @Override
-    public void run() {
-        log.info("{} service started", this.getServiceName());
-
-        while (!this.isStopped()) {
-            this.waitForRunning(WAIT_INTERVAL);
-            this.mqClientFactory.doRebalance();
-        }
-
-        log.info("{} service end", this.getServiceName());
-    }
-
-
-    /**
-     * @return the simple class name of {@link RebalanceService}
-     */
-    @Override
-    public String getServiceName() {
-        return RebalanceService.class.getSimpleName();
-    }
-}