You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2017/09/04 06:44:46 UTC

[12/17] incubator-rocketmq-externals git commit: Polish cpp module

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.cpp b/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.cpp
deleted file mode 100755
index c7d9695..0000000
--- a/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.cpp
+++ /dev/null
@@ -1,574 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include <list>
-#include <string>
-
-#include "ConsumeMessageOrderlyService.h"
-#include "DefaultMQPushConsumerImpl.h"
-#include "MQClientFactory.h"
-#include "DefaultMQProducer.h"
-#include "MessageListener.h"
-#include "MessageQueue.h"
-#include "RebalanceImpl.h"
-#include "DefaultMQPushConsumer.h"
-#include "OffsetStore.h"
-#include "ScopedLock.h"
-#include "KPRUtil.h"
-#include "MixAll.h"
-#include "UtilAll.h"
-
-namespace rmq
-{
-
-class LockMq : public kpr::TimerHandler
-{
-public:
-    LockMq(ConsumeMessageOrderlyService* pService)
-        : m_pService(pService)
-    {
-
-    }
-
-    void OnTimeOut(unsigned int timerID)
-    {
-        m_pService->lockMQPeriodically();
-
-        // can not delete
-        //delete this;
-    }
-
-private:
-    ConsumeMessageOrderlyService* m_pService;
-};
-
-class SubmitConsumeRequestLaterOrderly : public kpr::TimerHandler
-{
-public:
-    SubmitConsumeRequestLaterOrderly(ProcessQueue* pProcessQueue,
-                                     const MessageQueue& messageQueue,
-                                     ConsumeMessageOrderlyService* pService)
-        : m_pProcessQueue(pProcessQueue),
-          m_messageQueue(messageQueue),
-          m_pService(pService)
-    {
-
-    }
-
-    void OnTimeOut(unsigned int timerID)
-    {
-    	try
-    	{
-        	std::list<MessageExt*> msgs;
-       		m_pService->submitConsumeRequest(msgs, m_pProcessQueue, m_messageQueue, true);
-        }
-        catch(...)
-        {
-        	RMQ_ERROR("SubmitConsumeRequestLaterOrderly OnTimeOut exception");
-        }
-
-        delete this;
-    }
-
-private:
-    ProcessQueue* m_pProcessQueue;
-    MessageQueue m_messageQueue;
-    ConsumeMessageOrderlyService* m_pService;
-};
-
-
-class TryLockLaterAndReconsume : public kpr::TimerHandler
-{
-public:
-    TryLockLaterAndReconsume(ProcessQueue* pProcessQueue,
-                             MessageQueue& messageQueue,
-                             ConsumeMessageOrderlyService* pService)
-        : m_pProcessQueue(pProcessQueue),
-          m_messageQueue(messageQueue),
-          m_pService(pService)
-    {
-
-    }
-
-    void OnTimeOut(unsigned int timerID)
-    {
-    	try
-    	{
-	        bool lockOK = m_pService->lockOneMQ(m_messageQueue);
-	        if (lockOK)
-	        {
-	            m_pService->submitConsumeRequestLater(m_pProcessQueue, m_messageQueue, 10);
-	        }
-	        else
-	        {
-	            m_pService->submitConsumeRequestLater(m_pProcessQueue, m_messageQueue, 3000);
-	        }
-        }
-        catch(...)
-        {
-        	RMQ_ERROR("TryLockLaterAndReconsume OnTimeOut exception");
-        }
-
-        delete this;
-    }
-
-private:
-    ProcessQueue* m_pProcessQueue;
-    MessageQueue m_messageQueue;
-    ConsumeMessageOrderlyService* m_pService;
-};
-
-
-
-ConsumeMessageOrderlyService::ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl,
-        MessageListenerOrderly* pMessageListener)
-{
-	m_stoped = false;
-    m_pDefaultMQPushConsumerImpl = pDefaultMQPushConsumerImpl;
-    m_pMessageListener = pMessageListener;
-    m_pDefaultMQPushConsumer = m_pDefaultMQPushConsumerImpl->getDefaultMQPushConsumer();
-    m_consumerGroup = m_pDefaultMQPushConsumer->getConsumerGroup();
-    m_pConsumeExecutor = new kpr::ThreadPool("ConsumeMessageThreadPool", 1,
-    m_pDefaultMQPushConsumer->getConsumeThreadMin(), m_pDefaultMQPushConsumer->getConsumeThreadMax());
-    m_scheduledExecutorService = new kpr::TimerThread("ConsumeMessageConcurrentlyService", 10);
-}
-
-ConsumeMessageOrderlyService::~ConsumeMessageOrderlyService()
-{
-}
-
-
-void ConsumeMessageOrderlyService::start()
-{
-    m_scheduledExecutorService->Start();
-
-    LockMq* lm = new LockMq(this);
-    m_scheduledExecutorService->RegisterTimer(0, ProcessQueue::s_RebalanceLockInterval, lm, true);
-}
-
-void ConsumeMessageOrderlyService::shutdown()
-{
-    m_stoped = true;
-    m_pConsumeExecutor->Destroy();
-    m_scheduledExecutorService->Stop();
-    m_scheduledExecutorService->Join();
-    unlockAllMQ();
-}
-
-void ConsumeMessageOrderlyService::unlockAllMQ()
-{
-    m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->unlockAll(false);
-}
-
-void ConsumeMessageOrderlyService::lockMQPeriodically()
-{
-    if (!m_stoped)
-    {
-        m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->lockAll();
-    }
-}
-
-bool ConsumeMessageOrderlyService::lockOneMQ(MessageQueue& mq)
-{
-    if (!m_stoped)
-    {
-        return m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->lock(mq);
-    }
-
-    return false;
-}
-
-void ConsumeMessageOrderlyService::tryLockLaterAndReconsume(MessageQueue& messageQueue,
-        ProcessQueue* pProcessQueue,
-        long long delayMills)
-{
-    TryLockLaterAndReconsume* consume = new TryLockLaterAndReconsume(pProcessQueue, messageQueue, this);
-    m_scheduledExecutorService->RegisterTimer(0, int(delayMills), consume, false);
-}
-
-ConsumerStat& ConsumeMessageOrderlyService::getConsumerStat()
-{
-    return m_pDefaultMQPushConsumerImpl->getConsumerStatManager()->getConsumertat();
-}
-
-void ConsumeMessageOrderlyService::submitConsumeRequestLater(ProcessQueue* pProcessQueue,
-        const MessageQueue& messageQueue,
-        long long suspendTimeMillis)
-{
-    long timeMillis = long(suspendTimeMillis);
-    if (timeMillis < 10)
-    {
-        timeMillis = 10;
-    }
-    else if (timeMillis > 30000)
-    {
-        timeMillis = 30000;
-    }
-
-    SubmitConsumeRequestLaterOrderly* sc = new SubmitConsumeRequestLaterOrderly(pProcessQueue, messageQueue, this);
-    m_scheduledExecutorService->RegisterTimer(0, timeMillis, sc, false);
-}
-
-void ConsumeMessageOrderlyService::submitConsumeRequest(std::list<MessageExt*>& msgs,
-        ProcessQueue* pProcessQueue,
-        const MessageQueue& messageQueue,
-        bool dispathToConsume)
-{
-    if (dispathToConsume)
-    {
-        kpr::ThreadPoolWorkPtr consumeRequest = new ConsumeOrderlyRequest(pProcessQueue, messageQueue, this);
-        m_pConsumeExecutor->AddWork(consumeRequest);
-    }
-}
-
-void ConsumeMessageOrderlyService::updateCorePoolSize(int corePoolSize)
-{
-}
-
-
-std::string& ConsumeMessageOrderlyService::getConsumerGroup()
-{
-    return m_consumerGroup;
-}
-
-MessageListenerOrderly* ConsumeMessageOrderlyService::getMessageListener()
-{
-    return m_pMessageListener;
-}
-
-DefaultMQPushConsumerImpl* ConsumeMessageOrderlyService::getDefaultMQPushConsumerImpl()
-{
-    return m_pDefaultMQPushConsumerImpl;
-}
-
-bool ConsumeMessageOrderlyService::processConsumeResult(std::list<MessageExt*>& msgs,
-        ConsumeOrderlyStatus status,
-        ConsumeOrderlyContext& context,
-        ConsumeOrderlyRequest& consumeRequest)
-{
-    bool continueConsume = true;
-    long long commitOffset = -1L;
-    int msgsSize = msgs.size();
-
-    if (context.autoCommit)
-    {
-        switch (status)
-        {
-            case COMMIT:
-            case ROLLBACK:
-                RMQ_WARN("the message queue consume result is illegal, we think you want to ack these message: %s",
-                	consumeRequest.getMessageQueue().toString().c_str());
-            case SUCCESS:
-                getConsumerStat().consumeMsgOKTotal.fetchAndAdd(msgsSize);
-                commitOffset = consumeRequest.getProcessQueue()->commit();
-                break;
-            case SUSPEND_CURRENT_QUEUE_A_MOMENT:
-                getConsumerStat().consumeMsgFailedTotal.fetchAndAdd(msgsSize);
-	            if (checkReconsumeTimes(msgs))
-	            {
-	                consumeRequest.getProcessQueue()->makeMessageToCosumeAgain(msgs);
-	                submitConsumeRequestLater(consumeRequest.getProcessQueue(),
-	                                          consumeRequest.getMessageQueue(),
-	                                          context.suspendCurrentQueueTimeMillis);
-	                continueConsume = false;
-				}
-				else
-				{
-					commitOffset = consumeRequest.getProcessQueue()->commit();
-				}
-
-                break;
-            default:
-                break;
-        }
-    }
-    else
-    {
-        switch (status)
-        {
-            case SUCCESS:
-                getConsumerStat().consumeMsgOKTotal.fetchAndAdd(msgsSize);
-                break;
-            case COMMIT:
-                commitOffset = consumeRequest.getProcessQueue()->commit();
-                break;
-            case ROLLBACK:
-                consumeRequest.getProcessQueue()->rollback();
-                submitConsumeRequestLater(consumeRequest.getProcessQueue(),
-                                          consumeRequest.getMessageQueue(),
-                                          context.suspendCurrentQueueTimeMillis);
-                continueConsume = false;
-                break;
-            case SUSPEND_CURRENT_QUEUE_A_MOMENT:
-                getConsumerStat().consumeMsgFailedTotal.fetchAndAdd(msgsSize);
-	            if (checkReconsumeTimes(msgs))
-	            {
-	                consumeRequest.getProcessQueue()->makeMessageToCosumeAgain(msgs);
-	                submitConsumeRequestLater(consumeRequest.getProcessQueue(),
-	                                          consumeRequest.getMessageQueue(),
-	                                          context.suspendCurrentQueueTimeMillis);
-	                continueConsume = false;
-                }
-                break;
-            default:
-                break;
-        }
-    }
-
-    if (commitOffset >= 0 && !consumeRequest.getProcessQueue()->isDropped())
-    {
-        m_pDefaultMQPushConsumerImpl->getOffsetStore()->updateOffset(consumeRequest.getMessageQueue(),
-                commitOffset, false);
-    }
-
-    return continueConsume;
-}
-
-bool ConsumeMessageOrderlyService::checkReconsumeTimes(std::list<MessageExt*>& msgs)
-{
-    bool suspend = false;
-
-    if (!msgs.empty())
-    {
-    	std::list<MessageExt*>::iterator it = msgs.begin();
-        for (; it != msgs.end(); it++)
-        {
-            MessageExt* msg = *it;
-            if (msg->getReconsumeTimes() >= m_pDefaultMQPushConsumer->getMaxReconsumeTimes())
-            {
-            	msg->putProperty(Message::PROPERTY_RECONSUME_TIME, UtilAll::toString(msg->getReconsumeTimes()));
-
-                if (!sendMessageBack(*msg))
-                {
-                    suspend = true;
-                    msg->setReconsumeTimes(msg->getReconsumeTimes() + 1);
-                }
-            }
-            else
-            {
-                suspend = true;
-                msg->setReconsumeTimes(msg->getReconsumeTimes() + 1);
-            }
-        }
-    }
-
-    return suspend;
-}
-
-bool ConsumeMessageOrderlyService::sendMessageBack(MessageExt& msg)
-{
-    try
-    {
-    	Message newMsg(MixAll::getRetryTopic(m_pDefaultMQPushConsumer->getConsumerGroup()),
-                       msg.getBody(), msg.getBodyLen());
-
-		std::string originMsgId = msg.getProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID);
-		newMsg.putProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID, UtilAll::isBlank(originMsgId) ? msg.getMsgId()
-                    : originMsgId);
-
-        newMsg.setFlag(msg.getFlag());
-        newMsg.setProperties(msg.getProperties());
-        newMsg.putProperty(Message::PROPERTY_RETRY_TOPIC, msg.getTopic());
-
-        int reTimes = msg.getReconsumeTimes() + 1;
-        newMsg.putProperty(Message::PROPERTY_RECONSUME_TIME, UtilAll::toString(reTimes));
-        newMsg.putProperty(Message::PROPERTY_MAX_RECONSUME_TIMES, UtilAll::toString(m_pDefaultMQPushConsumer->getMaxReconsumeTimes()));
-        newMsg.setDelayTimeLevel(3 + reTimes);
-
-        m_pDefaultMQPushConsumerImpl->getmQClientFactory()->getDefaultMQProducer()->send(newMsg);
-
-        return true;
-    }
-    catch (...)
-    {
-        RMQ_ERROR("sendMessageBack exception, group: %s, msg: %s",
-                  m_consumerGroup.c_str(), msg.toString().c_str());
-    }
-
-    return false;
-}
-
-
-MessageQueueLock& ConsumeMessageOrderlyService::getMessageQueueLock()
-{
-    return m_messageQueueLock;
-}
-
-DefaultMQPushConsumer* ConsumeMessageOrderlyService::getDefaultMQPushConsumer()
-{
-    return m_pDefaultMQPushConsumer;
-}
-
-ConsumeOrderlyRequest::ConsumeOrderlyRequest(ProcessQueue* pProcessQueue,
-        const MessageQueue& messageQueue,
-        ConsumeMessageOrderlyService* pService)
-{
-	m_pProcessQueue = pProcessQueue;
-	m_messageQueue = messageQueue;
-	m_pService = pService;
-}
-
-ConsumeOrderlyRequest::~ConsumeOrderlyRequest()
-{
-}
-
-void ConsumeOrderlyRequest::Do()
-{
-	if (m_pProcessQueue->isDropped())
-	{
-        RMQ_WARN("run, the message queue not be able to consume, because it's dropped, MQ: %s",
-            m_messageQueue.toString().c_str());
-        return;
-    }
-
-	try
-	{
-	    kpr::Mutex* objLock = m_pService->getMessageQueueLock().fetchLockObject(m_messageQueue);
-	    {
-	        kpr::ScopedLock<kpr::Mutex> lock(*objLock);
-
-	        MessageModel messageModel = m_pService->getDefaultMQPushConsumerImpl()->messageModel();
-	        if (BROADCASTING == messageModel
-	        	|| (m_pProcessQueue->isLocked() || !m_pProcessQueue->isLockExpired()))
-	        {
-	            long long beginTime = KPRUtil::GetCurrentTimeMillis();
-	            for (bool continueConsume = true; continueConsume;)
-	            {
-	                if (m_pProcessQueue->isDropped())
-	                {
-	                    RMQ_INFO("the message queue not be able to consume, because it's droped, MQ: %s",
-	                             m_messageQueue.toString().c_str());
-	                    break;
-	                }
-
-	                if (CLUSTERING == messageModel
-	                 	&& !m_pProcessQueue->isLocked())
-	                {
-	                    RMQ_WARN("the message queue not locked, so consume later, MQ: %s", m_messageQueue.toString().c_str());
-	                    m_pService->tryLockLaterAndReconsume(m_messageQueue, m_pProcessQueue, 10);
-	                    break;
-	                }
-
-	                if (CLUSTERING == messageModel
-	                 	&& m_pProcessQueue->isLockExpired())
-	                {
-	                    RMQ_WARN("the message queue lock expired, so consume later, MQ: %s", m_messageQueue.toString().c_str());
-	                    m_pService->tryLockLaterAndReconsume(m_messageQueue, m_pProcessQueue, 10);
-	                    break;
-	                }
-
-	                long interval = long(KPRUtil::GetCurrentTimeMillis() - beginTime);
-	                if (interval > ConsumeMessageOrderlyService::s_MaxTimeConsumeContinuously)
-	                {
-	                    m_pService->submitConsumeRequestLater(m_pProcessQueue, m_messageQueue, 10);
-	                    break;
-	                }
-
-	                int consumeBatchSize =
-	                    m_pService->getDefaultMQPushConsumer()->getConsumeMessageBatchMaxSize();
-
-	                std::list<MessageExt*> msgs = m_pProcessQueue->takeMessages(consumeBatchSize);
-	                if (!msgs.empty())
-	                {
-	                    ConsumeOrderlyContext context(m_messageQueue);
-
-	                    ConsumeOrderlyStatus status = SUSPEND_CURRENT_QUEUE_A_MOMENT;
-
-	                    ConsumeMessageContext consumeMessageContext;
-	                    if (m_pService->getDefaultMQPushConsumerImpl()->hasHook())
-	                    {
-	                        consumeMessageContext.consumerGroup = m_pService->getConsumerGroup();
-	                        consumeMessageContext.mq = m_messageQueue;
-	                        consumeMessageContext.msgList = msgs;
-	                        consumeMessageContext.success = false;
-	                        m_pService->getDefaultMQPushConsumerImpl()->executeHookBefore(consumeMessageContext);
-	                    }
-
-	                    long long beginTimestamp = KPRUtil::GetCurrentTimeMillis();
-	                    try
-	                    {
-	                    	kpr::ScopedLock<kpr::Mutex> lock(m_pProcessQueue->getLockConsume());
-	                    	if (m_pProcessQueue->isDropped())
-							{
-						        RMQ_WARN("consumeMessage, the message queue not be able to consume, because it's dropped, MQ: %s",
-						            m_messageQueue.toString().c_str());
-						        break;
-						    }
-
-	                        status = m_pService->getMessageListener()->consumeMessage(msgs, context);
-	                    }
-	                    catch (...)
-	                    {
-	                        RMQ_WARN("consumeMessage exception, Group: {%s}, Msgs: {%u}, MQ: %s",//
-	                                 m_pService->getConsumerGroup().c_str(),
-	                                 (unsigned)msgs.size(),
-	                                 m_messageQueue.toString().c_str());
-	                    }
-
-	                    long long consumeRT = KPRUtil::GetCurrentTimeMillis() - beginTimestamp;
-
-	                    if (SUSPEND_CURRENT_QUEUE_A_MOMENT == status
-	                    	|| ROLLBACK == status)
-	                    {
-	                    	RMQ_WARN("consumeMessage Orderly return not OK, Group: {%s} Msgs: {%u} MQ: %s",//
-	                                    m_pService->getConsumerGroup().c_str(),
-	                                    (unsigned)msgs.size(),
-	                                     m_messageQueue.toString().c_str());
-	                    	//status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
-	                    }
-
-	                    if (m_pService->getDefaultMQPushConsumerImpl()->hasHook())
-	                    {
-	                        consumeMessageContext.success = (SUCCESS == status
-	                                                         || COMMIT == status);
-	                        m_pService->getDefaultMQPushConsumerImpl()->executeHookAfter(consumeMessageContext);
-	                    }
-
-	                    m_pService->getConsumerStat().consumeMsgRTTotal.fetchAndAdd(consumeRT);
-	                    MixAll::compareAndIncreaseOnly(m_pService->getConsumerStat()
-	                                                   .consumeMsgRTMax, consumeRT);
-
-	                    continueConsume = m_pService->processConsumeResult(msgs, status, context, *this);
-	                }
-	                else
-	                {
-	                    continueConsume = false;
-	                }
-	            }
-	        }
-	        else
-	        {
-	        	if (m_pProcessQueue->isDropped())
-				{
-			        RMQ_WARN("consumeMessage, the message queue not be able to consume, because it's dropped, MQ: %s",
-			            m_messageQueue.toString().c_str());
-			        return;
-			    }
-
-	            m_pService->tryLockLaterAndReconsume(m_messageQueue, m_pProcessQueue, 100);
-	        }
-	    }
-	}
-	catch(...)
-	{
-		RMQ_WARN("ConsumeOrderlyRequest exception");
-	}
-
-    return;
-}
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.h b/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.h
deleted file mode 100755
index 0f8628b..0000000
--- a/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.h
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __CONSUMEMESSAGEORDERLYSERVICE_H__
-#define __CONSUMEMESSAGEORDERLYSERVICE_H__
-
-#include "ConsumeMessageService.h"
-
-#include <list>
-#include <string>
-#include "RocketMQClient.h"
-#include "ConsumerStatManage.h"
-#include "MessageQueueLock.h"
-#include "MessageListener.h"
-#include "ThreadPool.h"
-#include "TimerThread.h"
-
-namespace rmq
-{
-class DefaultMQPushConsumerImpl;
-class MessageListenerOrderly;
-class DefaultMQPushConsumer;
-class ConsumeMessageOrderlyService;
-
-class ConsumeOrderlyRequest: public kpr::ThreadPoolWork
-{
-public:
-    ConsumeOrderlyRequest(ProcessQueue *pProcessQueue,
-                          const MessageQueue &messageQueue,
-                          ConsumeMessageOrderlyService *pService);
-    ~ConsumeOrderlyRequest();
-
-    virtual void Do();
-
-    ProcessQueue *getProcessQueue()
-    {
-        return m_pProcessQueue;
-    }
-
-    MessageQueue &getMessageQueue()
-    {
-        return m_messageQueue;
-    }
-
-private:
-    ProcessQueue *m_pProcessQueue;
-    MessageQueue m_messageQueue;
-    ConsumeMessageOrderlyService *m_pService;
-};
-
-
-class ConsumeMessageOrderlyService : public ConsumeMessageService
-{
-public:
-	static const long s_MaxTimeConsumeContinuously = 60000;
-
-public:
-    ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl
-                                 *pDefaultMQPushConsumerImpl,
-                                 MessageListenerOrderly *pMessageListener);
-    ~ConsumeMessageOrderlyService();
-
-    void start();
-    void shutdown();
-
-    void unlockAllMQ();
-    void lockMQPeriodically();
-    bool lockOneMQ(MessageQueue &mq);
-    void tryLockLaterAndReconsume(MessageQueue &messageQueue,
-                                  ProcessQueue *pProcessQueue,
-                                  long long delayMills);
-    bool processConsumeResult(std::list<MessageExt *> &msgs,
-                              ConsumeOrderlyStatus status,
-                              ConsumeOrderlyContext &context,
-                              ConsumeOrderlyRequest &consumeRequest);
-    bool checkReconsumeTimes(std::list<MessageExt *> &msgs);
-    bool sendMessageBack(MessageExt &msg);
-    ConsumerStat& getConsumerStat();
-
-    void submitConsumeRequestLater(ProcessQueue *pProcessQueue,
-                                   const MessageQueue &messageQueue,
-                                   long long suspendTimeMillis);
-
-    void submitConsumeRequest(std::list<MessageExt *> &msgs,
-                              ProcessQueue *pProcessQueue,
-                              const MessageQueue &messageQueue,
-                              bool dispathToConsume);
-
-    void updateCorePoolSize(int corePoolSize);
-    MessageQueueLock &getMessageQueueLock();
-    std::string &getConsumerGroup();
-    MessageListenerOrderly *getMessageListener();
-    DefaultMQPushConsumerImpl *getDefaultMQPushConsumerImpl();
-    DefaultMQPushConsumer *getDefaultMQPushConsumer();
-
-private:
-    volatile bool m_stoped;
-    DefaultMQPushConsumerImpl *m_pDefaultMQPushConsumerImpl;
-    DefaultMQPushConsumer *m_pDefaultMQPushConsumer;
-    MessageListenerOrderly *m_pMessageListener;
-    std::string m_consumerGroup;
-    MessageQueueLock m_messageQueueLock;
-
-    kpr::ThreadPoolPtr m_pConsumeExecutor;
-    kpr::TimerThreadPtr m_scheduledExecutorService;
-};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ConsumeMessageService.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageService.h b/rocketmq-client4cpp/src/consumer/ConsumeMessageService.h
deleted file mode 100755
index 57a9bee..0000000
--- a/rocketmq-client4cpp/src/consumer/ConsumeMessageService.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __CONSUMEMESSAGESERVICE_H__
-#define __CONSUMEMESSAGESERVICE_H__
-
-#include <list>
-
-namespace rmq
-{
-	class MessageExt;
-	class ProcessQueue;
-	class MessageQueue;
-
-	class ConsumeMessageService
-	{
-	public:
-	    virtual ~ConsumeMessageService() {}
-	    virtual void start() = 0;
-	    virtual void shutdown() = 0;
-	    virtual void updateCorePoolSize(int corePoolSize) = 0;
-	    virtual void submitConsumeRequest(std::list<MessageExt*>& msgs,
-	                                      ProcessQueue* pProcessQueue,
-	                                      const MessageQueue& messageQueue,
-	                                      bool dispathToConsume) = 0;
-	};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ConsumeType.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumeType.cpp b/rocketmq-client4cpp/src/consumer/ConsumeType.cpp
deleted file mode 100755
index 6ef5837..0000000
--- a/rocketmq-client4cpp/src/consumer/ConsumeType.cpp
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "ConsumeType.h"
-
-namespace rmq
-{
-
-const char* getConsumeTypeString(ConsumeType type)
-{
-    switch (type)
-    {
-        case CONSUME_ACTIVELY:
-            return "CONSUME_ACTIVELY";
-        case CONSUME_PASSIVELY:
-            return "CONSUME_PASSIVELY";
-    }
-
-    return "UnknowConsumeType";
-}
-
-const char* getConsumeFromWhereString(ConsumeFromWhere type)
-{
-    switch (type)
-    {
-        case CONSUME_FROM_LAST_OFFSET:
-            return "CONSUME_FROM_LAST_OFFSET";
-        case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
-            return "CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST";
-        case CONSUME_FROM_MAX_OFFSET:
-            return "CONSUME_FROM_MAX_OFFSET";
-        case CONSUME_FROM_MIN_OFFSET:
-            return "CONSUME_FROM_MIN_OFFSET";
-        case CONSUME_FROM_FIRST_OFFSET:
-            return "CONSUME_FROM_FIRST_OFFSET";
-        case CONSUME_FROM_TIMESTAMP:
-            return "CONSUME_FROM_TIMESTAMP";
-    }
-
-    return "UnknowConsumeFromWhere";
-}
-
-const char* getMessageModelString(MessageModel type)
-{
-    switch (type)
-    {
-        case CLUSTERING:
-            return "CLUSTERING";
-        case BROADCASTING:
-            return "BROADCASTING";
-    }
-
-    return "UnknowMessageModel";
-}
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.cpp b/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.cpp
deleted file mode 100755
index c9dc304..0000000
--- a/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.cpp
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "ConsumerInvokeCallback.h"
-#include "ResponseFuture.h"
-#include "PullResult.h"
-#include "MQClientAPIImpl.h"
-#include "PullCallback.h"
-#include "MQClientException.h"
-#include "RemotingCommand.h"
-
-namespace rmq
-{
-
-ConsumerInvokeCallback::ConsumerInvokeCallback(PullCallback* pPullCallback, MQClientAPIImpl* pMQClientAPIImpl)
-    : m_pPullCallback(pPullCallback),
-      m_pMQClientAPIImpl(pMQClientAPIImpl)
-{
-}
-
-ConsumerInvokeCallback::~ConsumerInvokeCallback()
-{
-    if (m_pPullCallback != NULL)
-    {
-        delete m_pPullCallback;
-        m_pPullCallback = NULL;
-    }
-}
-
-void ConsumerInvokeCallback::operationComplete(ResponseFuturePtr pResponseFuture)
-{
-    if (m_pPullCallback == NULL)
-    {
-        delete this;
-        return;
-    }
-
-    RemotingCommand* response = pResponseFuture->getResponseCommand();
-    if (response != NULL)
-    {
-        try
-        {
-            PullResult* pullResult = m_pMQClientAPIImpl->processPullResponse(response);
-            response->setBody(NULL, 0, false);
-
-            m_pPullCallback->onSuccess(*pullResult);
-
-            pullResult->msgFoundList.clear();
-            delete pullResult;
-        }
-        catch (MQException& e)
-        {
-            m_pPullCallback->onException(e);
-        }
-
-        delete response;
-    }
-    else
-    {
-        if (!pResponseFuture->isSendRequestOK())
-        {
-            std::string msg = "send request failed";
-            MQClientException e(msg, -1, __FILE__, __LINE__);
-            m_pPullCallback->onException(e);
-        }
-        else if (pResponseFuture->isTimeout())
-        {
-            std::string msg = "wait response timeout";
-            MQClientException e(msg, -1, __FILE__, __LINE__);
-            m_pPullCallback->onException(e);
-        }
-        else
-        {
-            std::string msg = "unknow reseaon";
-            MQClientException e(msg, -1, __FILE__, __LINE__);
-            m_pPullCallback->onException(e);
-        }
-    }
-
-    delete this;
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.h b/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.h
deleted file mode 100755
index 675f2fd..0000000
--- a/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.h
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __CONSUMER_INVOKECALLBACK_H__
-#define __CONSUMER_INVOKECALLBACK_H__
-
-#include "InvokeCallback.h"
-
-namespace rmq
-{
-	class PullCallback;
-	class MQClientAPIImpl;
-
-	/**
-	* InvokeCallback adapter that forwards the result of an asynchronous pull
-	* request to a PullCallback.
-	*
-	* The implementation deletes the object itself at the end of
-	* operationComplete(), so instances must be created with new and must not
-	* be used after operationComplete() has run.
-	*/
-	class ConsumerInvokeCallback : public InvokeCallback
-	{
-	public:
-	    ConsumerInvokeCallback(PullCallback* pPullCallback, MQClientAPIImpl* pMQClientAPIImpl);
-	    virtual ~ConsumerInvokeCallback();
-	    virtual void operationComplete(ResponseFuturePtr pResponseFuture);
-
-	private:
-	    // Copying is disabled (declared, never defined): the implementation
-	    // deletes m_pPullCallback, so a copy would free the same callback twice.
-	    ConsumerInvokeCallback(const ConsumerInvokeCallback&);
-	    ConsumerInvokeCallback& operator=(const ConsumerInvokeCallback&);
-
-	    PullCallback* m_pPullCallback;         // callback notified with the pull outcome
-	    MQClientAPIImpl* m_pMQClientAPIImpl;   // used to decode the pull response
-	};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ConsumerStatManage.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumerStatManage.h b/rocketmq-client4cpp/src/consumer/ConsumerStatManage.h
deleted file mode 100755
index 92cf74c..0000000
--- a/rocketmq-client4cpp/src/consumer/ConsumerStatManage.h
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __CONSUMERSTAT_H__
-#define __CONSUMERSTAT_H__
-
-#include <list>
-#include <string>
-
-#include "AtomicValue.h"
-#include "KPRUtil.h"
-#include "Mutex.h"
-#include "ScopedLock.h"
-
-namespace rmq
-{
-    struct ConsumerStat // consumer counters; ConsumerStatManager also stores copies of this struct as snapshots
-    {
-        long long createTimestamp; // set once, in the constructor, from KPRUtil::GetCurrentTimeMillis()
-        kpr::AtomicLong consumeMsgRTMax; // logged as "ConsumeMaxRT"
-        kpr::AtomicLong consumeMsgRTTotal; // numerator of the logged "ConsumeAvgRT"
-        kpr::AtomicLong consumeMsgOKTotal; // logged as "TotalOKMsg"
-        kpr::AtomicLong consumeMsgFailedTotal; // logged as "TotalFailedMsg"
-        kpr::AtomicLong pullRTTotal; // numerator of the logged "PullAvgRT"
-        kpr::AtomicLong pullTimesTotal; // logged as "PullTimesTotal"
-
-		ConsumerStat() // zeroes every counter and stamps the creation time
-		{
-			createTimestamp = KPRUtil::GetCurrentTimeMillis();
-			consumeMsgRTMax = 0;
-			consumeMsgRTTotal = 0;
-			consumeMsgOKTotal = 0;
-			consumeMsgFailedTotal = 0;
-			pullRTTotal = 0;
-			pullTimesTotal = 0;
-		}
-    };
-
-
-    /**
-    * Holds the live ConsumerStat counters together with a bounded list of
-    * periodic snapshots, and logs latency/throughput figures computed from the
-    * oldest and newest snapshot in that list.
-    */
-    class ConsumerStatManager
-    {
-    public:
-        /** Returns the live counters by reference. */
-        ConsumerStat& getConsumertat()
-        {
-            return m_consumertat;
-        }
-
-        /** Returns the snapshot list by reference; this accessor takes no lock. */
-        std::list<ConsumerStat>& getSnapshotList()
-        {
-            return m_snapshotList;
-        }
-
-        /**
-        * Appends a copy of the live counters to the snapshot list, keeping at
-        * most 60 entries. Intended to be called every 1s.
-        */
-        void recordSnapshotPeriodically()
-        {
-            kpr::ScopedWLock<kpr::RWMutex> lock(m_snapshotListLock);
-            m_snapshotList.push_back(m_consumertat);
-            // The copy carries the live object's construction time; stamp the
-            // snapshot with the time it was taken so that the interval between
-            // two snapshots is meaningful when computing TPS.
-            m_snapshotList.back().createTimestamp = KPRUtil::GetCurrentTimeMillis();
-            if (m_snapshotList.size() > 60)
-            {
-                m_snapshotList.pop_front();
-            }
-        }
-
-        /**
-        * Logs consume and pull statistics over the snapshot window. Does
-        * nothing until 60 snapshots are available. Intended to be called
-        * every 1m.
-        *
-        * @param group    consumer group name, used in the log line
-        * @param clientId client id, used in the log line
-        */
-        void logStatsPeriodically(std::string& group, std::string& clientId)
-        {
-            kpr::ScopedRLock<kpr::RWMutex> lock(m_snapshotListLock);
-            if (m_snapshotList.size() < 60)
-            {
-                return;
-            }
-
-            ConsumerStat& first = m_snapshotList.front();
-            ConsumerStat& last = m_snapshotList.back();
-
-            {
-                const long long consumedDelta =
-                    (last.consumeMsgOKTotal.get() + last.consumeMsgFailedTotal.get())
-                    - (first.consumeMsgOKTotal.get() + first.consumeMsgFailedTotal.get());
-                const long long elapsedMillis = last.createTimestamp - first.createTimestamp;
-
-                // Report 0 instead of dividing by zero when nothing was
-                // consumed in the window or no time has elapsed.
-                double avgRT = 0;
-                if (consumedDelta != 0)
-                {
-                    avgRT = (last.consumeMsgRTTotal.get() - first.consumeMsgRTTotal.get())
-                            / (double)consumedDelta;
-                }
-
-                double tps = 0;
-                if (elapsedMillis != 0)
-                {
-                    tps = consumedDelta / (double)elapsedMillis;
-                    tps *= 1000; // per millisecond -> per second
-                }
-
-                RMQ_INFO(
-                    "Consumer, {%s} {%s}, ConsumeAvgRT: {%f} ConsumeMaxRT: {%lld} TotalOKMsg: {%lld} TotalFailedMsg: {%lld} consumeTPS: {%f}",
-                    group.c_str(),
-                    clientId.c_str(),
-                    avgRT,
-                    last.consumeMsgRTMax.get(),
-                    last.consumeMsgOKTotal.get(),
-                    last.consumeMsgFailedTotal.get(),
-                    tps);
-            }
-
-            {
-                const long long pullTimesDelta = last.pullTimesTotal.get() - first.pullTimesTotal.get();
-
-                // Same guard as above: no pulls in the window means no average.
-                double avgRT = 0;
-                if (pullTimesDelta != 0)
-                {
-                    avgRT = (last.pullRTTotal.get() - first.pullRTTotal.get())
-                            / (double)pullTimesDelta;
-                }
-
-                RMQ_INFO("Consumer, {%s} {%s}, PullAvgRT: {%f}  PullTimesTotal: {%lld}",
-                         group.c_str(),
-                         clientId.c_str(),
-                         avgRT,
-                         last.pullTimesTotal.get());
-            }
-        }
-
-    private:
-        ConsumerStat m_consumertat;              // live counters
-        std::list<ConsumerStat> m_snapshotList;  // at most 60 snapshots, oldest first
-        kpr::RWMutex m_snapshotListLock;         // guards m_snapshotList in the two periodic methods
-    };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumer.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumer.cpp b/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumer.cpp
deleted file mode 100755
index 67a8c8c..0000000
--- a/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumer.cpp
+++ /dev/null
@@ -1,309 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "DefaultMQPullConsumer.h"
-
-#include <list>
-#include <string>
-
-#include "MessageQueue.h"
-#include "MessageExt.h"
-#include "ClientConfig.h"
-#include "DefaultMQPullConsumerImpl.h"
-#include "MixAll.h"
-#include "AllocateMessageQueueStrategyInner.h"
-
-namespace rmq
-{
-
-DefaultMQPullConsumer::DefaultMQPullConsumer() // default constructor: group name is MixAll::DEFAULT_CONSUMER_GROUP
-    : m_consumerGroup(MixAll::DEFAULT_CONSUMER_GROUP),
-      m_brokerSuspendMaxTimeMillis(1000 * 20), // 20000
-      m_consumerTimeoutMillisWhenSuspend(1000 * 30), // 30000
-      m_consumerPullTimeoutMillis(1000 * 10), // 10000
-      m_messageModel(CLUSTERING),
-      m_pMessageQueueListener(NULL),
-      m_pOffsetStore(NULL),
-      m_pAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely()), // heap-allocated default; the destructor deletes whatever strategy is held
-      m_unitMode(false),
-      m_maxReconsumeTimes(16)
-{
-    m_pDefaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this); // implementation object, deleted in the destructor
-}
-
-DefaultMQPullConsumer::DefaultMQPullConsumer(const std::string& consumerGroup) // same defaults as above, with a caller-supplied group name
-    : m_consumerGroup(consumerGroup),
-      m_brokerSuspendMaxTimeMillis(1000 * 20),
-      m_consumerTimeoutMillisWhenSuspend(1000 * 30),
-      m_consumerPullTimeoutMillis(1000 * 10),
-      m_messageModel(CLUSTERING),
-      m_pMessageQueueListener(NULL),
-      m_pOffsetStore(NULL),
-      m_pAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely()),
-      m_unitMode(false),
-      m_maxReconsumeTimes(16)
-{
-    m_pDefaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this);
-}
-
-/**
-* Releases the allocation strategy currently held and the implementation
-* object, in that order.
-*
-* NOTE(review): the original author flagged this as "memleak or coredump";
-* the strategy pointer may have been replaced through
-* setAllocateMessageQueueStrategy(), so whatever was set last is deleted here.
-*/
-DefaultMQPullConsumer::~DefaultMQPullConsumer()
-{
-    // delete on a null pointer is a no-op, so no explicit checks are needed.
-    delete m_pAllocateMessageQueueStrategy;
-    delete m_pDefaultMQPullConsumerImpl;
-}
-
-//MQAdmin
-void DefaultMQPullConsumer::createTopic(const std::string& key, const std::string& newTopic, int queueNum) // forwards to the impl object
-{
-    m_pDefaultMQPullConsumerImpl->createTopic(key, newTopic, queueNum);
-}
-
-long long DefaultMQPullConsumer::searchOffset(const MessageQueue& mq, long long timestamp) // forwards to the impl object
-{
-    return m_pDefaultMQPullConsumerImpl->searchOffset(mq, timestamp);
-}
-
-long long DefaultMQPullConsumer::maxOffset(const MessageQueue& mq) // forwards to the impl object
-{
-    return m_pDefaultMQPullConsumerImpl->maxOffset(mq);
-}
-
-long long DefaultMQPullConsumer::minOffset(const MessageQueue& mq) // forwards to the impl object
-{
-    return m_pDefaultMQPullConsumerImpl->minOffset(mq);
-}
-
-long long DefaultMQPullConsumer::earliestMsgStoreTime(const MessageQueue& mq) // forwards to the impl object
-{
-    return m_pDefaultMQPullConsumerImpl->earliestMsgStoreTime(mq);
-}
-
-MessageExt* DefaultMQPullConsumer::viewMessage(const std::string& msgId) // forwards to the impl object; NOTE(review): ownership of the returned pointer is not visible here
-{
-    return m_pDefaultMQPullConsumerImpl->viewMessage(msgId);
-}
-
-QueryResult DefaultMQPullConsumer::queryMessage(const std::string& topic, // forwards all five arguments to the impl object
-        const std::string&  key,
-        int maxNum,
-        long long begin,
-        long long end)
-{
-    return m_pDefaultMQPullConsumerImpl->queryMessage(topic, key, maxNum, begin, end);
-}
-// MQadmin end
-
-AllocateMessageQueueStrategy* DefaultMQPullConsumer::getAllocateMessageQueueStrategy() // returns the stored pointer; no ownership transfer happens here
-{
-    return m_pAllocateMessageQueueStrategy;
-}
-
-void DefaultMQPullConsumer::setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy) // overwrites the pointer; the previously held strategy is not deleted here
-{
-    m_pAllocateMessageQueueStrategy = pAllocateMessageQueueStrategy;
-}
-
-int DefaultMQPullConsumer::getBrokerSuspendMaxTimeMillis() // plain getter
-{
-    return m_brokerSuspendMaxTimeMillis;
-}
-
-void DefaultMQPullConsumer::setBrokerSuspendMaxTimeMillis(int brokerSuspendMaxTimeMillis) // plain setter, no validation
-{
-    m_brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis;
-}
-
-std::string DefaultMQPullConsumer::getConsumerGroup() // returns a copy of the group name
-{
-    return m_consumerGroup;
-}
-
-void DefaultMQPullConsumer::setConsumerGroup(const std::string& consumerGroup) // plain setter, no validation
-{
-    m_consumerGroup = consumerGroup;
-}
-
-int DefaultMQPullConsumer::getConsumerPullTimeoutMillis() // plain getter
-{
-    return m_consumerPullTimeoutMillis;
-}
-
-void DefaultMQPullConsumer::setConsumerPullTimeoutMillis(int consumerPullTimeoutMillis) // plain setter, no validation
-{
-    m_consumerPullTimeoutMillis = consumerPullTimeoutMillis;
-}
-
-int DefaultMQPullConsumer::getConsumerTimeoutMillisWhenSuspend() // plain getter
-{
-    return m_consumerTimeoutMillisWhenSuspend;
-}
-
-void DefaultMQPullConsumer::setConsumerTimeoutMillisWhenSuspend(int consumerTimeoutMillisWhenSuspend) // plain setter, no validation
-{
-    m_consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
-}
-
-MessageModel DefaultMQPullConsumer::getMessageModel() // plain getter
-{
-    return m_messageModel;
-}
-
-void DefaultMQPullConsumer::setMessageModel(MessageModel messageModel) // plain setter
-{
-    m_messageModel = messageModel;
-}
-
-MessageQueueListener* DefaultMQPullConsumer::getMessageQueueListener() // returns the stored listener pointer (may be NULL)
-{
-    return m_pMessageQueueListener;
-}
-
-void DefaultMQPullConsumer::setMessageQueueListener(MessageQueueListener* pMessageQueueListener) // stores the pointer as-is, NULL included
-{
-    m_pMessageQueueListener = pMessageQueueListener;
-}
-
-std::set<std::string> DefaultMQPullConsumer::getRegisterTopics() // returns a copy of the registered-topic set
-{
-    return m_registerTopics;
-}
-
-void DefaultMQPullConsumer::setRegisterTopics(std::set<std::string> registerTopics) // replaces the whole set with the (by-value) argument
-{
-    m_registerTopics = registerTopics;
-}
-
-//MQConsumer
-void DefaultMQPullConsumer::sendMessageBack(MessageExt& msg, int delayLevel) // forwards with an empty broker name
-{
-    m_pDefaultMQPullConsumerImpl->sendMessageBack(msg, delayLevel, "");
-}
-
-void DefaultMQPullConsumer::sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName) // forwards to the impl object
-{
-    m_pDefaultMQPullConsumerImpl->sendMessageBack(msg, delayLevel, brokerName);
-}
-
-
-
-std::set<MessageQueue>* DefaultMQPullConsumer::fetchSubscribeMessageQueues(const std::string& topic) // forwards to the impl object; NOTE(review): ownership of the returned set is not visible here
-{
-    return m_pDefaultMQPullConsumerImpl->fetchSubscribeMessageQueues(topic);
-}
-
-void DefaultMQPullConsumer::start() // forwards to the impl object
-{
-    m_pDefaultMQPullConsumerImpl->start();
-}
-
-void DefaultMQPullConsumer::shutdown() // forwards to the impl object
-{
-    m_pDefaultMQPullConsumerImpl->shutdown();
-}
-//MQConsumer end
-
-//MQPullConsumer
-void DefaultMQPullConsumer::registerMessageQueueListener(const std::string& topic, MessageQueueListener* pListener) // records the topic; replaces the listener only when pListener is non-NULL
-{
-    m_registerTopics.insert(topic);
-
-    if (pListener) // a NULL listener leaves the current one in place
-    {
-        m_pMessageQueueListener = pListener;
-    }
-}
-
-PullResult* DefaultMQPullConsumer::pull(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums) // synchronous pull; forwards to the impl object
-{
-    return m_pDefaultMQPullConsumerImpl->pull(mq, subExpression, offset, maxNums);
-}
-
-void DefaultMQPullConsumer::pull(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums, PullCallback* pPullCallback) // callback-based pull; forwards to the impl object
-{
-    m_pDefaultMQPullConsumerImpl->pull(mq, subExpression, offset, maxNums, pPullCallback);
-}
-
-PullResult* DefaultMQPullConsumer::pullBlockIfNotFound(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums) // synchronous variant; forwards to the impl object
-{
-    return m_pDefaultMQPullConsumerImpl->pullBlockIfNotFound(mq, subExpression, offset, maxNums);
-}
-
-void DefaultMQPullConsumer::pullBlockIfNotFound(MessageQueue& mq, // callback-based variant; forwards to the impl object
-        const std::string& subExpression,
-        long long offset,
-        int maxNums,
-        PullCallback* pPullCallback)
-{
-    m_pDefaultMQPullConsumerImpl->pullBlockIfNotFound(mq, subExpression, offset, maxNums, pPullCallback);
-}
-
-void DefaultMQPullConsumer::updateConsumeOffset(MessageQueue& mq, long long offset) // forwards to the impl object
-{
-    m_pDefaultMQPullConsumerImpl->updateConsumeOffset(mq, offset);
-}
-
-long long DefaultMQPullConsumer::fetchConsumeOffset(MessageQueue& mq, bool fromStore) // forwards to the impl object
-{
-    return m_pDefaultMQPullConsumerImpl->fetchConsumeOffset(mq, fromStore);
-}
-
-std::set<MessageQueue>* DefaultMQPullConsumer::fetchMessageQueuesInBalance(const std::string& topic) // forwards to the impl object, which returns a newly allocated set
-{
-    return m_pDefaultMQPullConsumerImpl->fetchMessageQueuesInBalance(topic);
-}
-//MQPullConsumer end
-
-OffsetStore* DefaultMQPullConsumer::getOffsetStore() // returns the stored pointer; NULL unless setOffsetStore() was called
-{
-    return m_pOffsetStore;
-}
-
-void DefaultMQPullConsumer::setOffsetStore(OffsetStore* offsetStore) // stores the pointer as-is
-{
-    m_pOffsetStore = offsetStore;
-}
-
-DefaultMQPullConsumerImpl* DefaultMQPullConsumer::getDefaultMQPullConsumerImpl() // exposes the implementation object created in the constructor
-{
-    return m_pDefaultMQPullConsumerImpl;
-}
-
-bool DefaultMQPullConsumer::isUnitMode() // plain getter
-{
-    return m_unitMode;
-}
-
-void DefaultMQPullConsumer::setUnitMode(bool isUnitMode) // plain setter
-{
-    m_unitMode = isUnitMode;
-}
-
-int DefaultMQPullConsumer::getMaxReconsumeTimes() // plain getter; the constructors default this to 16
-{
-    return m_maxReconsumeTimes;
-}
-
-void DefaultMQPullConsumer::setMaxReconsumeTimes(int maxReconsumeTimes) // plain setter, no validation
-{
-    m_maxReconsumeTimes = maxReconsumeTimes;
-}
-
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.cpp b/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.cpp
deleted file mode 100755
index d6465e9..0000000
--- a/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.cpp
+++ /dev/null
@@ -1,630 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "DefaultMQPullConsumerImpl.h"
-
-#include <iostream>
-#include <string>
-#include <set>
-#include "DefaultMQPullConsumer.h"
-#include "DefaultMQProducer.h"
-#include "MQClientFactory.h"
-#include "MQAdminImpl.h"
-#include "RebalancePullImpl.h"
-#include "MQClientAPIImpl.h"
-#include "OffsetStore.h"
-#include "MixAll.h"
-#include "MQClientManager.h"
-#include "LocalFileOffsetStore.h"
-#include "RemoteBrokerOffsetStore.h"
-#include "PullSysFlag.h"
-#include "FilterAPI.h"
-#include "PullAPIWrapper.h"
-#include "MQClientException.h"
-#include "Validators.h"
-#include "ScopedLock.h"
-
-namespace rmq
-{
-
-DefaultMQPullConsumerImpl::DefaultMQPullConsumerImpl(DefaultMQPullConsumer* pDefaultMQPullConsumer) // keeps a back-pointer to the facade; state starts as CREATE_JUST
-    : m_pDefaultMQPullConsumer(pDefaultMQPullConsumer),
-      m_serviceState(CREATE_JUST)
-{
-	m_pMQClientFactory = NULL; // assigned in start()
-	m_pPullAPIWrapper = NULL; // assigned in start()
-    m_pOffsetStore = NULL; // assigned in start() or through setOffsetStore()
-    m_pRebalanceImpl = new RebalancePullImpl(this); // deleted in the destructor
-}
-
-/**
-* Frees the rebalance helper, the pull API wrapper and the offset store, in
-* that order. The client factory pointer is deliberately left alone.
-*/
-DefaultMQPullConsumerImpl::~DefaultMQPullConsumerImpl()
-{
-    // Deleting a null pointer is a no-op, so members that were never
-    // assigned need no special handling.
-    delete m_pRebalanceImpl;
-    delete m_pPullAPIWrapper;
-    delete m_pOffsetStore;
-    // m_pMQClientFactory is not deleted here.
-}
-
-/**
-* Starts the pull consumer.
-*
-* Only valid in the CREATE_JUST state: validates the configuration, obtains
-* the client factory, wires up the rebalance helper, creates the pull API
-* wrapper and the offset store, registers this consumer with the factory and
-* starts the factory. The state is set to START_FAILED while this is in
-* progress and to RUNNING on success.
-*
-* Throws MQClientException when the consumer group is already registered
-* (the state is reset to CREATE_JUST so start() can be retried), when no
-* offset store could be chosen, or when called in any state other than
-* CREATE_JUST.
-*/
-void  DefaultMQPullConsumerImpl::start()
-{
-    RMQ_INFO("DefaultMQPullConsumerImpl::start()");
-    switch (m_serviceState)
-    {
-        case CREATE_JUST:
-        {
-            RMQ_INFO("the consumer [{%s}] start beginning. messageModel={%s}",
-                     m_pDefaultMQPullConsumer->getConsumerGroup().c_str(),
-                     getMessageModelString(m_pDefaultMQPullConsumer->getMessageModel()));
-
-            // Pessimistic default: any exception below leaves the state as START_FAILED.
-            m_serviceState = START_FAILED;
-            checkConfig();
-            copySubscription();
-
-            if (m_pDefaultMQPullConsumer->getMessageModel() == CLUSTERING)
-            {
-                m_pDefaultMQPullConsumer->changeInstanceNameToPID();
-            }
-
-            m_pMQClientFactory = MQClientManager::getInstance()->getAndCreateMQClientFactory(*m_pDefaultMQPullConsumer);
-
-            m_pRebalanceImpl->setConsumerGroup(m_pDefaultMQPullConsumer->getConsumerGroup());
-            m_pRebalanceImpl->setMessageModel(m_pDefaultMQPullConsumer->getMessageModel());
-            m_pRebalanceImpl->setAllocateMessageQueueStrategy(m_pDefaultMQPullConsumer->getAllocateMessageQueueStrategy());
-            m_pRebalanceImpl->setmQClientFactory(m_pMQClientFactory);
-
-            // A previous attempt that failed at registration resets the state
-            // to CREATE_JUST but leaves its wrapper behind; release it before
-            // creating a new one (NULL on the first attempt).
-            delete m_pPullAPIWrapper;
-            m_pPullAPIWrapper = new PullAPIWrapper(m_pMQClientFactory, m_pDefaultMQPullConsumer->getConsumerGroup());
-
-            if (m_pDefaultMQPullConsumer->getOffsetStore() != NULL)
-            {
-                // A store supplied on the facade takes precedence.
-                m_pOffsetStore = m_pDefaultMQPullConsumer->getOffsetStore();
-            }
-            else
-            {
-                switch (m_pDefaultMQPullConsumer->getMessageModel())
-                {
-                    case BROADCASTING:
-                        m_pOffsetStore = new LocalFileOffsetStore(m_pMQClientFactory, m_pDefaultMQPullConsumer->getConsumerGroup());
-                        break;
-                    case CLUSTERING:
-                        m_pOffsetStore = new RemoteBrokerOffsetStore(m_pMQClientFactory, m_pDefaultMQPullConsumer->getConsumerGroup());
-                        break;
-                    default:
-                        break;
-                }
-            }
-
-            // An unrecognised message model selects no store; fail with an
-            // exception instead of dereferencing a null pointer below.
-            if (m_pOffsetStore == NULL)
-            {
-                THROW_MQEXCEPTION(MQClientException, "The offset store is null, unsupported message model", -1);
-            }
-
-            m_pOffsetStore->load();
-
-            bool registerOK =
-                m_pMQClientFactory->registerConsumer(m_pDefaultMQPullConsumer->getConsumerGroup(), this);
-            if (!registerOK)
-            {
-                m_serviceState = CREATE_JUST;
-                std::string str = "The consumer group[" + m_pDefaultMQPullConsumer->getConsumerGroup();
-                str += "] has been created before, specify another name please.";
-                THROW_MQEXCEPTION(MQClientException, str, -1);
-            }
-
-            m_pMQClientFactory->start();
-
-            m_serviceState = RUNNING;
-        }
-        break;
-        case RUNNING:
-        case START_FAILED:
-        case SHUTDOWN_ALREADY:
-            THROW_MQEXCEPTION(MQClientException, "The PullConsumer service state not OK, maybe started once, ", -1);
-        default:
-            break;
-    }
-}
-
-
-/**
-* Stops a running consumer: persists offsets, unregisters the consumer group
-* from the client factory, shuts the factory down and marks the service as
-* SHUTDOWN_ALREADY. In every other state the call only emits the debug log.
-*/
-void  DefaultMQPullConsumerImpl::shutdown()
-{
-    RMQ_DEBUG("DefaultMQPullConsumerImpl::shutdown()");
-
-    // Only the RUNNING state has anything to tear down.
-    if (m_serviceState != RUNNING)
-    {
-        return;
-    }
-
-    persistConsumerOffset();
-
-    const std::string group = m_pDefaultMQPullConsumer->getConsumerGroup();
-    m_pMQClientFactory->unregisterConsumer(group);
-    m_pMQClientFactory->shutdown();
-
-    m_serviceState = SHUTDOWN_ALREADY;
-}
-
-
-void DefaultMQPullConsumerImpl::createTopic(const std::string& key, const std::string& newTopic, int queueNum) // requires RUNNING state; delegates to the factory's MQAdminImpl
-{
-    makeSureStateOK(); // throws MQClientException unless the service is RUNNING
-    m_pMQClientFactory->getMQAdminImpl()->createTopic(key, newTopic, queueNum);
-}
-
-long long DefaultMQPullConsumerImpl::fetchConsumeOffset(MessageQueue& mq, bool fromStore) // requires RUNNING state; reads the offset from the offset store
-{
-    makeSureStateOK();
-    return m_pOffsetStore->readOffset(mq, fromStore ? READ_FROM_STORE : MEMORY_FIRST_THEN_STORE); // fromStore selects READ_FROM_STORE, otherwise MEMORY_FIRST_THEN_STORE
-}
-
-/**
-* Collects the message queues of the given topic that are present in the
-* rebalance helper's process-queue table.
-*
-* @param topic topic whose queues are wanted
-* @return newly allocated set (possibly empty)
-* @throws MQClientException when the service is not RUNNING
-*/
-std::set<MessageQueue>* DefaultMQPullConsumerImpl::fetchMessageQueuesInBalance(const std::string& topic)
-{
-    makeSureStateOK();
-    std::set<MessageQueue>* balanced = new std::set<MessageQueue>;
-
-    // Hold the read lock for the whole scan of the table.
-    kpr::ScopedRLock<kpr::RWMutex> lock(m_pRebalanceImpl->getProcessQueueTableLock());
-    std::map<MessageQueue, ProcessQueue*>& table = m_pRebalanceImpl->getProcessQueueTable();
-
-    typedef std::map<MessageQueue, ProcessQueue*>::iterator TableIter;
-    for (TableIter entry = table.begin(); entry != table.end(); ++entry)
-    {
-        const MessageQueue& queue = entry->first;
-        if (queue.getTopic() == topic)
-        {
-            balanced->insert(queue);
-        }
-    }
-
-    return balanced;
-}
-
-std::vector<MessageQueue>* DefaultMQPullConsumerImpl::fetchPublishMessageQueues(const std::string&  topic) // requires RUNNING state; delegates to MQAdminImpl
-{
-    makeSureStateOK();
-    return m_pMQClientFactory->getMQAdminImpl()->fetchPublishMessageQueues(topic);
-}
-
-std::set<MessageQueue>*  DefaultMQPullConsumerImpl::fetchSubscribeMessageQueues(const std::string& topic) // requires RUNNING state; delegates to MQAdminImpl
-{
-    makeSureStateOK();
-    return m_pMQClientFactory->getMQAdminImpl()->fetchSubscribeMessageQueues(topic);
-}
-
-long long  DefaultMQPullConsumerImpl::earliestMsgStoreTime(const MessageQueue& mq) // requires RUNNING state; delegates to MQAdminImpl
-{
-    makeSureStateOK();
-    return m_pMQClientFactory->getMQAdminImpl()->earliestMsgStoreTime(mq);
-}
-
-std::string  DefaultMQPullConsumerImpl::groupName() // consumer group configured on the facade object
-{
-    return m_pDefaultMQPullConsumer->getConsumerGroup();
-}
-
-MessageModel  DefaultMQPullConsumerImpl::messageModel() // message model configured on the facade object
-{
-    return m_pDefaultMQPullConsumer->getMessageModel();
-}
-
-ConsumeType  DefaultMQPullConsumerImpl::consumeType() // always CONSUME_ACTIVELY
-{
-    return CONSUME_ACTIVELY;
-}
-
-ConsumeFromWhere  DefaultMQPullConsumerImpl::consumeFromWhere() // always CONSUME_FROM_LAST_OFFSET
-{
-    return CONSUME_FROM_LAST_OFFSET;
-}
-
-std::set<SubscriptionData>  DefaultMQPullConsumerImpl::subscriptions() // not implemented yet: always returns an empty set
-{
-    //TODO
-    std::set<SubscriptionData> result;
-    return result;
-}
-
-void DefaultMQPullConsumerImpl::doRebalance() // delegates to the rebalance helper when one exists
-{
-    if (m_pRebalanceImpl != NULL)
-    {
-        m_pRebalanceImpl->doRebalance();
-    }
-}
-
-/**
-* Persists the consume offsets of every message queue currently present in
-* the rebalance helper's process-queue table.
-*
-* Best effort: any exception (including the state check failing because the
-* service is not RUNNING) is caught and reported through RMQ_ERROR.
-*/
-void  DefaultMQPullConsumerImpl::persistConsumerOffset()
-{
-    try
-    {
-        makeSureStateOK();
-
-        std::set<MessageQueue> mqs;
-        {
-            // Only the keys are needed, so bind the table by reference rather
-            // than copying the whole map while the read lock is held.
-            kpr::ScopedRLock<kpr::RWMutex> lock(m_pRebalanceImpl->getProcessQueueTableLock());
-            std::map<MessageQueue, ProcessQueue*>& processQueueTable = m_pRebalanceImpl->getProcessQueueTable();
-            RMQ_FOR_EACH(processQueueTable, it)
-            {
-                mqs.insert(it->first);
-            }
-        }
-
-        // The lock is released before the offsets are written out.
-        m_pOffsetStore->persistAll(mqs);
-    }
-    catch (...)
-    {
-        RMQ_ERROR("group {%s} persistConsumerOffset exception",
-                  m_pDefaultMQPullConsumer->getConsumerGroup().c_str());
-    }
-}
-
-/**
-* Records the message queues of a topic in the rebalance helper, but only
-* for topics this consumer is subscribed to.
-*
-* @param topic topic whose queue set changed
-* @param info  current set of message queues for that topic
-*/
-void  DefaultMQPullConsumerImpl::updateTopicSubscribeInfo(const std::string& topic, const std::set<MessageQueue>& info)
-{
-    std::map<std::string, SubscriptionData>& subTable = m_pRebalanceImpl->getSubscriptionInner();
-
-    if (subTable.find(topic) != subTable.end())
-    {
-        // Assign through operator[] so an existing entry is replaced;
-        // map::insert would keep the stale queue set for a known topic.
-        m_pRebalanceImpl->getTopicSubscribeInfoTable()[topic] = info;
-    }
-}
-
-/**
-* Tells whether queue information for a subscribed topic is still missing.
-*
-* @param topic topic to check
-* @return true when the topic is subscribed and has no entry in the
-*         topic-subscribe-info table; false otherwise
-*/
-bool  DefaultMQPullConsumerImpl::isSubscribeTopicNeedUpdate(const std::string& topic)
-{
-    std::map<std::string, SubscriptionData>& subscribed = m_pRebalanceImpl->getSubscriptionInner();
-    if (subscribed.find(topic) == subscribed.end())
-    {
-        // Not subscribed: nothing to update.
-        return false;
-    }
-
-    std::map<std::string, std::set<MessageQueue> >& known =
-        m_pRebalanceImpl->getTopicSubscribeInfoTable();
-    return known.count(topic) == 0;
-}
-
-long long  DefaultMQPullConsumerImpl::maxOffset(const MessageQueue& mq) // requires RUNNING state; delegates to MQAdminImpl
-{
-    makeSureStateOK();
-    return m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq);
-}
-
-long long  DefaultMQPullConsumerImpl::minOffset(const MessageQueue& mq) // requires RUNNING state; delegates to MQAdminImpl
-{
-    makeSureStateOK();
-    return m_pMQClientFactory->getMQAdminImpl()->minOffset(mq);
-}
-
-PullResult* DefaultMQPullConsumerImpl::pull(MessageQueue& mq, // synchronous pull with block=false
-        const std::string& subExpression,
-        long long offset,
-        int maxNums)
-{
-    return pullSyncImpl(mq, subExpression, offset, maxNums, false);
-}
-
-void  DefaultMQPullConsumerImpl::pull(MessageQueue& mq, // callback-based pull with block=false
-                                      const std::string& subExpression,
-                                      long long offset,
-                                      int maxNums,
-                                      PullCallback* pPullCallback)
-{
-    pullAsyncImpl(mq, subExpression, offset, maxNums, pPullCallback, false);
-}
-
-PullResult* DefaultMQPullConsumerImpl::pullBlockIfNotFound(MessageQueue& mq, // synchronous pull with block=true
-        const std::string& subExpression,
-        long long offset,
-        int maxNums)
-{
-    return pullSyncImpl(mq, subExpression, offset, maxNums, true);
-}
-
-void  DefaultMQPullConsumerImpl::pullBlockIfNotFound(MessageQueue& mq, // callback-based pull with block=true
-        const std::string& subExpression,
-        long long offset,
-        int maxNums,
-        PullCallback* pPullCallback)
-{
-    pullAsyncImpl(mq, subExpression, offset, maxNums, pPullCallback, true);
-}
-
-/**
-* Queries messages through the client factory's MQAdminImpl.
-*
-* All five arguments are passed through unchanged.
-*
-* @return the QueryResult produced by MQAdminImpl::queryMessage
-* @throws MQClientException when the service is not RUNNING
-*/
-QueryResult  DefaultMQPullConsumerImpl::queryMessage(const std::string& topic,
-        const std::string&  key,
-        int maxNum,
-        long long begin,
-        long long end)
-{
-    makeSureStateOK();
-
-    return m_pMQClientFactory->getMQAdminImpl()->queryMessage(topic, key, maxNum, begin, end);
-}
-
-long long DefaultMQPullConsumerImpl::searchOffset(const MessageQueue& mq, long long timestamp) // requires RUNNING state; delegates to MQAdminImpl
-{
-    makeSureStateOK();
-    return m_pMQClientFactory->getMQAdminImpl()->searchOffset(mq, timestamp);
-}
-
-void  DefaultMQPullConsumerImpl::sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName) // convenience overload: uses the consumer's own group
-{
-	return sendMessageBack(msg, delayLevel, brokerName, m_pDefaultMQPullConsumer->getConsumerGroup());
-}
-
-
-void DefaultMQPullConsumerImpl::sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName, // sends msg back via the broker; on any exception republishes it to the retry topic
-	const std::string& consumerGroup)
-{
-    try
-    {
-    	std::string brokerAddr = brokerName.empty() ? // empty broker name: use the message's store host address
-    		socketAddress2IPPort(msg.getStoreHost()) : m_pMQClientFactory->findBrokerAddressInPublish(brokerName);
-
-        m_pMQClientFactory->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg,
-                consumerGroup.empty() ? m_pDefaultMQPullConsumer->getConsumerGroup() : consumerGroup, // empty group falls back to the consumer's own group
-                delayLevel,
-                3000); // NOTE(review): presumably a timeout in milliseconds — confirm against consumerSendMessageBack
-    }
-    catch (...) // fallback path: any failure above is logged and the message is re-sent as a new message
-    {
-        RMQ_ERROR("sendMessageBack Exception, group: %s", m_pDefaultMQPullConsumer->getConsumerGroup().c_str());
-        Message newMsg(MixAll::getRetryTopic(m_pDefaultMQPullConsumer->getConsumerGroup()), // target topic: the retry topic of the consumer's own group
-                       msg.getBody(), msg.getBodyLen());
-
-		std::string originMsgId = msg.getProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID);
-		newMsg.putProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID, UtilAll::isBlank(originMsgId) ? msg.getMsgId() // blank origin id: use the message's own id
-                    : originMsgId);
-
-        newMsg.setFlag(msg.getFlag());
-        newMsg.setProperties(msg.getProperties()); // NOTE(review): if setProperties replaces the whole property map, the origin message id set above is discarded — confirm
-        newMsg.putProperty(Message::PROPERTY_RETRY_TOPIC, msg.getTopic());
-
-        int reTimes = msg.getReconsumeTimes() + 1;
-        newMsg.putProperty(Message::PROPERTY_RECONSUME_TIME, UtilAll::toString(reTimes));
-        newMsg.putProperty(Message::PROPERTY_MAX_RECONSUME_TIMES, UtilAll::toString(m_pDefaultMQPullConsumer->getMaxReconsumeTimes()));
-        newMsg.setDelayTimeLevel(3 + reTimes); // delay level grows with each reconsume attempt
-
-        m_pMQClientFactory->getDefaultMQProducer()->send(newMsg); // exceptions from this send are not caught here
-    }
-}
-
-void  DefaultMQPullConsumerImpl::updateConsumeOffset(MessageQueue& mq, long long offset) // requires RUNNING state; writes the offset to the offset store
-{
-    makeSureStateOK();
-    m_pOffsetStore->updateOffset(mq, offset, false); // NOTE(review): meaning of the trailing `false` is defined by OffsetStore::updateOffset — confirm
-}
-
-MessageExt*  DefaultMQPullConsumerImpl::viewMessage(const std::string& msgId) // requires RUNNING state; delegates to MQAdminImpl
-{
-    makeSureStateOK();
-
-    return m_pMQClientFactory->getMQAdminImpl()->viewMessage(msgId);
-}
-
-DefaultMQPullConsumer*  DefaultMQPullConsumerImpl::getDefaultMQPullConsumer() // returns the facade object passed to the constructor
-{
-    return m_pDefaultMQPullConsumer;
-}
-
-OffsetStore*  DefaultMQPullConsumerImpl::getOffsetStore() // returns the stored pointer; NULL until start() or setOffsetStore() assigns it
-{
-    return m_pOffsetStore;
-}
-
-void  DefaultMQPullConsumerImpl::setOffsetStore(OffsetStore* pOffsetStore) // overwrites the pointer; the previous store is not deleted here
-{
-    m_pOffsetStore = pOffsetStore;
-}
-
-void  DefaultMQPullConsumerImpl::makeSureStateOK() // state guard used by the public operations
-{
-    if (m_serviceState != RUNNING)
-    {
-        THROW_MQEXCEPTION(MQClientException, "The consumer service state not OK, ", -1);
-    }
-}
-
-/**
-* Shared implementation of the synchronous pull operations.
-*
-* @param mq            queue to pull from
-* @param subExpression subscription expression used to build the subscription data
-* @param offset        queue offset to start from; must be >= 0
-* @param maxNums       maximum number of messages; must be > 0
-* @param block         selects the suspend timeout instead of the plain pull
-*                      timeout and is passed on to PullSysFlag::buildSysFlag
-* @return the result of PullAPIWrapper::processPullResult
-* @throws MQClientException when the service is not RUNNING, an argument is
-*         out of range, or the subscription expression cannot be parsed
-*/
-PullResult* DefaultMQPullConsumerImpl::pullSyncImpl(MessageQueue& mq,
-        const std::string& subExpression,
-        long long offset,
-        int maxNums,
-        bool block)
-{
-    makeSureStateOK();
-
-    if (offset < 0)
-    {
-        THROW_MQEXCEPTION(MQClientException, "offset < 0", -1);
-    }
-
-    if (maxNums <= 0)
-    {
-        THROW_MQEXCEPTION(MQClientException, "maxNums <= 0", -1);
-    }
-
-    // Make sure the topic has an entry in the subscription table.
-    subscriptionAutomatically(mq.getTopic());
-
-    const int flags = PullSysFlag::buildSysFlag(false, block, true);
-
-    SubscriptionDataPtr subscription = NULL;
-    try
-    {
-        subscription = FilterAPI::buildSubscriptionData(mq.getTopic(), subExpression);
-    }
-    catch (...)
-    {
-        // Any parsing failure is reported as a client exception.
-        THROW_MQEXCEPTION(MQClientException, "parse subscription error", -1);
-    }
-
-    int clientTimeoutMillis;
-    if (block)
-    {
-        clientTimeoutMillis = m_pDefaultMQPullConsumer->getConsumerTimeoutMillisWhenSuspend();
-    }
-    else
-    {
-        clientTimeoutMillis = m_pDefaultMQPullConsumer->getConsumerPullTimeoutMillis();
-    }
-
-    const int brokerSuspendMillis = m_pDefaultMQPullConsumer->getBrokerSuspendMaxTimeMillis();
-
-    PullResult* rawResult = m_pPullAPIWrapper->pullKernelImpl(
-                                mq,
-                                subscription->getSubString(),
-                                0L,
-                                offset,
-                                maxNums,
-                                flags,
-                                0,
-                                brokerSuspendMillis,
-                                clientTimeoutMillis,
-                                SYNC,
-                                NULL);
-
-    return m_pPullAPIWrapper->processPullResult(mq, *rawResult, *subscription);
-}
-
-void  DefaultMQPullConsumerImpl::subscriptionAutomatically(const std::string& topic)
-{
-    std::map<std::string, SubscriptionData>& sd = m_pRebalanceImpl->getSubscriptionInner();
-    std::map<std::string, SubscriptionData>::iterator it = sd.find(topic);
-
-    if (it == sd.end())
-    {
-        try
-        {
-            SubscriptionDataPtr subscriptionData =
-                FilterAPI::buildSubscriptionData(topic, SubscriptionData::SUB_ALL);
-            sd[topic] = *subscriptionData;
-        }
-        catch (...)
-        {
-        	RMQ_WARN("FilterAPI::buildSubscriptionData exception");
-        }
-    }
-}
-
-void DefaultMQPullConsumerImpl::pullAsyncImpl(//
-    MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums,
-    PullCallback* pPullCallback,//
-    bool block)
-{
-    makeSureStateOK();
-
-    if (offset < 0)
-    {
-        THROW_MQEXCEPTION(MQClientException, "offset < 0", -1);
-    }
-
-    if (maxNums <= 0)
-    {
-        THROW_MQEXCEPTION(MQClientException, "maxNums <= 0", -1);
-    }
-
-    if (pPullCallback == NULL)
-    {
-        THROW_MQEXCEPTION(MQClientException, "pullCallback is null", -1);
-    }
-
-    subscriptionAutomatically(mq.getTopic());
-    try
-    {
-        int sysFlag = PullSysFlag::buildSysFlag(false, block, true);
-
-        SubscriptionDataPtr subscriptionData = NULL;
-        try
-        {
-            subscriptionData = FilterAPI::buildSubscriptionData(mq.getTopic(), subExpression);
-        }
-        catch (...)
-        {
-            THROW_MQEXCEPTION(MQClientException, "parse subscription error", -1);
-        }
-
-        int timeoutMillis =
-            block ? m_pDefaultMQPullConsumer->getConsumerTimeoutMillisWhenSuspend()
-            : m_pDefaultMQPullConsumer->getConsumerPullTimeoutMillis();
-        DefaultMQPullConsumerImplCallback* callback =
-            new DefaultMQPullConsumerImplCallback(*subscriptionData,
-                    mq, this, pPullCallback);
-
-        m_pPullAPIWrapper->pullKernelImpl(
-			mq, // 1
-			subscriptionData->getSubString(), // 2
-			0L, // 3
-			offset, // 4
-			maxNums, // 5
-			sysFlag, // 6
-			0, // 7
-			m_pDefaultMQPullConsumer->getBrokerSuspendMaxTimeMillis(), // 8
-			timeoutMillis, // 9
-			ASYNC, // 10
-			callback// 11
-		);
-    }
-    catch (const MQBrokerException& e)
-    {
-        THROW_MQEXCEPTION(MQClientException, "pullAsync unknow exception", -1);
-    }
-}
-
-
-void  DefaultMQPullConsumerImpl::copySubscription()
-{
-    try
-    {
-        std::set<std::string> registerTopics = m_pDefaultMQPullConsumer->getRegisterTopics();
-        std::set<std::string>::iterator it = registerTopics.begin();
-
-        for (; it != registerTopics.end(); it++)
-        {
-            SubscriptionDataPtr subscriptionData =
-                FilterAPI::buildSubscriptionData(*it, SubscriptionData::SUB_ALL);
-            m_pRebalanceImpl->getSubscriptionInner()[*it] = *subscriptionData;
-        }
-    }
-    catch (...)
-    {
-        THROW_MQEXCEPTION(MQClientException, "subscription exception", -1);
-    }
-}
-
-
-void  DefaultMQPullConsumerImpl::checkConfig()
-{
-    // check consumerGroup
-    Validators::checkGroup(m_pDefaultMQPullConsumer->getConsumerGroup());
-
-    // consumerGroup
-    if (m_pDefaultMQPullConsumer->getConsumerGroup() == MixAll::DEFAULT_CONSUMER_GROUP)
-    {
-        THROW_MQEXCEPTION(MQClientException, "consumerGroup can not equal "
-                          + MixAll::DEFAULT_CONSUMER_GROUP //
-                          + ", please specify another one.", -1);
-    }
-
-    if (m_pDefaultMQPullConsumer->getMessageModel() != BROADCASTING
-        && m_pDefaultMQPullConsumer->getMessageModel() != CLUSTERING)
-    {
-        THROW_MQEXCEPTION(MQClientException, "messageModel is valid ", -1);
-    }
-
-    // allocateMessageQueueStrategy
-    if (m_pDefaultMQPullConsumer->getAllocateMessageQueueStrategy() == NULL)
-    {
-        THROW_MQEXCEPTION(MQClientException, "allocateMessageQueueStrategy is null", -1);
-    }
-}
-
-ServiceState DefaultMQPullConsumerImpl::getServiceState()
-{
-    return m_serviceState;
-}
-
-void DefaultMQPullConsumerImpl::setServiceState(ServiceState serviceState)
-{
-    m_serviceState = serviceState;
-}
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.h b/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.h
deleted file mode 100755
index 171565c..0000000
--- a/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.h
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __DEFAULTMQPULLCONSUMERIMPL_H__
-#define __DEFAULTMQPULLCONSUMERIMPL_H__
-
-#include <string>
-#include <set>
-#include <map>
-#include <vector>
-#include "MQConsumerInner.h"
-#include "MessageExt.h"
-#include "QueryResult.h"
-#include "ServiceState.h"
-#include "PullRequest.h"
-#include "MessageQueue.h"
-#include "PullResult.h"
-#include "PullCallback.h"
-#include "PullAPIWrapper.h"
-
-namespace rmq
-{
-class DefaultMQPullConsumer;
-class PullCallback;
-class OffsetStore;
-class RebalanceImpl;
-class MQClientFactory;
-class PullAPIWrapper;
-
-/**
-* PullConsumer imp
-*/
-class DefaultMQPullConsumerImpl : public  MQConsumerInner
-{
-public:
-    DefaultMQPullConsumerImpl(DefaultMQPullConsumer *pDefaultMQPullConsumer);
-    ~DefaultMQPullConsumerImpl();
-    void createTopic(const std::string &key, const std::string &newTopic,
-                     int queueNum);
-    long long fetchConsumeOffset(MessageQueue &mq, bool fromStore);
-    std::set<MessageQueue> *fetchMessageQueuesInBalance(const std::string &topic);
-    std::vector<MessageQueue> *fetchPublishMessageQueues(const std::string  &topic);
-    std::set<MessageQueue> *fetchSubscribeMessageQueues(const std::string &topic);
-    long long earliestMsgStoreTime(const MessageQueue &mq);
-    std::string groupName();
-    MessageModel messageModel();
-    ConsumeType consumeType();
-    ConsumeFromWhere consumeFromWhere();
-    std::set<SubscriptionData> subscriptions();
-    void doRebalance();
-    void persistConsumerOffset();
-    void updateTopicSubscribeInfo(const std::string &topic,
-                                  const std::set<MessageQueue> &info);
-    bool isSubscribeTopicNeedUpdate(const std::string &topic);
-    long long maxOffset(const MessageQueue &mq);
-    long long minOffset(const MessageQueue &mq);
-
-    PullResult *pull(MessageQueue &mq,
-                     const std::string &subExpression,
-                     long long offset,
-                     int maxNums);
-
-    void pull(MessageQueue &mq,
-              const std::string &subExpression,
-              long long offset,
-              int maxNums,
-              PullCallback *pPullCallback);
-
-    PullResult *pullBlockIfNotFound(MessageQueue &mq,
-                                    const std::string &subExpression,
-                                    long long offset, int maxNums);
-
-    void pullBlockIfNotFound(MessageQueue &mq,
-                             const std::string &subExpression,
-                             long long offset, int maxNums,
-                             PullCallback *pPullCallback);
-
-    QueryResult queryMessage(const std::string &topic,
-                             const std::string  &key,
-                             int maxNum,
-                             long long begin,
-                             long long end);
-
-    long long searchOffset(const MessageQueue &mq, long long timestamp);
-    void sendMessageBack(MessageExt &msg, int delayLevel,
-                         const std::string &brokerName);
-    void sendMessageBack(MessageExt &msg, int delayLevel,
-                         const std::string &brokerName, const std::string &consumerGroup);
-    void shutdown();
-    void updateConsumeOffset(MessageQueue &mq, long long offset);
-    MessageExt *viewMessage(const std::string &msgId);
-    DefaultMQPullConsumer *getDefaultMQPullConsumer();
-    OffsetStore *getOffsetStore();
-    void setOffsetStore(OffsetStore *pOffsetStore);
-    void start();
-
-    ServiceState getServiceState();
-    void setServiceState(ServiceState serviceState);
-
-private:
-    void makeSureStateOK();
-    void subscriptionAutomatically(const std::string &topic);
-    void copySubscription();
-    void checkConfig();
-
-    PullResult *pullSyncImpl(MessageQueue &mq,
-                             const std::string &subExpression,
-                             long long offset,
-                             int maxNums,
-                             bool block) ;
-    void pullAsyncImpl(MessageQueue &mq,
-                       const std::string &subExpression,
-                       long long offset,
-                       int maxNums,
-                       PullCallback *pPullCallback,
-                       bool block);
-
-private:
-    DefaultMQPullConsumer *m_pDefaultMQPullConsumer;
-    ServiceState m_serviceState;
-    MQClientFactory *m_pMQClientFactory;
-    PullAPIWrapper *m_pPullAPIWrapper;
-    OffsetStore *m_pOffsetStore;
-    RebalanceImpl *m_pRebalanceImpl;
-    friend class DefaultMQPullConsumerImplCallback;
-};
-
-class DefaultMQPullConsumerImplCallback : public PullCallback
-{
-public:
-    DefaultMQPullConsumerImplCallback(SubscriptionData &subscriptionData,
-                                      MessageQueue &mq,
-                                      DefaultMQPullConsumerImpl *pDefaultMQPullConsumerImpl,
-                                      PullCallback *pCallback)
-        : m_subscriptionData(subscriptionData),
-          m_mq(mq),
-          m_pDefaultMQPullConsumerImpl(pDefaultMQPullConsumerImpl),
-          m_pCallback(pCallback)
-    {
-    }
-
-    void onSuccess(PullResult &pullResult)
-    {
-        m_pCallback->onSuccess(
-            *m_pDefaultMQPullConsumerImpl->m_pPullAPIWrapper->
-            processPullResult(m_mq, pullResult, m_subscriptionData));
-    }
-
-    void onException(MQException &e)
-    {
-        m_pCallback->onException(e);
-    }
-
-private:
-    SubscriptionData m_subscriptionData;
-    MessageQueue m_mq;
-    DefaultMQPullConsumerImpl *m_pDefaultMQPullConsumerImpl;
-    PullCallback *m_pCallback;
-};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumer.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumer.cpp b/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumer.cpp
deleted file mode 100755
index 45ee907..0000000
--- a/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumer.cpp
+++ /dev/null
@@ -1,399 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "DefaultMQPushConsumer.h"
-#include <list>
-#include <string>
-
-#include "DefaultMQPushConsumerImpl.h"
-#include "MessageQueue.h"
-#include "MessageExt.h"
-#include "ClientConfig.h"
-#include "ConsumerStatManage.h"
-#include "MixAll.h"
-#include "AllocateMessageQueueStrategyInner.h"
-
-namespace rmq
-{
-
-class AllocateMessageQueueStrategy;
-
-DefaultMQPushConsumer::DefaultMQPushConsumer()
-{
-    m_consumerGroup = MixAll::DEFAULT_CONSUMER_GROUP;
-    m_messageModel = CLUSTERING;
-    m_consumeFromWhere = CONSUME_FROM_LAST_OFFSET;
-    m_pAllocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
-    m_pMessageListener = NULL;
-    m_consumeThreadMin = 5;
-    m_consumeThreadMax = 25;
-    m_consumeConcurrentlyMaxSpan = 2000;
-    m_pullThresholdForQueue = 1000;
-    m_pullInterval = 0;
-    m_consumeMessageBatchMaxSize = 1;
-    m_pullBatchSize = 32;
-    m_postSubscriptionWhenPull = false;
-    m_unitMode = false;
-    m_maxReconsumeTimes = 16;
-    m_suspendCurrentQueueTimeMillis = 1000;
-    m_consumeTimeout = 15;
-    m_pOffsetStore = NULL;
-    m_pDefaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this);
-}
-
-DefaultMQPushConsumer::DefaultMQPushConsumer(const std::string& consumerGroup)
-{
-    m_consumerGroup = consumerGroup;
-    m_messageModel = CLUSTERING;
-    m_consumeFromWhere = CONSUME_FROM_LAST_OFFSET;
-    m_pAllocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
-    m_pMessageListener = NULL;
-    m_consumeThreadMin = 5;
-    m_consumeThreadMax = 25;
-    m_consumeConcurrentlyMaxSpan = 2000;
-    m_pullThresholdForQueue = 1000;
-    m_pullInterval = 0;
-    m_consumeMessageBatchMaxSize = 1;
-    m_pullBatchSize = 32;
-    m_postSubscriptionWhenPull = false;
-    m_unitMode = false;
-    m_maxReconsumeTimes = 16;
-    m_suspendCurrentQueueTimeMillis = 1000;
-    m_consumeTimeout = 15;
-    m_pOffsetStore = NULL;
-    m_pDefaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this);
-}
-
-DefaultMQPushConsumer::~DefaultMQPushConsumer()
-{
-    delete m_pAllocateMessageQueueStrategy;
-    delete m_pDefaultMQPushConsumerImpl;
-}
-
-//MQAdmin
-void DefaultMQPushConsumer::createTopic(const std::string& key, const std::string& newTopic, int queueNum)
-{
-    m_pDefaultMQPushConsumerImpl->createTopic(key, newTopic, queueNum);
-}
-
-long long DefaultMQPushConsumer::searchOffset(const MessageQueue& mq, long long timestamp)
-{
-    return m_pDefaultMQPushConsumerImpl->searchOffset(mq, timestamp);
-}
-
-long long DefaultMQPushConsumer::maxOffset(const MessageQueue& mq)
-{
-    return m_pDefaultMQPushConsumerImpl->maxOffset(mq);
-}
-
-long long DefaultMQPushConsumer::minOffset(const MessageQueue& mq)
-{
-    return m_pDefaultMQPushConsumerImpl->minOffset(mq);
-}
-
-long long DefaultMQPushConsumer::earliestMsgStoreTime(const MessageQueue& mq)
-{
-    return m_pDefaultMQPushConsumerImpl->earliestMsgStoreTime(mq);
-}
-
-MessageExt* DefaultMQPushConsumer::viewMessage(const std::string& msgId)
-{
-    return m_pDefaultMQPushConsumerImpl->viewMessage(msgId);
-}
-
-QueryResult DefaultMQPushConsumer::queryMessage(const std::string& topic,
-        const std::string&  key,
-        int maxNum,
-        long long begin,
-        long long end)
-{
-    return m_pDefaultMQPushConsumerImpl->queryMessage(topic, key, maxNum, begin, end);
-}
-// MQadmin end
-
-AllocateMessageQueueStrategy* DefaultMQPushConsumer::getAllocateMessageQueueStrategy()
-{
-    return m_pAllocateMessageQueueStrategy;
-}
-
-void DefaultMQPushConsumer::setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy)
-{
-    m_pAllocateMessageQueueStrategy = pAllocateMessageQueueStrategy;
-}
-
-int DefaultMQPushConsumer::getConsumeConcurrentlyMaxSpan()
-{
-    return m_consumeConcurrentlyMaxSpan;
-}
-
-void DefaultMQPushConsumer::setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan)
-{
-    m_consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan;
-}
-
-ConsumeFromWhere DefaultMQPushConsumer::getConsumeFromWhere()
-{
-    return m_consumeFromWhere;
-}
-
-void DefaultMQPushConsumer::setConsumeFromWhere(ConsumeFromWhere consumeFromWhere)
-{
-    m_consumeFromWhere = consumeFromWhere;
-}
-
-int DefaultMQPushConsumer::getConsumeMessageBatchMaxSize()
-{
-    return m_consumeMessageBatchMaxSize;
-}
-
-void DefaultMQPushConsumer::setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize)
-{
-    m_consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
-}
-
-std::string DefaultMQPushConsumer::getConsumerGroup()
-{
-    return m_consumerGroup;
-}
-
-void DefaultMQPushConsumer::setConsumerGroup(const std::string& consumerGroup)
-{
-    m_consumerGroup = consumerGroup;
-}
-
-int DefaultMQPushConsumer::getConsumeThreadMax()
-{
-    return m_consumeThreadMax;
-}
-
-void DefaultMQPushConsumer::setConsumeThreadMax(int consumeThreadMax)
-{
-    m_consumeThreadMax = consumeThreadMax;
-}
-
-int DefaultMQPushConsumer::getConsumeThreadMin()
-{
-    return m_consumeThreadMin;
-}
-
-void DefaultMQPushConsumer::setConsumeThreadMin(int consumeThreadMin)
-{
-    m_consumeThreadMin = consumeThreadMin;
-}
-
-DefaultMQPushConsumerImpl* DefaultMQPushConsumer::getDefaultMQPushConsumerImpl()
-{
-    return m_pDefaultMQPushConsumerImpl;
-}
-
-MessageListener* DefaultMQPushConsumer::getMessageListener()
-{
-    return m_pMessageListener;
-}
-
-void DefaultMQPushConsumer::setMessageListener(MessageListener* pMessageListener)
-{
-    m_pMessageListener = pMessageListener;
-}
-
-MessageModel DefaultMQPushConsumer::getMessageModel()
-{
-    return m_messageModel;
-}
-
-void DefaultMQPushConsumer::setMessageModel(MessageModel messageModel)
-{
-    m_messageModel = messageModel;
-}
-
-int DefaultMQPushConsumer::getPullBatchSize()
-{
-    return m_pullBatchSize;
-}
-
-void DefaultMQPushConsumer::setPullBatchSize(int pullBatchSize)
-{
-    m_pullBatchSize = pullBatchSize;
-}
-
-long DefaultMQPushConsumer::getPullInterval()
-{
-    return m_pullInterval;
-}
-
-void DefaultMQPushConsumer::setPullInterval(long pullInterval)
-{
-    m_pullInterval = pullInterval;
-}
-
-int DefaultMQPushConsumer::getPullThresholdForQueue()
-{
-    return m_pullThresholdForQueue;
-}
-
-void DefaultMQPushConsumer::setPullThresholdForQueue(int pullThresholdForQueue)
-{
-    m_pullThresholdForQueue = pullThresholdForQueue;
-}
-
-std::map<std::string, std::string>& DefaultMQPushConsumer::getSubscription()
-{
-    return m_subscription;
-}
-
-void DefaultMQPushConsumer::setSubscription(const std::map<std::string, std::string>& subscription)
-{
-    m_subscription = subscription;
-}
-
-//MQConsumer
-void DefaultMQPushConsumer::sendMessageBack(MessageExt& msg, int delayLevel)
-{
-    m_pDefaultMQPushConsumerImpl->sendMessageBack(msg, delayLevel, "");
-}
-
-void DefaultMQPushConsumer::sendMessageBack(MessageExt& msg, int delayLevel, const std::string brokerName)
-{
-    m_pDefaultMQPushConsumerImpl->sendMessageBack(msg, delayLevel, brokerName);
-}
-
-
-std::set<MessageQueue>* DefaultMQPushConsumer::fetchSubscribeMessageQueues(const std::string& topic)
-{
-    return m_pDefaultMQPushConsumerImpl->fetchSubscribeMessageQueues(topic);
-}
-
-void DefaultMQPushConsumer::start()
-{
-    m_pDefaultMQPushConsumerImpl->start();
-}
-
-void DefaultMQPushConsumer::shutdown()
-{
-    m_pDefaultMQPushConsumerImpl->shutdown();
-}
-//MQConsumer end
-
-//MQPushConsumer
-void DefaultMQPushConsumer::registerMessageListener(MessageListener* pMessageListener)
-{
-    m_pMessageListener = pMessageListener;
-    m_pDefaultMQPushConsumerImpl->registerMessageListener(pMessageListener);
-}
-
-void DefaultMQPushConsumer::subscribe(const std::string& topic, const std::string& subExpression)
-{
-    m_pDefaultMQPushConsumerImpl->subscribe(topic, subExpression);
-}
-
-void DefaultMQPushConsumer::unsubscribe(const std::string& topic)
-{
-    m_pDefaultMQPushConsumerImpl->unsubscribe(topic);
-}
-
-void DefaultMQPushConsumer::updateCorePoolSize(int corePoolSize)
-{
-    m_pDefaultMQPushConsumerImpl->updateCorePoolSize(corePoolSize);
-}
-
-void DefaultMQPushConsumer::suspend()
-{
-    m_pDefaultMQPushConsumerImpl->suspend();
-}
-
-void DefaultMQPushConsumer::resume()
-{
-    m_pDefaultMQPushConsumerImpl->resume();
-}
-//MQPushConsumer end
-
-OffsetStore* DefaultMQPushConsumer::getOffsetStore()
-{
-    return m_pOffsetStore;
-}
-
-void DefaultMQPushConsumer::setOffsetStore(OffsetStore* pOffsetStore)
-{
-    m_pOffsetStore = pOffsetStore;
-}
-
-std::string DefaultMQPushConsumer::getConsumeTimestamp() {
-    return m_consumeTimestamp;
-}
-
-void DefaultMQPushConsumer::setConsumeTimestamp(std::string consumeTimestamp) {
-    m_consumeTimestamp = consumeTimestamp;
-}
-
-bool DefaultMQPushConsumer::isPostSubscriptionWhenPull()
-{
-    return m_postSubscriptionWhenPull;
-}
-
-
-void DefaultMQPushConsumer::setPostSubscriptionWhenPull(bool postSubscriptionWhenPull)
-{
-    m_postSubscriptionWhenPull = postSubscriptionWhenPull;
-}
-
-
-bool DefaultMQPushConsumer::isUnitMode()
-{
-    return m_unitMode;
-}
-
-
-void DefaultMQPushConsumer::setUnitMode(bool isUnitMode)
-{
-    m_unitMode = isUnitMode;
-}
-
-int DefaultMQPushConsumer::getMaxReconsumeTimes()
-{
-    return m_maxReconsumeTimes;
-}
-
-
-void DefaultMQPushConsumer::setMaxReconsumeTimes(int maxReconsumeTimes)
-{
-    m_maxReconsumeTimes = maxReconsumeTimes;
-}
-
-
-int DefaultMQPushConsumer::getSuspendCurrentQueueTimeMillis()
-{
-    return m_suspendCurrentQueueTimeMillis;
-}
-
-
-void DefaultMQPushConsumer::setSuspendCurrentQueueTimeMillis(int suspendCurrentQueueTimeMillis)
-{
-    m_suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
-}
-
-
-int DefaultMQPushConsumer::getConsumeTimeout()
-{
-    return m_consumeTimeout;
-}
-
-void DefaultMQPushConsumer::setConsumeTimeout(int consumeTimeout)
-{
-    m_consumeTimeout = consumeTimeout;
-}
-
-
-}