You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2017/09/04 06:44:44 UTC

[10/17] incubator-rocketmq-externals git commit: Polish cpp module

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/PullRequest.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullRequest.cpp b/rocketmq-client4cpp/src/consumer/PullRequest.cpp
deleted file mode 100755
index b8650c6..0000000
--- a/rocketmq-client4cpp/src/consumer/PullRequest.cpp
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "PullRequest.h"
-#include "UtilAll.h"
-
-namespace rmq
-{
-
-PullRequest::~PullRequest()
-{
-
-}
-
-std::string PullRequest::getConsumerGroup()
-{
-    return m_consumerGroup;
-}
-
-void PullRequest::setConsumerGroup(const std::string& consumerGroup)
-{
-    m_consumerGroup = consumerGroup;
-}
-
-MessageQueue& PullRequest::getMessageQueue()
-{
-    return m_messageQueue;
-}
-
-void PullRequest::setMessageQueue(const MessageQueue& messageQueue)
-{
-    m_messageQueue = messageQueue;
-}
-
-long long PullRequest::getNextOffset()
-{
-    return m_nextOffset;
-}
-
-void PullRequest::setNextOffset(long long nextOffset)
-{
-    m_nextOffset = nextOffset;
-}
-
-int PullRequest::hashCode()
-{
-    /*
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((consumerGroup == null) ? 0 : consumerGroup.hashCode());
-    result = prime * result + ((messageQueue == null) ? 0 : messageQueue.hashCode());
-    return result;
-    */
-    std::stringstream ss;
-    ss  << m_consumerGroup
-        << m_messageQueue.hashCode();
-    return UtilAll::hashCode(ss.str());
-}
-
-std::string PullRequest::toString() const
-{
-    std::stringstream ss;
-    ss << "{consumerGroup=" << m_consumerGroup
-       << ",messageQueue=" << m_messageQueue.toString()
-       << ",nextOffset=" << m_nextOffset << "}";
-    return ss.str();
-}
-
-
-bool PullRequest::operator==(const PullRequest& other)
-{
-    if (m_consumerGroup != other.m_consumerGroup)
-    {
-        return false;
-    }
-
-    if (!(m_messageQueue == other.m_messageQueue))
-    {
-        return false;
-    }
-
-    return true;
-}
-
-ProcessQueue* PullRequest::getProcessQueue()
-{
-    return m_pProcessQueue;
-}
-
-void PullRequest::setProcessQueue(ProcessQueue* pProcessQueue)
-{
-    m_pProcessQueue = pProcessQueue;
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/PullRequest.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullRequest.h b/rocketmq-client4cpp/src/consumer/PullRequest.h
deleted file mode 100755
index 3fb8367..0000000
--- a/rocketmq-client4cpp/src/consumer/PullRequest.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __PULLREQUEST_H__
-#define __PULLREQUEST_H__
-
-#include <string>
-#include <sstream>
-
-#include "MessageQueue.h"
-#include "ProcessQueue.h"
-
-namespace rmq
-{
-    class PullRequest
-    {
-    public:
-        virtual ~PullRequest();
-
-        std::string getConsumerGroup();
-        void setConsumerGroup(const std::string& consumerGroup);
-
-        MessageQueue& getMessageQueue();
-        void setMessageQueue(const MessageQueue& messageQueue);
-
-        long long getNextOffset();
-        void setNextOffset(long long nextOffset);
-
-        int hashCode();
-        std::string toString() const;
-
-        bool operator==(const PullRequest& other);
-
-        ProcessQueue* getProcessQueue();
-        void setProcessQueue(ProcessQueue* pProcessQueue);
-
-    private:
-        std::string m_consumerGroup;
-        MessageQueue m_messageQueue;
-
-        ProcessQueue* m_pProcessQueue;
-        long long m_nextOffset;
-    };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/PullResultExt.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullResultExt.h b/rocketmq-client4cpp/src/consumer/PullResultExt.h
deleted file mode 100755
index 24235b2..0000000
--- a/rocketmq-client4cpp/src/consumer/PullResultExt.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __PULLRESULTEXT_H__
-#define __PULLRESULTEXT_H__
-
-#include "PullResult.h"
-
-namespace rmq
-{
-
-  struct PullResultExt : public PullResult
-  {
-      PullResultExt(PullStatus pullStatus,
-                    long long nextBeginOffset,
-                    long long minOffset,
-                    long long maxOffset,
-                    std::list<MessageExt*>& msgFoundList,
-                    long suggestWhichBrokerId,
-                    const char* messageBinary,
-                    int messageBinaryLen)
-          : PullResult(pullStatus,
-                       nextBeginOffset,
-                       minOffset,
-                       maxOffset,
-                       msgFoundList),
-          suggestWhichBrokerId(suggestWhichBrokerId),
-          messageBinary(messageBinary),
-          messageBinaryLen(messageBinaryLen)
-      {
-
-      }
-
-      long suggestWhichBrokerId;
-      const char* messageBinary;
-      int messageBinaryLen;
-  };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp b/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp
deleted file mode 100755
index efdc1cc..0000000
--- a/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp
+++ /dev/null
@@ -1,613 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "RebalanceImpl.h"
-#include "AllocateMessageQueueStrategy.h"
-#include "MQClientFactory.h"
-#include "MixAll.h"
-#include "LockBatchBody.h"
-#include "MQClientAPIImpl.h"
-#include "KPRUtil.h"
-#include "ScopedLock.h"
-
-namespace rmq
-{
-
-RebalanceImpl::RebalanceImpl(const std::string& consumerGroup,
-                             MessageModel messageModel,
-                             AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy,
-                             MQClientFactory* pMQClientFactory)
-    : m_consumerGroup(consumerGroup),
-      m_messageModel(messageModel),
-      m_pAllocateMessageQueueStrategy(pAllocateMessageQueueStrategy),
-      m_pMQClientFactory(pMQClientFactory)
-{
-
-}
-
-RebalanceImpl::~RebalanceImpl()
-{
-}
-
-void RebalanceImpl::unlock(MessageQueue& mq, bool oneway)
-{
-    FindBrokerResult findBrokerResult =
-        m_pMQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll::MASTER_ID, true);
-    if (!findBrokerResult.brokerAddr.empty())
-    {
-        UnlockBatchRequestBody* requestBody = new UnlockBatchRequestBody();
-        requestBody->setConsumerGroup(m_consumerGroup);
-        requestBody->setClientId(m_pMQClientFactory->getClientId());
-        requestBody->getMqSet().insert(mq);
-
-        try
-        {
-            m_pMQClientFactory->getMQClientAPIImpl()->unlockBatchMQ(findBrokerResult.brokerAddr,
-                    requestBody, 1000, oneway);
-        }
-        catch (...)
-        {
-            RMQ_ERROR("unlockBatchMQ exception, MQ: {%s}" , mq.toString().c_str());
-        }
-    }
-}
-
-void RebalanceImpl::unlockAll(bool oneway)
-{
-    std::map<std::string, std::set<MessageQueue> > brokerMqs = buildProcessQueueTableByBrokerName();
-    std::map<std::string, std::set<MessageQueue> >::iterator it = brokerMqs.begin();
-
-    for (; it != brokerMqs.end(); it++)
-    {
-        std::string brokerName = it->first;
-        std::set<MessageQueue> mqs = it->second;
-
-        if (mqs.empty())
-        {
-            continue;
-        }
-
-        FindBrokerResult findBrokerResult =
-            m_pMQClientFactory->findBrokerAddressInSubscribe(brokerName, MixAll::MASTER_ID, true);
-
-        if (!findBrokerResult.brokerAddr.empty())
-        {
-            UnlockBatchRequestBody* requestBody = new UnlockBatchRequestBody();
-            requestBody->setConsumerGroup(m_consumerGroup);
-            requestBody->setClientId(m_pMQClientFactory->getClientId());
-            requestBody->setMqSet(mqs);
-
-            try
-            {
-                m_pMQClientFactory->getMQClientAPIImpl()->unlockBatchMQ(findBrokerResult.brokerAddr,
-                        requestBody, 1000, oneway);
-
-                kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
-                std::set<MessageQueue>::iterator itm = mqs.begin();
-                for (; itm != mqs.end(); itm++)
-                {
-                    std::map<MessageQueue, ProcessQueue*>::iterator itp = m_processQueueTable.find(*itm);
-                    if (itp != m_processQueueTable.end())
-                    {
-                        itp->second->setLocked(false);
-                        RMQ_INFO("the message queue unlock OK, Group: {%s}, MQ: {%s}",
-                                 m_consumerGroup.c_str(), (*itm).toString().c_str());
-                    }
-                }
-            }
-            catch (...)
-            {
-                RMQ_ERROR("unlockBatchMQ exception, mqs.size: {%u} ", (unsigned)mqs.size());
-            }
-        }
-    }
-}
-
-bool RebalanceImpl::lock(MessageQueue& mq)
-{
-    FindBrokerResult findBrokerResult =
-        m_pMQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll::MASTER_ID, true);
-    if (!findBrokerResult.brokerAddr.empty())
-    {
-        LockBatchRequestBody* requestBody = new LockBatchRequestBody();
-        requestBody->setConsumerGroup(m_consumerGroup);
-        requestBody->setClientId(m_pMQClientFactory->getClientId());
-        requestBody->getMqSet().insert(mq);
-
-        try
-        {
-            std::set<MessageQueue> lockedMq =
-                m_pMQClientFactory->getMQClientAPIImpl()->lockBatchMQ(
-                    findBrokerResult.brokerAddr, requestBody, 1000);
-
-            std::set<MessageQueue>::iterator it = lockedMq.begin();
-            for (; it != lockedMq.end(); it++)
-            {
-                kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
-                MessageQueue mmqq = *it;
-                std::map<MessageQueue, ProcessQueue*>::iterator itt = m_processQueueTable.find(mmqq);
-                if (itt != m_processQueueTable.end())
-                {
-                    itt->second->setLocked(true);
-                    itt->second->setLastLockTimestamp(KPRUtil::GetCurrentTimeMillis());
-                }
-            }
-
-            it = lockedMq.find(mq);
-            bool lockOK = (it != lockedMq.end());
-
-            RMQ_INFO("the message queue lock {%s}, {%s}, {%s}",//
-                     (lockOK ? "OK" : "Failed"), //
-                     m_consumerGroup.c_str(), //
-                     mq.toString().c_str());
-            return lockOK;
-        }
-        catch (...)
-        {
-            RMQ_ERROR("lockBatchMQ exception, MQ: {%s}", mq.toString().c_str());
-        }
-    }
-
-    return false;
-}
-
-void RebalanceImpl::lockAll()
-{
-    std::map<std::string, std::set<MessageQueue> > brokerMqs = buildProcessQueueTableByBrokerName();
-
-    std::map<std::string, std::set<MessageQueue> >::iterator it = brokerMqs.begin();
-    for (; it != brokerMqs.end(); it++)
-    {
-        std::string brokerName = it->first;
-        std::set<MessageQueue> mqs = it->second;
-
-        if (mqs.empty())
-        {
-            continue;
-        }
-
-        FindBrokerResult findBrokerResult =
-            m_pMQClientFactory->findBrokerAddressInSubscribe(brokerName, MixAll::MASTER_ID, true);
-        if (!findBrokerResult.brokerAddr.empty())
-        {
-            LockBatchRequestBody* requestBody = new LockBatchRequestBody();
-            requestBody->setConsumerGroup(m_consumerGroup);
-            requestBody->setClientId(m_pMQClientFactory->getClientId());
-            requestBody->setMqSet(mqs);
-
-            try
-            {
-                std::set<MessageQueue> lockOKMQSet =
-                    m_pMQClientFactory->getMQClientAPIImpl()->lockBatchMQ(
-                        findBrokerResult.brokerAddr, requestBody, 1000);
-
-                std::set<MessageQueue>::iterator its = lockOKMQSet.begin();
-                for (; its != lockOKMQSet.end(); its++)
-                {
-                    kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
-                    MessageQueue mq = *its;
-                    std::map<MessageQueue, ProcessQueue*>::iterator itt = m_processQueueTable.find(mq);
-                    if (itt != m_processQueueTable.end())
-                    {
-                        ProcessQueue* processQueue = itt->second;
-                        if (!processQueue->isLocked())
-                        {
-                            RMQ_INFO("the message queue locked OK, Group: {%s}, MQ: %s",
-                            	m_consumerGroup.c_str(),
-                            	mq.toString().c_str());
-                        }
-
-                        processQueue->setLocked(true);
-                        processQueue->setLastLockTimestamp(KPRUtil::GetCurrentTimeMillis());
-                    }
-                }
-
-                its = mqs.begin();
-                for (; its != mqs.end(); its++)
-                {
-                    MessageQueue mq = *its;
-                    std::set<MessageQueue>::iterator itf = lockOKMQSet.find(mq);
-                    if (itf == lockOKMQSet.end())
-                    {
-                        kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
-                        std::map<MessageQueue, ProcessQueue*>::iterator itt = m_processQueueTable.find(mq);
-                        if (itt != m_processQueueTable.end())
-                        {
-                            itt->second->setLocked(false);
-                            RMQ_WARN("the message queue locked Failed, Group: {%s}, MQ: %s",
-                            	m_consumerGroup.c_str(),
-                            	mq.toString().c_str());
-                        }
-                    }
-                }
-            }
-            catch (std::exception& e)
-            {
-                RMQ_ERROR("lockBatchMQ exception: %s", e.what());
-            }
-        }
-    }
-}
-
-void RebalanceImpl::doRebalance()
-{
-    std::map<std::string, SubscriptionData> subTable = getSubscriptionInner();
-    std::map<std::string, SubscriptionData>::iterator it = subTable.begin();
-    for (; it != subTable.end(); it++)
-    {
-        std::string topic = it->first;
-        try
-        {
-            rebalanceByTopic(topic);
-        }
-        catch (std::exception& e)
-        {
-            if (topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) != 0)
-            {
-                RMQ_WARN("rebalanceByTopic Exception: %s", e.what());
-            }
-        }
-    }
-
-    truncateMessageQueueNotMyTopic();
-}
-
-std::map<std::string, SubscriptionData>& RebalanceImpl::getSubscriptionInner()
-{
-    return m_subscriptionInner;
-}
-
-std::map<MessageQueue, ProcessQueue*>& RebalanceImpl::getProcessQueueTable()
-{
-    return m_processQueueTable;
-}
-
-
-kpr::RWMutex& RebalanceImpl::getProcessQueueTableLock()
-{
-	return m_processQueueTableLock;
-}
-
-
-std::map<std::string, std::set<MessageQueue> >& RebalanceImpl::getTopicSubscribeInfoTable()
-{
-    return m_topicSubscribeInfoTable;
-}
-
-std::string& RebalanceImpl::getConsumerGroup()
-{
-    return m_consumerGroup;
-}
-
-void RebalanceImpl::setConsumerGroup(const std::string& consumerGroup)
-{
-    m_consumerGroup = consumerGroup;
-}
-
-MessageModel RebalanceImpl::getMessageModel()
-{
-    return m_messageModel;
-}
-
-void RebalanceImpl::setMessageModel(MessageModel messageModel)
-{
-    m_messageModel = messageModel;
-}
-
-AllocateMessageQueueStrategy* RebalanceImpl::getAllocateMessageQueueStrategy()
-{
-    return m_pAllocateMessageQueueStrategy;
-}
-
-void RebalanceImpl::setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy)
-{
-    m_pAllocateMessageQueueStrategy = pAllocateMessageQueueStrategy;
-}
-
-MQClientFactory* RebalanceImpl::getmQClientFactory()
-{
-    return m_pMQClientFactory;
-}
-
-void RebalanceImpl::setmQClientFactory(MQClientFactory* pMQClientFactory)
-{
-    m_pMQClientFactory = pMQClientFactory;
-}
-
-std::map<std::string, std::set<MessageQueue> > RebalanceImpl::buildProcessQueueTableByBrokerName()
-{
-	std::map<std::string, std::set<MessageQueue> > result ;
-    kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
-    std::map<MessageQueue, ProcessQueue*>::iterator it = m_processQueueTable.begin();
-    for (; it != m_processQueueTable.end();)
-    {
-        MessageQueue mq = it->first;
-        std::map<std::string, std::set<MessageQueue> >::iterator itm = result.find(mq.getBrokerName());
-        if (itm == result.end())
-        {
-            std::set<MessageQueue> mqs ;
-            mqs.insert(mq);
-            result[mq.getBrokerName()] = mqs;
-        }
-        else
-        {
-            itm->second.insert(mq);
-        }
-    }
-
-    return result;
-}
-
-void RebalanceImpl::rebalanceByTopic(const std::string& topic)
-{
-	RMQ_DEBUG("rebalanceByTopic begin, topic={%s}", topic.c_str());
-    switch (m_messageModel)
-    {
-        case BROADCASTING:
-        {
-            //kpr::ScopedLock<kpr::Mutex> lock(m_topicSubscribeInfoTableLock);
-            std::map<std::string, std::set<MessageQueue> >::iterator it = m_topicSubscribeInfoTable.find(topic);
-            if (it != m_topicSubscribeInfoTable.end())
-            {
-                std::set<MessageQueue> mqSet = it->second;
-                bool changed = updateProcessQueueTableInRebalance(topic, mqSet);
-                if (changed)
-                {
-                    messageQueueChanged(topic, mqSet, mqSet);
-                    RMQ_INFO("messageQueueChanged {%s} {%s} {%s} {%s}",
-                             m_consumerGroup.c_str(),
-                             topic.c_str(),
-                             UtilAll::toString(mqSet).c_str(),
-                             UtilAll::toString(mqSet).c_str());
-                }
-            }
-            else
-            {
-                RMQ_WARN("doRebalance, {%s}, but the topic[%s] not exist.", m_consumerGroup.c_str(), topic.c_str());
-            }
-            break;
-        }
-        case CLUSTERING:
-        {
-            //kpr::ScopedLock<kpr::Mutex> lock(m_topicSubscribeInfoTableLock);
-            std::map<std::string, std::set<MessageQueue> >::iterator it = m_topicSubscribeInfoTable.find(topic);
-            if (it == m_topicSubscribeInfoTable.end())
-            {
-                if (topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) != 0)
-                {
-                    RMQ_WARN("doRebalance, %s, but the topic[%s] not exist.", m_consumerGroup.c_str(), topic.c_str());
-                }
-            }
-
-            std::list<std::string> cidAll = m_pMQClientFactory->findConsumerIdList(topic, m_consumerGroup);
-            if (cidAll.empty())
-            {
-                RMQ_WARN("doRebalance, %s:%s, get consumer id list failed.", m_consumerGroup.c_str(), topic.c_str());
-            }
-
-            if (it != m_topicSubscribeInfoTable.end() && !cidAll.empty())
-            {
-                std::vector<MessageQueue> mqAll;
-                std::set<MessageQueue> mqSet = it->second;
-                std::set<MessageQueue>::iterator its = mqSet.begin();
-
-                for (; its != mqSet.end(); its++)
-                {
-                    mqAll.push_back(*its);
-                }
-
-                cidAll.sort();
-
-                AllocateMessageQueueStrategy* strategy = m_pAllocateMessageQueueStrategy;
-
-                std::vector<MessageQueue>* allocateResult = NULL;
-                try
-                {
-                    allocateResult = strategy->allocate(m_consumerGroup,
-                    	m_pMQClientFactory->getClientId(), mqAll, cidAll);
-                }
-                catch (std::exception& e)
-                {
-                    RMQ_ERROR("AllocateMessageQueueStrategy.allocate Exception, allocateMessageQueueStrategyName={%s}, mqAll={%s}, cidAll={%s}, %s",
-                    	strategy->getName().c_str(), UtilAll::toString(mqAll).c_str(), UtilAll::toString(cidAll).c_str(), e.what());
-                    return;
-                }
-
-                std::set<MessageQueue> allocateResultSet;
-                if (allocateResult != NULL)
-                {
-                    for (size_t i = 0; i < allocateResult->size(); i++)
-                    {
-                        allocateResultSet.insert(allocateResult->at(i));
-                    }
-
-                    delete allocateResult;
-                }
-
-                bool changed = updateProcessQueueTableInRebalance(topic, allocateResultSet);
-                if (changed)
-                {
-                    RMQ_INFO("rebalanced result changed. allocateMessageQueueStrategyName={%s}, group={%s}, topic={%s}, ConsumerId={%s}, "
-                             "rebalanceSize={%u}, rebalanceMqSet={%s}, mqAllSize={%u}, cidAllSize={%u}, mqAll={%s}, cidAll={%s}",
-                             strategy->getName().c_str(), m_consumerGroup.c_str(), topic.c_str(), m_pMQClientFactory->getClientId().c_str(),
-                             (unsigned)allocateResultSet.size(), UtilAll::toString(allocateResultSet).c_str(),
-                             (unsigned)mqAll.size(), (unsigned)cidAll.size(), UtilAll::toString(mqAll).c_str(), UtilAll::toString(cidAll).c_str()
-                            );
-
-                    messageQueueChanged(topic, mqSet, allocateResultSet);
-                }
-            }
-        }
-        break;
-        default:
-            break;
-    }
-    RMQ_DEBUG("rebalanceByTopic end");
-}
-
-
-void RebalanceImpl::removeProcessQueue(const MessageQueue& mq)
-{
-	kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
-	std::map<MessageQueue, ProcessQueue*>::iterator it = m_processQueueTable.find(mq);
-	if (it != m_processQueueTable.end())
-	{
-		MessageQueue mq = it->first;
-        ProcessQueue* pq = it->second;
-        bool isDroped = pq->isDropped();
-
-        this->removeUnnecessaryMessageQueue(mq, *pq);
-        RMQ_INFO("Fix Offset, {%s}, remove unnecessary mq, {%s} Droped: {%d}",
-        	m_consumerGroup.c_str(), mq.toString().c_str(), isDroped);
-	}
-}
-
-
-bool RebalanceImpl::updateProcessQueueTableInRebalance(const std::string& topic, std::set<MessageQueue>& mqSet)
-{
-	RMQ_DEBUG("updateProcessQueueTableInRebalance begin, topic={%s}", topic.c_str());
-    bool changed = false;
-
-    {
-        kpr::ScopedWLock<kpr::RWMutex> lock(m_processQueueTableLock);
-        std::map<MessageQueue, ProcessQueue*>::iterator it = m_processQueueTable.begin();
-        for (; it != m_processQueueTable.end();)
-        {
-        	std::map<MessageQueue, ProcessQueue*>::iterator itCur = it++;
-            MessageQueue mq = itCur->first;
-            ProcessQueue* pq = itCur->second;
-            if (mq.getTopic() == topic)
-            {
-                std::set<MessageQueue>::iterator itMq = mqSet.find(mq);
-                if (itMq == mqSet.end())
-                {
-                    pq->setDropped(true);
-        			if (this->removeUnnecessaryMessageQueue(mq, *pq))
-        			{
-        				changed = true;
-        				m_processQueueTable.erase(itCur);
-
-        				RMQ_WARN("doRebalance, {%s}, remove unnecessary mq, {%s}",
-        					m_consumerGroup.c_str(), mq.toString().c_str());
-        			}
-                }
-                else if (pq->isPullExpired())
-                {
-                	switch(this->consumeType())
-                	{
-                		case CONSUME_ACTIVELY:
-                            break;
-                        case CONSUME_PASSIVELY:
-                        	pq->setDropped(true);
-        					if (this->removeUnnecessaryMessageQueue(mq, *pq))
-        					{
-        						changed = true;
-                            	m_processQueueTable.erase(itCur);
-
-                            	RMQ_ERROR("[BUG]doRebalance, {%s}, remove unnecessary mq, {%s}, because pull is pause, so try to fixed it",
-                                    m_consumerGroup.c_str(), mq.toString().c_str());
-        					}
-                            break;
-                        default:
-                            break;
-                	}
-                }
-            }
-        }
-    }
-
-    std::list<PullRequest*> pullRequestList;
-    std::set<MessageQueue>::iterator its = mqSet.begin();
-    for (; its != mqSet.end(); its++)
-    {
-        MessageQueue mq = *its;
-        bool find = false;
-        {
-            kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
-            std::map<MessageQueue, ProcessQueue*>::iterator itm = m_processQueueTable.find(mq);
-            if (itm != m_processQueueTable.end())
-            {
-                find = true;
-            }
-        }
-
-        if (!find)
-        {
-        	//todo: memleak
-            PullRequest* pullRequest = new PullRequest();
-            pullRequest->setConsumerGroup(m_consumerGroup);
-            pullRequest->setMessageQueue(mq);
-            pullRequest->setProcessQueue(new ProcessQueue());//todo: memleak
-
-            long long nextOffset = computePullFromWhere(mq);
-            if (nextOffset >= 0)
-            {
-                pullRequest->setNextOffset(nextOffset);
-                pullRequestList.push_back(pullRequest);
-                changed = true;
-
-				{
-	                kpr::ScopedWLock<kpr::RWMutex> lock(m_processQueueTableLock);
-	                m_processQueueTable[mq] = pullRequest->getProcessQueue();
-	                RMQ_INFO("doRebalance, {%s}, add a new mq, {%s}, pullRequst: %s",
-	                	m_consumerGroup.c_str(), mq.toString().c_str(), pullRequest->toString().c_str());
-                }
-            }
-            else
-            {
-                RMQ_WARN("doRebalance, {%s}, add new mq failed, {%s}",
-                	m_consumerGroup.c_str(), mq.toString().c_str());
-            }
-        }
-    }
-
-    //todo memleak
-    dispatchPullRequest(pullRequestList);
-    RMQ_DEBUG("updateProcessQueueTableInRebalance end");
-
-    return changed;
-}
-
-void RebalanceImpl::truncateMessageQueueNotMyTopic()
-{
-    std::map<std::string, SubscriptionData> subTable = getSubscriptionInner();
-
-    kpr::ScopedWLock<kpr::RWMutex> lock(m_processQueueTableLock);
-    std::map<MessageQueue, ProcessQueue*>::iterator it = m_processQueueTable.begin();
-    for (; it != m_processQueueTable.end();)
-    {
-        MessageQueue mq = it->first;
-        std::map<std::string, SubscriptionData>::iterator itt = subTable.find(mq.getTopic());
-
-        if (itt == subTable.end())
-        {
-            ProcessQueue* pq = it->second;
-            if (pq != NULL)
-            {
-                pq->setDropped(true);
-                RMQ_WARN("doRebalance, {%s}, truncateMessageQueueNotMyTopic remove unnecessary mq, {%s}",
-                         m_consumerGroup.c_str(), mq.toString().c_str());
-            }
-            m_processQueueTable.erase(it++);
-        }
-        else
-        {
-            it++;
-        }
-    }
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RebalanceImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalanceImpl.h b/rocketmq-client4cpp/src/consumer/RebalanceImpl.h
deleted file mode 100755
index 577a031..0000000
--- a/rocketmq-client4cpp/src/consumer/RebalanceImpl.h
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __REBALANCEIMPL_H__
-#define __REBALANCEIMPL_H__
-
-#include <map>
-#include <string>
-#include <set>
-#include <list>
-
-#include "ConsumeType.h"
-#include "MessageQueue.h"
-#include "ProcessQueue.h"
-#include "PullRequest.h"
-#include "SubscriptionData.h"
-
-namespace rmq
-{
-    class AllocateMessageQueueStrategy;
-    class MQClientFactory;
-
-    class RebalanceImpl
-    {
-    public:
-        RebalanceImpl(const std::string& consumerGroup,
-                      MessageModel messageModel,
-                      AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy,
-                      MQClientFactory* pMQClientFactory);
-        virtual ~RebalanceImpl();
-
-        virtual void messageQueueChanged(const std::string& topic,
-                                         std::set<MessageQueue>& mqAll,
-                                         std::set<MessageQueue>& mqDivided) = 0;
-        virtual bool removeUnnecessaryMessageQueue(MessageQueue& mq, ProcessQueue& pq) = 0;
-        virtual void dispatchPullRequest(std::list<PullRequest*>& pullRequestList) = 0;
-        virtual long long computePullFromWhere(MessageQueue& mq) = 0;
-		virtual ConsumeType consumeType() = 0;
-
-        bool lock(MessageQueue& mq);
-        void lockAll();
-
-        void unlock(MessageQueue& mq, bool oneway);
-        void unlockAll(bool oneway);
-
-        void doRebalance();
-
-        std::map<MessageQueue, ProcessQueue*>& getProcessQueueTable();
-		kpr::RWMutex& getProcessQueueTableLock();
-		std::map<std::string, SubscriptionData>& getSubscriptionInner();
-        std::map<std::string, std::set<MessageQueue> >& getTopicSubscribeInfoTable();
-
-        std::string& getConsumerGroup();
-        void setConsumerGroup(const std::string& consumerGroup);
-
-        MessageModel getMessageModel();
-        void setMessageModel(MessageModel messageModel);
-
-        AllocateMessageQueueStrategy* getAllocateMessageQueueStrategy();
-        void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy);
-
-        MQClientFactory* getmQClientFactory();
-        void setmQClientFactory(MQClientFactory* pMQClientFactory);
-
-		void removeProcessQueue(const MessageQueue& mq);
-
-    private:
-        std::map<std::string, std::set<MessageQueue> > buildProcessQueueTableByBrokerName();
-        void rebalanceByTopic(const std::string& topic);
-        bool updateProcessQueueTableInRebalance(const std::string& topic, std::set<MessageQueue>& mqSet);
-        void truncateMessageQueueNotMyTopic();
-
-    protected:
-        std::map<MessageQueue, ProcessQueue*> m_processQueueTable;
-        kpr::RWMutex m_processQueueTableLock;
-
-        std::map<std::string, std::set<MessageQueue> > m_topicSubscribeInfoTable;
-        kpr::Mutex m_topicSubscribeInfoTableLock;
-
-        std::map<std::string, SubscriptionData> m_subscriptionInner;
-        kpr::Mutex m_subscriptionInnerLock;
-
-        std::string m_consumerGroup;
-        MessageModel m_messageModel;
-        AllocateMessageQueueStrategy* m_pAllocateMessageQueueStrategy;
-        MQClientFactory* m_pMQClientFactory;
-    };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp b/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp
deleted file mode 100755
index 1aa287b..0000000
--- a/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "RebalancePullImpl.h"
-#include "DefaultMQPullConsumerImpl.h"
-#include "AllocateMessageQueueStrategy.h"
-#include "MQClientFactory.h"
-#include "MessageQueueListener.h"
-#include "OffsetStore.h"
-#include "DefaultMQPullConsumer.h"
-
-namespace rmq
-{
-
-RebalancePullImpl::RebalancePullImpl(DefaultMQPullConsumerImpl* pDefaultMQPullConsumerImpl)
-    : RebalanceImpl("", BROADCASTING, NULL, NULL),
-      m_pDefaultMQPullConsumerImpl(pDefaultMQPullConsumerImpl)
-{
-}
-
-RebalancePullImpl::RebalancePullImpl(const std::string& consumerGroup,
-                                     MessageModel messageModel,
-                                     AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy,
-                                     MQClientFactory* pMQClientFactory,
-                                     DefaultMQPullConsumerImpl* pDefaultMQPullConsumerImpl)
-    : RebalanceImpl(consumerGroup, messageModel, pAllocateMessageQueueStrategy, pMQClientFactory),
-      m_pDefaultMQPullConsumerImpl(pDefaultMQPullConsumerImpl)
-{
-}
-
-long long RebalancePullImpl::computePullFromWhere(MessageQueue& mq)
-{
-    return 0;
-}
-
-void RebalancePullImpl::dispatchPullRequest(std::list<PullRequest*>& pullRequestList)
-{
-}
-
-void RebalancePullImpl::messageQueueChanged(const std::string& topic,
-        std::set<MessageQueue>& mqAll,
-        std::set<MessageQueue>& mqDivided)
-{
-    MessageQueueListener* messageQueueListener =
-        m_pDefaultMQPullConsumerImpl->getDefaultMQPullConsumer()->getMessageQueueListener();
-    if (messageQueueListener != NULL)
-    {
-        try
-        {
-            messageQueueListener->messageQueueChanged(topic, mqAll, mqDivided);
-        }
-        catch (...)
-        {
-            RMQ_ERROR("messageQueueChanged exception, %s", topic.c_str());
-        }
-    }
-}
-
-bool RebalancePullImpl::removeUnnecessaryMessageQueue(MessageQueue& mq, ProcessQueue& pq)
-{
-    m_pDefaultMQPullConsumerImpl->getOffsetStore()->persist(mq);
-    m_pDefaultMQPullConsumerImpl->getOffsetStore()->removeOffset(mq);
-    return true;
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h b/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h
deleted file mode 100755
index 46dbcd1..0000000
--- a/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __REBALANCEPULLIMPL_H__
-#define __REBALANCEPULLIMPL_H__
-
-#include "RebalanceImpl.h"
-
-namespace rmq
-{
-class DefaultMQPullConsumerImpl;
-
-class RebalancePullImpl : public RebalanceImpl
-{
-public:
-    RebalancePullImpl(DefaultMQPullConsumerImpl *pDefaultMQPullConsumerImpl);
-
-    RebalancePullImpl(const std::string &consumerGroup,
-                      MessageModel messageModel,
-                      AllocateMessageQueueStrategy *pAllocateMessageQueueStrategy,
-                      MQClientFactory *pMQClientFactory,
-                      DefaultMQPullConsumerImpl *pDefaultMQPullConsumerImpl);
-
-    long long computePullFromWhere(MessageQueue &mq);
-
-    void dispatchPullRequest(std::list<PullRequest *> &pullRequestList);
-
-    void messageQueueChanged(const std::string &topic,
-                             std::set<MessageQueue> &mqAll,
-                             std::set<MessageQueue> &mqDivided);
-
-    bool removeUnnecessaryMessageQueue(MessageQueue &mq, ProcessQueue &pq);
-
-    ConsumeType consumeType()
-    {
-        return CONSUME_ACTIVELY;
-    };
-
-private:
-    DefaultMQPullConsumerImpl *m_pDefaultMQPullConsumerImpl;
-};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp b/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp
deleted file mode 100755
index fde770d..0000000
--- a/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "RebalancePushImpl.h"
-
-#include <string.h>
-#include <limits.h>
-
-#include "DefaultMQPushConsumerImpl.h"
-#include "AllocateMessageQueueStrategy.h"
-#include "MQClientFactory.h"
-#include "MessageQueueListener.h"
-#include "OffsetStore.h"
-#include "DefaultMQPushConsumer.h"
-#include "MQAdminImpl.h"
-
-
-namespace rmq
-{
-
-RebalancePushImpl::RebalancePushImpl(DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl)
-    : RebalanceImpl("", BROADCASTING, NULL, NULL),
-      m_pDefaultMQPushConsumerImpl(pDefaultMQPushConsumerImpl)
-{
-}
-
-RebalancePushImpl::RebalancePushImpl(const std::string& consumerGroup,
-                                     MessageModel messageModel,
-                                     AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy,
-                                     MQClientFactory* pMQClientFactory,
-                                     DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl)
-    : RebalanceImpl(consumerGroup, messageModel, pAllocateMessageQueueStrategy, pMQClientFactory),
-      m_pDefaultMQPushConsumerImpl(pDefaultMQPushConsumerImpl)
-{
-}
-
-void RebalancePushImpl::dispatchPullRequest(std::list<PullRequest*>& pullRequestList)
-{
-    std::list<PullRequest*>::iterator it = pullRequestList.begin();
-    for (; it != pullRequestList.end(); it++)
-    {
-        m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(*it);
-        RMQ_INFO("doRebalance, {%s}, add a new pull request {%s}",
-        	m_consumerGroup.c_str(), (*it)->toString().c_str());
-    }
-}
-
-long long RebalancePushImpl::computePullFromWhere(MessageQueue& mq)
-{
-    long long result = -1;
-    ConsumeFromWhere consumeFromWhere =
-        m_pDefaultMQPushConsumerImpl->getDefaultMQPushConsumer()->getConsumeFromWhere();
-    OffsetStore* offsetStore = m_pDefaultMQPushConsumerImpl->getOffsetStore();
-
-    switch (consumeFromWhere)
-    {
-    	case CONSUME_FROM_FIRST_OFFSET:
-        {
-            long long lastOffset = offsetStore->readOffset(mq, READ_FROM_STORE);
-            if (lastOffset >= 0)
-            {
-                result = lastOffset;
-            }
-            else if (-1 == lastOffset)
-            {
-                result = 0L;
-            }
-            else
-            {
-                result = -1;
-            }
-            break;
-        }
-        case CONSUME_FROM_LAST_OFFSET:
-        {
-            long long lastOffset = offsetStore->readOffset(mq, READ_FROM_STORE);
-            if (lastOffset >= 0)
-            {
-                result = lastOffset;
-            }
-            else if (-1 == lastOffset)
-            {
-                if (strncmp(MixAll::RETRY_GROUP_TOPIC_PREFIX.c_str(), mq.getTopic().c_str(), MixAll::RETRY_GROUP_TOPIC_PREFIX.size()) == 0)
-                {
-                    result = 0L;
-                }
-                else
-                {
-                    //result = LLONG_MAX;
-                    try
-                    {
-                    	result = m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq);
-                    }
-                    catch(...)
-                    {
-                    	result = -1;
-                    }
-                }
-            }
-            else
-            {
-                result = -1;
-            }
-            break;
-        }
-
-        case CONSUME_FROM_MAX_OFFSET:
-            result = LLONG_MAX;
-            break;
-        case CONSUME_FROM_MIN_OFFSET:
-            result = 0L;
-            break;
-        case CONSUME_FROM_TIMESTAMP:
-        	{
-	        	long long lastOffset = offsetStore->readOffset(mq, READ_FROM_STORE);
-	            if (lastOffset >= 0)
-	            {
-	                result = lastOffset;
-	            }
-	            else if (-1 == lastOffset)
-	            {
-	                if (strncmp(MixAll::RETRY_GROUP_TOPIC_PREFIX.c_str(), mq.getTopic().c_str(), MixAll::RETRY_GROUP_TOPIC_PREFIX.size()) == 0)
-	                {
-	                	//result = LLONG_MAX;
-	                    try
-	                    {
-	                    	result = m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq);
-	                    }
-	                    catch(...)
-	                    {
-	                    	result = -1;
-	                    }
-	                }
-	                else
-	                {
-	                    try
-	                    {
-	                    	long timestamp = UtilAll::str2tm(
-                                    m_pDefaultMQPushConsumerImpl->getDefaultMQPushConsumer()->getConsumeTimestamp(),
-                                    rmq::yyyyMMddHHmmss);
-                        	result = m_pMQClientFactory->getMQAdminImpl()->searchOffset(mq, timestamp);
-	                    }
-	                    catch(...)
-	                    {
-	                    	result = -1;
-	                    }
-	                }
-	            }
-	            else
-	            {
-	                result = -1;
-	            }
-	            break;
-	        }
-        	break;
-        default:
-            break;
-    }
-
-    return result;
-}
-
-void RebalancePushImpl::messageQueueChanged(const std::string& topic,
-        std::set<MessageQueue>& mqAll,
-        std::set<MessageQueue>& mqDivided)
-{
-}
-
-
-bool RebalancePushImpl::removeUnnecessaryMessageQueue(MessageQueue& mq, ProcessQueue& pq)
-{
-    m_pDefaultMQPushConsumerImpl->getOffsetStore()->persist(mq);
-    m_pDefaultMQPushConsumerImpl->getOffsetStore()->removeOffset(mq);
-    if (m_pDefaultMQPushConsumerImpl->isConsumeOrderly()
-    	&& m_pDefaultMQPushConsumerImpl->messageModel() == CLUSTERING)
-    {
-    	if (pq.getLockConsume().TryLock(1000))
-    	{
-	    	try
-	    	{
-	        	this->unlock(mq, true);
-	        }
-	        catch (std::exception& e)
-	        {
-	            RMQ_ERROR("removeUnnecessaryMessageQueue Exception: %s", e.what());
-	        }
-	        pq.getLockConsume().Unlock();
-	   	}
-	   	else
-	   	{
-	   		RMQ_WARN("[WRONG]mq is consuming, so can not unlock it, MQ:%s, maybe hanged for a while, times:{%lld}",
-            	mq.toString().c_str(),
-            	pq.getTryUnlockTimes());
-
-			pq.incTryUnlockTimes();
-	   	}
-
-	   	return false;
-    }
-
-    return true;
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h b/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h
deleted file mode 100755
index 0aa2b0e..0000000
--- a/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __REBALANCEPUSHIMPL_H__
-#define __REBALANCEPUSHIMPL_H__
-
-#include "RebalanceImpl.h"
-
-namespace rmq
-{
-class DefaultMQPushConsumerImpl;
-
-class RebalancePushImpl : public RebalanceImpl
-{
-public:
-    RebalancePushImpl(DefaultMQPushConsumerImpl *pDefaultMQPushConsumerImpl);
-
-    RebalancePushImpl(const std::string &consumerGroup,
-                      MessageModel messageModel,
-                      AllocateMessageQueueStrategy *pAllocateMessageQueueStrategy,
-                      MQClientFactory *pMQClientFactory,
-                      DefaultMQPushConsumerImpl *pDefaultMQPushConsumerImpl);
-
-    void dispatchPullRequest(std::list<PullRequest *> &pullRequestList);
-    long long computePullFromWhere(MessageQueue &mq);
-    void messageQueueChanged(const std::string &topic,
-                             std::set<MessageQueue> &mqAll,
-                             std::set<MessageQueue> &mqDivided);
-    bool removeUnnecessaryMessageQueue(MessageQueue &mq, ProcessQueue &pq);
-
-
-    ConsumeType consumeType()
-    {
-        return CONSUME_PASSIVELY;
-    };
-
-private:
-    DefaultMQPushConsumerImpl *m_pDefaultMQPushConsumerImpl;
-};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RebalanceService.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalanceService.cpp b/rocketmq-client4cpp/src/consumer/RebalanceService.cpp
deleted file mode 100644
index 013fefb..0000000
--- a/rocketmq-client4cpp/src/consumer/RebalanceService.cpp
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "RebalanceService.h"
-#include "MQClientFactory.h"
-
-namespace rmq
-{
-
-long RebalanceService::s_WaitInterval = 1000 * 10;
-
-RebalanceService::RebalanceService(MQClientFactory* pMQClientFactory)
-    : ServiceThread("RebalanceService"),
-      m_pMQClientFactory(pMQClientFactory)
-{
-}
-
-
-RebalanceService::~RebalanceService()
-{
-
-}
-
-void RebalanceService::Run()
-{
-	RMQ_INFO("%s service started", getServiceName().c_str());
-
-    while (!m_stoped)
-    {
-        waitForRunning(s_WaitInterval);
-        m_pMQClientFactory->doRebalance();
-    }
-
-    RMQ_INFO("%s service end", getServiceName().c_str());
-}
-
-std::string RebalanceService::getServiceName()
-{
-    return "RebalanceService";
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RebalanceService.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalanceService.h b/rocketmq-client4cpp/src/consumer/RebalanceService.h
deleted file mode 100755
index ef4d746..0000000
--- a/rocketmq-client4cpp/src/consumer/RebalanceService.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __REBALANCESERVICE_H__
-#define __REBALANCESERVICE_H__
-
-#include "ServiceThread.h"
-
-namespace rmq
-{
-	class MQClientFactory;
-
-	/**
-	* Rebalance service
-	*
-	*/
-	class RebalanceService : public ServiceThread
-	{
-	public:
-	    RebalanceService(MQClientFactory* pMQClientFactory);
-	    ~RebalanceService();
-
-	    void Run();
-	    std::string getServiceName();
-
-	private:
-	    MQClientFactory* m_pMQClientFactory;
-	    static long s_WaitInterval;
-	};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp b/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp
deleted file mode 100755
index 1c4fd23..0000000
--- a/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "RemoteBrokerOffsetStore.h"
-#include "MQClientFactory.h"
-#include "ScopedLock.h"
-#include "MQClientException.h"
-#include "CommandCustomHeader.h"
-#include "MQClientAPIImpl.h"
-
-namespace rmq
-{
-
-RemoteBrokerOffsetStore::RemoteBrokerOffsetStore(MQClientFactory* pMQClientFactory, const std::string& groupName)
-{
-    m_pMQClientFactory = pMQClientFactory;
-    m_groupName = groupName;
-}
-
-void RemoteBrokerOffsetStore::load()
-{
-
-}
-
-void RemoteBrokerOffsetStore::updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly)
-{
-    kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex);
-    typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq);
-    if (it == m_offsetTable.end())
-    {
-        m_offsetTable[mq] = offset;
-        it = m_offsetTable.find(mq);
-    }
-
-    kpr::AtomicLong& offsetOld = it->second;
-    if (increaseOnly)
-    {
-        MixAll::compareAndIncreaseOnly(offsetOld, offset);
-    }
-    else
-    {
-        offsetOld.set(offset);
-    }
-}
-
-long long RemoteBrokerOffsetStore::readOffset(const MessageQueue& mq, ReadOffsetType type)
-{
-    RMQ_DEBUG("readOffset, MQ:%s, type:%d", mq.toString().c_str(), type);
-    switch (type)
-    {
-        case MEMORY_FIRST_THEN_STORE:
-        case READ_FROM_MEMORY:
-        {
-            kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
-            typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq);
-            if (it != m_offsetTable.end())
-            {
-                return it->second.get();
-            }
-            else if (READ_FROM_MEMORY == type)
-            {
-                RMQ_DEBUG("No offset in memory, MQ:%s", mq.toString().c_str());
-                return -1;
-            }
-        }
-        case READ_FROM_STORE:
-        {
-            try
-            {
-                long long brokerOffset = this->fetchConsumeOffsetFromBroker(mq);
-                RMQ_DEBUG("fetchConsumeOffsetFromBroker, MQ:%s, brokerOffset:%lld",
-                          mq.toString().c_str(), brokerOffset);
-                if (brokerOffset >= 0)
-                {
-                    this->updateOffset(mq, brokerOffset, false);
-                }
-                return brokerOffset;
-            }
-            // No offset in broker
-            catch (const MQBrokerException& e)
-            {
-                RMQ_WARN("No offset in broker, MQ:%s, exception:%s", mq.toString().c_str(), e.what());
-                return -1;
-            }
-            catch (const std::exception& e)
-            {
-                RMQ_ERROR("fetchConsumeOffsetFromBroker exception, MQ:%s, msg:%s",
-                          mq.toString().c_str(), e.what());
-                return -2;
-            }
-            catch (...)
-            {
-                RMQ_ERROR("fetchConsumeOffsetFromBroker unknow exception, MQ:%s",
-                          mq.toString().c_str());
-                return -2;
-            }
-        }
-        default:
-            break;
-    }
-
-    return -1;
-}
-
-void RemoteBrokerOffsetStore::persistAll(std::set<MessageQueue>& mqs)
-{
-    if (mqs.empty())
-    {
-        return;
-    }
-
-    std::set<MessageQueue> unusedMQ;
-    long long times = m_storeTimesTotal.fetchAndAdd(1);
-
-    kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
-    for (typeof(m_offsetTable.begin()) it = m_offsetTable.begin();
-         it != m_offsetTable.end(); it++)
-    {
-        MessageQueue mq = it->first;
-        kpr::AtomicLong& offset = it->second;
-        if (mqs.find(mq) != mqs.end())
-        {
-            try
-            {
-                this->updateConsumeOffsetToBroker(mq, offset.get());
-                if ((times % 12) == 0)
-                {
-                    RMQ_INFO("updateConsumeOffsetToBroker, Group: {%s} ClientId: {%s}  mq:{%s} offset {%llu}",
-                             m_groupName.c_str(),
-                             m_pMQClientFactory->getClientId().c_str(),
-                             mq.toString().c_str(),
-                             offset.get());
-                }
-            }
-            catch (...)
-            {
-                RMQ_ERROR("updateConsumeOffsetToBroker exception, mq=%s", mq.toString().c_str());
-            }
-        }
-        else
-        {
-            unusedMQ.insert(mq);
-        }
-    }
-
-    if (!unusedMQ.empty())
-    {
-        for (typeof(unusedMQ.begin()) it = unusedMQ.begin(); it != unusedMQ.end(); it++)
-        {
-            m_offsetTable.erase(*it);
-            RMQ_INFO("remove unused mq, %s, %s", it->toString().c_str(), m_groupName.c_str());
-        }
-    }
-}
-
-void RemoteBrokerOffsetStore::persist(const MessageQueue& mq)
-{
-    kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
-    typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq);
-    if (it != m_offsetTable.end())
-    {
-        try
-        {
-            this->updateConsumeOffsetToBroker(mq, it->second.get());
-            RMQ_DEBUG("updateConsumeOffsetToBroker ok, mq=%s, offset=%lld", mq.toString().c_str(), it->second.get());
-        }
-        catch (...)
-        {
-            RMQ_ERROR("updateConsumeOffsetToBroker exception, mq=%s", mq.toString().c_str());
-        }
-    }
-}
-
-void RemoteBrokerOffsetStore::removeOffset(const MessageQueue& mq)
-{
-    kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex);
-    m_offsetTable.erase(mq);
-    RMQ_INFO("remove unnecessary messageQueue offset. mq=%s, offsetTableSize=%u",
-             mq.toString().c_str(), (unsigned)m_offsetTable.size());
-}
-
-
-std::map<MessageQueue, long long> RemoteBrokerOffsetStore::cloneOffsetTable(const std::string& topic)
-{
-    kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
-    std::map<MessageQueue, long long> cloneOffsetTable;
-    RMQ_FOR_EACH(m_offsetTable, it)
-    {
-        MessageQueue mq = it->first;
-        kpr::AtomicLong& offset = it->second;
-        if (topic == mq.getTopic())
-        {
-            cloneOffsetTable[mq] = offset.get();
-        }
-    }
-
-    return cloneOffsetTable;
-}
-
-
-void RemoteBrokerOffsetStore::updateConsumeOffsetToBroker(const MessageQueue& mq, long long offset)
-{
-    FindBrokerResult findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName());
-    if (findBrokerResult.brokerAddr.empty())
-    {
-        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
-        findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName());
-    }
-
-    if (!findBrokerResult.brokerAddr.empty())
-    {
-        UpdateConsumerOffsetRequestHeader* requestHeader = new UpdateConsumerOffsetRequestHeader();
-        requestHeader->topic = mq.getTopic();
-        requestHeader->consumerGroup = this->m_groupName;
-        requestHeader->queueId = mq.getQueueId();
-        requestHeader->commitOffset = offset;
-
-        m_pMQClientFactory->getMQClientAPIImpl()->updateConsumerOffsetOneway(
-            findBrokerResult.brokerAddr, requestHeader, 1000 * 5);
-    }
-    else
-    {
-        THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
-    }
-}
-
-long long RemoteBrokerOffsetStore::fetchConsumeOffsetFromBroker(const MessageQueue& mq)
-{
-    FindBrokerResult findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName());
-    if (findBrokerResult.brokerAddr.empty())
-    {
-        // TODO Here may be heavily overhead for Name Server,need tuning
-        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
-        findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName());
-    }
-
-    if (!findBrokerResult.brokerAddr.empty())
-    {
-        QueryConsumerOffsetRequestHeader* requestHeader = new QueryConsumerOffsetRequestHeader();
-        requestHeader->topic = mq.getTopic();
-        requestHeader->consumerGroup = this->m_groupName;
-        requestHeader->queueId = mq.getQueueId();
-
-        return m_pMQClientFactory->getMQClientAPIImpl()->queryConsumerOffset(
-                   findBrokerResult.brokerAddr, requestHeader, 1000 * 5);
-    }
-    else
-    {
-        THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
-    }
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h b/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h
deleted file mode 100755
index b613084..0000000
--- a/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __REMOTEBROKEROFFSETSTORE_H__
-#define __REMOTEBROKEROFFSETSTORE_H__
-
-#include "OffsetStore.h"
-#include <map>
-#include <string>
-#include <set>
-#include "MessageQueue.h"
-#include "AtomicValue.h"
-#include "Mutex.h"
-
-namespace rmq
-{
-    class MQClientFactory;
-
-    /**
-    * offset remote store
-    *
-    */
-    class RemoteBrokerOffsetStore : public OffsetStore
-    {
-    public:
-        RemoteBrokerOffsetStore(MQClientFactory* pMQClientFactory, const std::string& groupName) ;
-
-        void load();
-        void updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly);
-        long long readOffset(const MessageQueue& mq, ReadOffsetType type);
-        void persistAll(std::set<MessageQueue>& mqs);
-        void persist(const MessageQueue& mq);
-        void removeOffset(const MessageQueue& mq) ;
-        std::map<MessageQueue, long long> cloneOffsetTable(const std::string& topic);
-
-    private:
-        void updateConsumeOffsetToBroker(const MessageQueue& mq, long long offset);
-        long long fetchConsumeOffsetFromBroker(const MessageQueue& mq);
-
-    private:
-        MQClientFactory* m_pMQClientFactory;
-        std::string m_groupName;
-        kpr::AtomicInteger m_storeTimesTotal;
-        std::map<MessageQueue, kpr::AtomicLong> m_offsetTable;
-        kpr::RWMutex m_tableMutex;
-    };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp b/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp
deleted file mode 100755
index ed5cf12..0000000
--- a/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "SubscriptionData.h"
-
-#include <sstream>
-#include "KPRUtil.h"
-#include "UtilAll.h"
-
-namespace rmq
-{
-
-std::string SubscriptionData::SUB_ALL = "*";
-
-SubscriptionData::SubscriptionData()
-{
-    m_subVersion = KPRUtil::GetCurrentTimeMillis();
-}
-
-SubscriptionData::SubscriptionData(const std::string& topic, const std::string& subString)
-    : m_topic(topic),
-      m_subString(subString)
-{
-    m_subVersion = KPRUtil::GetCurrentTimeMillis();
-}
-
-std::string SubscriptionData::getTopic()const
-{
-    return m_topic;
-}
-
-void SubscriptionData::setTopic(const std::string& topic)
-{
-    m_topic = topic;
-}
-
-std::string SubscriptionData::getSubString()
-{
-    return m_subString;
-}
-
-void SubscriptionData::setSubString(const std::string& subString)
-{
-    m_subString = subString;
-}
-
-std::set<std::string>& SubscriptionData::getTagsSet()
-{
-    return m_tagsSet;
-}
-
-void SubscriptionData::setTagsSet(const std::set<std::string>& tagsSet)
-{
-    m_tagsSet = tagsSet;
-}
-
-long long SubscriptionData::getSubVersion()
-{
-    return m_subVersion;
-}
-
-void SubscriptionData::setSubVersion(long long subVersion)
-{
-    m_subVersion = subVersion;
-}
-
-std::set<int>& SubscriptionData::getCodeSet()
-{
-    return m_codeSet;
-}
-
-void SubscriptionData::setCodeSet(const std::set<int>& codeSet)
-{
-    m_codeSet = codeSet;
-}
-
-int SubscriptionData::hashCode()
-{
-    /*
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + (classFilterMode ? 1231 : 1237);
-    result = prime * result + ((codeSet == null) ? 0 : codeSet.hashCode());
-    result = prime * result + ((subString == null) ? 0 : subString.hashCode());
-    result = prime * result + ((tagsSet == null) ? 0 : tagsSet.hashCode());
-    result = prime * result + ((topic == null) ? 0 : topic.hashCode());
-    return result;
-    */
-    std::stringstream ss;
-    ss  << UtilAll::hashCode(m_codeSet)
-        << m_subString
-        << UtilAll::hashCode(m_tagsSet)
-        << m_topic;
-    return UtilAll::hashCode(ss.str());
-}
-
-
-
-
-bool SubscriptionData::operator==(const SubscriptionData& other)
-{
-    if (m_codeSet != other.m_codeSet)
-    {
-        return false;
-    }
-
-    if (m_subString != other.m_subString)
-    {
-        return false;
-    }
-
-    if (m_subVersion != other.m_subVersion)
-    {
-        return false;
-    }
-
-    if (m_tagsSet != other.m_tagsSet)
-    {
-        return false;
-    }
-
-    if (m_topic != other.m_topic)
-    {
-        return false;
-    }
-
-    return true;
-}
-
-bool SubscriptionData::operator<(const SubscriptionData& other)const
-{
-    if (m_topic < other.m_topic)
-    {
-        return true;
-    }
-    else if (m_topic == other.m_topic)
-    {
-        if (m_subString < other.m_subString)
-        {
-            return true;
-        }
-        else
-        {
-            return false;
-        }
-    }
-    else
-    {
-        return false;
-    }
-}
-
-void SubscriptionData::toJson(Json::Value& obj) const
-{
-    obj["classFilterMode"] = false;
-    obj["topic"] = m_topic;
-    obj["subString"] = m_subString;
-    obj["subVersion"] = (long long)m_subVersion;
-
-    Json::Value tagSet(Json::arrayValue);
-    RMQ_FOR_EACH(m_tagsSet, it)
-    {
-        tagSet.append(*it);
-    }
-    obj["tagsSet"] = tagSet;
-
-    Json::Value codeSet(Json::arrayValue);
-    RMQ_FOR_EACH(m_codeSet, it)
-    {
-        codeSet.append(*it);
-    }
-    obj["codeSet"] = codeSet;
-}
-
-std::string SubscriptionData::toString() const
-{
-    std::stringstream ss;
-    ss << "{classFilterMode=" << false
-       << ",topic=" << m_topic
-       << ",subString=" << m_subString
-       << ",subVersion=" << m_subVersion
-       << ",tagsSet=" << UtilAll::toString(m_tagsSet)
-       << ",codeSet=" << UtilAll::toString(m_codeSet)
-       << "}";
-    return ss.str();
-}
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/SubscriptionData.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/SubscriptionData.h b/rocketmq-client4cpp/src/consumer/SubscriptionData.h
deleted file mode 100755
index 4796fb7..0000000
--- a/rocketmq-client4cpp/src/consumer/SubscriptionData.h
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __SUBSCRIPTIONDATA_H__
-#define __SUBSCRIPTIONDATA_H__
-
-#include <string>
-#include <set>
-
-#include "RocketMQClient.h"
-#include "RemotingSerializable.h"
-#include "RefHandle.h"
-#include "json/json.h"
-
-namespace rmq
-{
-    class SubscriptionData : public kpr::RefCount
-    {
-    public:
-        SubscriptionData();
-        SubscriptionData(const std::string& topic, const std::string& subString);
-
-        std::string getTopic()const;
-        void setTopic(const std::string& topic);
-
-        std::string getSubString();
-        void setSubString(const std::string& subString);
-
-        std::set<std::string>& getTagsSet();
-        void setTagsSet(const std::set<std::string>& tagsSet);
-
-        long long getSubVersion();
-        void setSubVersion(long long subVersion);
-
-        std::set<int>& getCodeSet();
-        void setCodeSet(const std::set<int>& codeSet);
-
-        int hashCode();
-        void toJson(Json::Value& obj) const;
-		std::string toString() const;
-
-        bool operator==(const SubscriptionData& other);
-        bool operator<(const SubscriptionData& other)const;
-
-    public:
-        static std::string SUB_ALL;
-
-    private:
-        std::string m_topic;
-        std::string m_subString;
-        std::set<std::string> m_tagsSet;
-        std::set<int> m_codeSet;
-        long long m_subVersion ;
-    };
-	typedef kpr::RefHandleT<SubscriptionData> SubscriptionDataPtr;
-
-	inline std::ostream& operator<<(std::ostream& os, const SubscriptionData& obj)
-	{
-	    os << obj.toString();
-	    return os;
-	}
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/jsoncpp/AUTHORS
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/AUTHORS b/rocketmq-client4cpp/src/jsoncpp/AUTHORS
deleted file mode 100755
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/jsoncpp/LICENSE
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/LICENSE b/rocketmq-client4cpp/src/jsoncpp/LICENSE
deleted file mode 100755
index 403d096..0000000
--- a/rocketmq-client4cpp/src/jsoncpp/LICENSE
+++ /dev/null
@@ -1 +0,0 @@
-The json-cpp library and this documentation are in Public Domain.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/jsoncpp/README.txt
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/README.txt b/rocketmq-client4cpp/src/jsoncpp/README.txt
deleted file mode 100755
index 379d376..0000000
--- a/rocketmq-client4cpp/src/jsoncpp/README.txt
+++ /dev/null
@@ -1,117 +0,0 @@
-* Introduction:
-  =============
-
-JSON (JavaScript Object Notation) is a lightweight data-interchange format. 
-It can represent integer, real number, string, an ordered sequence of 
-value, and a collection of name/value pairs.
-
-JsonCpp is a simple API to manipulate JSON value, handle serialization 
-and unserialization to string.
-
-It can also preserve existing comment in unserialization/serialization steps,
-making it a convenient format to store user input files.
-
-Unserialization parsing is user friendly and provides precise error reports.
-
-
-* Building/Testing:
-  =================
-
-JsonCpp uses Scons (http://www.scons.org) as a build system. Scons requires
-python to be installed (http://www.python.org).
-
-You download scons-local distribution from the following url:
-http://sourceforge.net/project/showfiles.php?group_id=30337&package_id=67375
-
-Unzip it in the directory where you found this README file. scons.py Should be 
-at the same level as README.
-
-python scons.py platform=PLTFRM [TARGET]
-where PLTFRM may be one of:
-	suncc Sun C++ (Solaris)
-	vacpp Visual Age C++ (AIX)
-	mingw 
-	msvc6 Microsoft Visual Studio 6 service pack 5-6
-	msvc70 Microsoft Visual Studio 2002
-	msvc71 Microsoft Visual Studio 2003
-	msvc80 Microsoft Visual Studio 2005
-	linux-gcc Gnu C++ (linux, also reported to work for Mac OS X)
-	
-adding platform is fairly simple. You need to change the Sconstruct file 
-to do so.
-	
-and TARGET may be:
-	check: build library and run unit tests.
-
-    
-* Running the test manually:
-  ==========================
-
-cd test
-# This will run the Reader/Writer tests
-python runjsontests.py "path to jsontest.exe"
-
-# This will run the Reader/Writer tests, using JSONChecker test suite
-# (http://www.json.org/JSON_checker/).
-# Notes: not all tests pass: JsonCpp is too lenient (for example,
-# it allows an integer to start with '0'). The goal is to improve
-# strict mode parsing to get all tests to pass.
-python runjsontests.py --with-json-checker "path to jsontest.exe"
-
-# This will run the unit tests (mostly Value)
-python rununittests.py "path to test_lib_json.exe"
-
-You can run the tests using valgrind:
-python rununittests.py --valgrind "path to test_lib_json.exe"
-
-
-* Building the documentation:
-  ===========================
-
-Run the python script doxybuild.py from the top directory:
-
-python doxybuild.py --open --with-dot
-
-See doxybuild.py --help for options. 
-
-
-* Adding a reader/writer test:
-  ============================
-
-To add a test, you need to create two files in test/data:
-- a TESTNAME.json file, that contains the input document in JSON format.
-- a TESTNAME.expected file, that contains a flatened representation of 
-  the input document.
-  
-TESTNAME.expected file format:
-- each line represents a JSON element of the element tree represented 
-  by the input document.
-- each line has two parts: the path to access the element separated from
-  the element value by '='. Array and object values are always empty 
-  (e.g. represented by either [] or {}).
-- element path: '.' represented the root element, and is used to separate 
-  object members. [N] is used to specify the value of an array element
-  at index N.
-See test_complex_01.json and test_complex_01.expected to better understand
-element path.
-
-
-* Understanding reader/writer test output:
-  ========================================
-
-When a test is run, output files are generated aside the input test files. 
-Below is a short description of the content of each file:
-
-- test_complex_01.json: input JSON document
-- test_complex_01.expected: flattened JSON element tree used to check if 
-    parsing was corrected.
-
-- test_complex_01.actual: flattened JSON element tree produced by 
-    jsontest.exe from reading test_complex_01.json
-- test_complex_01.rewrite: JSON document written by jsontest.exe using the
-    Json::Value parsed from test_complex_01.json and serialized using
-    Json::StyledWritter.
-- test_complex_01.actual-rewrite: flattened JSON element tree produced by 
-    jsontest.exe from reading test_complex_01.rewrite.
-test_complex_01.process-output: jsontest.exe output, typically useful to
-    understand parsing error.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/jsoncpp/json/allocator.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/json/allocator.h b/rocketmq-client4cpp/src/jsoncpp/json/allocator.h
deleted file mode 100755
index 1235a3e..0000000
--- a/rocketmq-client4cpp/src/jsoncpp/json/allocator.h
+++ /dev/null
@@ -1,96 +0,0 @@
-// Copyright 2007-2010 Baptiste Lepilleur
-// Distributed under MIT license, or public domain if desired and
-// recognized in your jurisdiction.
-// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
-
-#ifndef CPPTL_JSON_ALLOCATOR_H_INCLUDED
-#define CPPTL_JSON_ALLOCATOR_H_INCLUDED
-
-#include <cstring>
-#include <memory>
-
-namespace rmq {
-namespace Json {
-template<typename T>
-class SecureAllocator {
-	public:
-		// Type definitions
-		using value_type      = T;
-		using pointer         = T*;
-		using const_pointer   = const T*;
-		using reference       = T&;
-		using const_reference = const T&;
-		using size_type       = std::size_t;
-		using difference_type = std::ptrdiff_t;
-
-		/**
-		 * Allocate memory for N items using the standard allocator.
-		 */
-		pointer allocate(size_type n) {
-			// allocate using "global operator new"
-			return static_cast<pointer>(::operator new(n * sizeof(T)));
-		}
-
-		/**
-		 * Release memory which was allocated for N items at pointer P.
-		 *
-		 * The memory block is filled with zeroes before being released.
-		 * The pointer argument is tagged as "volatile" to prevent the
-		 * compiler optimizing out this critical step.
-		 */
-		void deallocate(volatile pointer p, size_type n) {
-			std::memset(p, 0, n * sizeof(T));
-			// free using "global operator delete"
-			::operator delete(p);
-		}
-
-		/**
-		 * Construct an item in-place at pointer P.
-		 */
-		template<typename... Args>
-		void construct(pointer p, Args&&... args) {
-			// construct using "placement new" and "perfect forwarding"
-			::new (static_cast<void*>(p)) T(std::forward<Args>(args)...);
-		}
-
-		size_type max_size() const {
-			return size_t(-1) / sizeof(T);
-		}
-
-		pointer address( reference x ) const {
-			return std::addressof(x);
-		}
-
-		const_pointer address( const_reference x ) const {
-			return std::addressof(x);
-		}
-
-		/**
-		 * Destroy an item in-place at pointer P.
-		 */
-		void destroy(pointer p) {
-			// destroy using "explicit destructor"
-			p->~T();
-		}
-
-		// Boilerplate
-		SecureAllocator() {}
-		template<typename U> SecureAllocator(const SecureAllocator<U>&) {}
-		template<typename U> struct rebind { using other = SecureAllocator<U>; };
-};
-
-
-template<typename T, typename U>
-bool operator==(const SecureAllocator<T>&, const SecureAllocator<U>&) {
-	return true;
-}
-
-template<typename T, typename U>
-bool operator!=(const SecureAllocator<T>&, const SecureAllocator<U>&) {
-	return false;
-}
-
-} //namespace Json
-} //namespace rmq
-
-#endif // CPPTL_JSON_ALLOCATOR_H_INCLUDED

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/jsoncpp/json/assertions.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/json/assertions.h b/rocketmq-client4cpp/src/jsoncpp/json/assertions.h
deleted file mode 100755
index dc67b27..0000000
--- a/rocketmq-client4cpp/src/jsoncpp/json/assertions.h
+++ /dev/null
@@ -1,54 +0,0 @@
-// Copyright 2007-2010 Baptiste Lepilleur
-// Distributed under MIT license, or public domain if desired and
-// recognized in your jurisdiction.
-// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
-
-#ifndef CPPTL_JSON_ASSERTIONS_H_INCLUDED
-#define CPPTL_JSON_ASSERTIONS_H_INCLUDED
-
-#include <stdlib.h>
-#include <sstream>
-
-#if !defined(JSON_IS_AMALGAMATION)
-#include "config.h"
-#endif // if !defined(JSON_IS_AMALGAMATION)
-
-/** It should not be possible for a maliciously designed file to
- *  cause an abort() or seg-fault, so these macros are used only
- *  for pre-condition violations and internal logic errors.
- */
-#if JSON_USE_EXCEPTION
-
-// @todo <= add detail about condition in exception
-# define JSON_ASSERT(condition)                                                \
-  {if (!(condition)) {rmq::Json::throwLogicError( "assert json failed" );}}
-
-# define JSON_FAIL_MESSAGE(message)                                            \
-  {                                                                            \
-    JSONCPP_OSTRINGSTREAM oss; oss << message;                                    \
-    rmq::Json::throwLogicError(oss.str());                                          \
-    abort();                                                                   \
-  }
-
-#else // JSON_USE_EXCEPTION
-
-# define JSON_ASSERT(condition) assert(condition)
-
-// The call to assert() will show the failure message in debug builds. In
-// release builds we abort, for a core-dump or debugger.
-# define JSON_FAIL_MESSAGE(message)                                            \
-  {                                                                            \
-    JSONCPP_OSTRINGSTREAM oss; oss << message;                                    \
-    assert(false && oss.str().c_str());                                        \
-    abort();                                                                   \
-  }
-
-
-#endif
-
-#define JSON_ASSERT_MESSAGE(condition, message)                                \
-  if (!(condition)) {                                                          \
-    JSON_FAIL_MESSAGE(message);                                                \
-  }
-
-#endif // CPPTL_JSON_ASSERTIONS_H_INCLUDED

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/jsoncpp/json/autolink.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/json/autolink.h b/rocketmq-client4cpp/src/jsoncpp/json/autolink.h
deleted file mode 100644
index 6fcc8af..0000000
--- a/rocketmq-client4cpp/src/jsoncpp/json/autolink.h
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright 2007-2010 Baptiste Lepilleur
-// Distributed under MIT license, or public domain if desired and
-// recognized in your jurisdiction.
-// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
-
-#ifndef JSON_AUTOLINK_H_INCLUDED
-#define JSON_AUTOLINK_H_INCLUDED
-
-#include "config.h"
-
-#ifdef JSON_IN_CPPTL
-#include <cpptl/cpptl_autolink.h>
-#endif
-
-#if !defined(JSON_NO_AUTOLINK) && !defined(JSON_DLL_BUILD) &&                  \
-    !defined(JSON_IN_CPPTL)
-#define CPPTL_AUTOLINK_NAME "json"
-#undef CPPTL_AUTOLINK_DLL
-#ifdef JSON_DLL
-#define CPPTL_AUTOLINK_DLL
-#endif
-#include "autolink.h"
-#endif
-
-#endif // JSON_AUTOLINK_H_INCLUDED