You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2017/09/04 06:44:46 UTC
[12/17] incubator-rocketmq-externals git commit: Polish cpp module
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.cpp b/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.cpp
deleted file mode 100755
index c7d9695..0000000
--- a/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.cpp
+++ /dev/null
@@ -1,574 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include <list>
-#include <string>
-
-#include "ConsumeMessageOrderlyService.h"
-#include "DefaultMQPushConsumerImpl.h"
-#include "MQClientFactory.h"
-#include "DefaultMQProducer.h"
-#include "MessageListener.h"
-#include "MessageQueue.h"
-#include "RebalanceImpl.h"
-#include "DefaultMQPushConsumer.h"
-#include "OffsetStore.h"
-#include "ScopedLock.h"
-#include "KPRUtil.h"
-#include "MixAll.h"
-#include "UtilAll.h"
-
-namespace rmq
-{
-
-class LockMq : public kpr::TimerHandler
-{
-public:
- LockMq(ConsumeMessageOrderlyService* pService)
- : m_pService(pService)
- {
-
- }
-
- void OnTimeOut(unsigned int timerID)
- {
- m_pService->lockMQPeriodically();
-
- // can not delete
- //delete this;
- }
-
-private:
- ConsumeMessageOrderlyService* m_pService;
-};
-
-class SubmitConsumeRequestLaterOrderly : public kpr::TimerHandler
-{
-public:
- SubmitConsumeRequestLaterOrderly(ProcessQueue* pProcessQueue,
- const MessageQueue& messageQueue,
- ConsumeMessageOrderlyService* pService)
- : m_pProcessQueue(pProcessQueue),
- m_messageQueue(messageQueue),
- m_pService(pService)
- {
-
- }
-
- void OnTimeOut(unsigned int timerID)
- {
- try
- {
- std::list<MessageExt*> msgs;
- m_pService->submitConsumeRequest(msgs, m_pProcessQueue, m_messageQueue, true);
- }
- catch(...)
- {
- RMQ_ERROR("SubmitConsumeRequestLaterOrderly OnTimeOut exception");
- }
-
- delete this;
- }
-
-private:
- ProcessQueue* m_pProcessQueue;
- MessageQueue m_messageQueue;
- ConsumeMessageOrderlyService* m_pService;
-};
-
-
-class TryLockLaterAndReconsume : public kpr::TimerHandler
-{
-public:
- TryLockLaterAndReconsume(ProcessQueue* pProcessQueue,
- MessageQueue& messageQueue,
- ConsumeMessageOrderlyService* pService)
- : m_pProcessQueue(pProcessQueue),
- m_messageQueue(messageQueue),
- m_pService(pService)
- {
-
- }
-
- void OnTimeOut(unsigned int timerID)
- {
- try
- {
- bool lockOK = m_pService->lockOneMQ(m_messageQueue);
- if (lockOK)
- {
- m_pService->submitConsumeRequestLater(m_pProcessQueue, m_messageQueue, 10);
- }
- else
- {
- m_pService->submitConsumeRequestLater(m_pProcessQueue, m_messageQueue, 3000);
- }
- }
- catch(...)
- {
- RMQ_ERROR("TryLockLaterAndReconsume OnTimeOut exception");
- }
-
- delete this;
- }
-
-private:
- ProcessQueue* m_pProcessQueue;
- MessageQueue m_messageQueue;
- ConsumeMessageOrderlyService* m_pService;
-};
-
-
-
-ConsumeMessageOrderlyService::ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl,
- MessageListenerOrderly* pMessageListener)
-{
- m_stoped = false;
- m_pDefaultMQPushConsumerImpl = pDefaultMQPushConsumerImpl;
- m_pMessageListener = pMessageListener;
- m_pDefaultMQPushConsumer = m_pDefaultMQPushConsumerImpl->getDefaultMQPushConsumer();
- m_consumerGroup = m_pDefaultMQPushConsumer->getConsumerGroup();
- m_pConsumeExecutor = new kpr::ThreadPool("ConsumeMessageThreadPool", 1,
- m_pDefaultMQPushConsumer->getConsumeThreadMin(), m_pDefaultMQPushConsumer->getConsumeThreadMax());
- m_scheduledExecutorService = new kpr::TimerThread("ConsumeMessageConcurrentlyService", 10);
-}
-
-ConsumeMessageOrderlyService::~ConsumeMessageOrderlyService()
-{
-}
-
-
-void ConsumeMessageOrderlyService::start()
-{
- m_scheduledExecutorService->Start();
-
- LockMq* lm = new LockMq(this);
- m_scheduledExecutorService->RegisterTimer(0, ProcessQueue::s_RebalanceLockInterval, lm, true);
-}
-
-void ConsumeMessageOrderlyService::shutdown()
-{
- m_stoped = true;
- m_pConsumeExecutor->Destroy();
- m_scheduledExecutorService->Stop();
- m_scheduledExecutorService->Join();
- unlockAllMQ();
-}
-
-void ConsumeMessageOrderlyService::unlockAllMQ()
-{
- m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->unlockAll(false);
-}
-
-void ConsumeMessageOrderlyService::lockMQPeriodically()
-{
- if (!m_stoped)
- {
- m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->lockAll();
- }
-}
-
-bool ConsumeMessageOrderlyService::lockOneMQ(MessageQueue& mq)
-{
- if (!m_stoped)
- {
- return m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->lock(mq);
- }
-
- return false;
-}
-
-void ConsumeMessageOrderlyService::tryLockLaterAndReconsume(MessageQueue& messageQueue,
- ProcessQueue* pProcessQueue,
- long long delayMills)
-{
- TryLockLaterAndReconsume* consume = new TryLockLaterAndReconsume(pProcessQueue, messageQueue, this);
- m_scheduledExecutorService->RegisterTimer(0, int(delayMills), consume, false);
-}
-
-ConsumerStat& ConsumeMessageOrderlyService::getConsumerStat()
-{
- return m_pDefaultMQPushConsumerImpl->getConsumerStatManager()->getConsumertat();
-}
-
-void ConsumeMessageOrderlyService::submitConsumeRequestLater(ProcessQueue* pProcessQueue,
- const MessageQueue& messageQueue,
- long long suspendTimeMillis)
-{
- long timeMillis = long(suspendTimeMillis);
- if (timeMillis < 10)
- {
- timeMillis = 10;
- }
- else if (timeMillis > 30000)
- {
- timeMillis = 30000;
- }
-
- SubmitConsumeRequestLaterOrderly* sc = new SubmitConsumeRequestLaterOrderly(pProcessQueue, messageQueue, this);
- m_scheduledExecutorService->RegisterTimer(0, timeMillis, sc, false);
-}
-
-void ConsumeMessageOrderlyService::submitConsumeRequest(std::list<MessageExt*>& msgs,
- ProcessQueue* pProcessQueue,
- const MessageQueue& messageQueue,
- bool dispathToConsume)
-{
- if (dispathToConsume)
- {
- kpr::ThreadPoolWorkPtr consumeRequest = new ConsumeOrderlyRequest(pProcessQueue, messageQueue, this);
- m_pConsumeExecutor->AddWork(consumeRequest);
- }
-}
-
-void ConsumeMessageOrderlyService::updateCorePoolSize(int corePoolSize)
-{
-}
-
-
-std::string& ConsumeMessageOrderlyService::getConsumerGroup()
-{
- return m_consumerGroup;
-}
-
-MessageListenerOrderly* ConsumeMessageOrderlyService::getMessageListener()
-{
- return m_pMessageListener;
-}
-
-DefaultMQPushConsumerImpl* ConsumeMessageOrderlyService::getDefaultMQPushConsumerImpl()
-{
- return m_pDefaultMQPushConsumerImpl;
-}
-
-bool ConsumeMessageOrderlyService::processConsumeResult(std::list<MessageExt*>& msgs,
- ConsumeOrderlyStatus status,
- ConsumeOrderlyContext& context,
- ConsumeOrderlyRequest& consumeRequest)
-{
- bool continueConsume = true;
- long long commitOffset = -1L;
- int msgsSize = msgs.size();
-
- if (context.autoCommit)
- {
- switch (status)
- {
- case COMMIT:
- case ROLLBACK:
- RMQ_WARN("the message queue consume result is illegal, we think you want to ack these message: %s",
- consumeRequest.getMessageQueue().toString().c_str());
- case SUCCESS:
- getConsumerStat().consumeMsgOKTotal.fetchAndAdd(msgsSize);
- commitOffset = consumeRequest.getProcessQueue()->commit();
- break;
- case SUSPEND_CURRENT_QUEUE_A_MOMENT:
- getConsumerStat().consumeMsgFailedTotal.fetchAndAdd(msgsSize);
- if (checkReconsumeTimes(msgs))
- {
- consumeRequest.getProcessQueue()->makeMessageToCosumeAgain(msgs);
- submitConsumeRequestLater(consumeRequest.getProcessQueue(),
- consumeRequest.getMessageQueue(),
- context.suspendCurrentQueueTimeMillis);
- continueConsume = false;
- }
- else
- {
- commitOffset = consumeRequest.getProcessQueue()->commit();
- }
-
- break;
- default:
- break;
- }
- }
- else
- {
- switch (status)
- {
- case SUCCESS:
- getConsumerStat().consumeMsgOKTotal.fetchAndAdd(msgsSize);
- break;
- case COMMIT:
- commitOffset = consumeRequest.getProcessQueue()->commit();
- break;
- case ROLLBACK:
- consumeRequest.getProcessQueue()->rollback();
- submitConsumeRequestLater(consumeRequest.getProcessQueue(),
- consumeRequest.getMessageQueue(),
- context.suspendCurrentQueueTimeMillis);
- continueConsume = false;
- break;
- case SUSPEND_CURRENT_QUEUE_A_MOMENT:
- getConsumerStat().consumeMsgFailedTotal.fetchAndAdd(msgsSize);
- if (checkReconsumeTimes(msgs))
- {
- consumeRequest.getProcessQueue()->makeMessageToCosumeAgain(msgs);
- submitConsumeRequestLater(consumeRequest.getProcessQueue(),
- consumeRequest.getMessageQueue(),
- context.suspendCurrentQueueTimeMillis);
- continueConsume = false;
- }
- break;
- default:
- break;
- }
- }
-
- if (commitOffset >= 0 && !consumeRequest.getProcessQueue()->isDropped())
- {
- m_pDefaultMQPushConsumerImpl->getOffsetStore()->updateOffset(consumeRequest.getMessageQueue(),
- commitOffset, false);
- }
-
- return continueConsume;
-}
-
-bool ConsumeMessageOrderlyService::checkReconsumeTimes(std::list<MessageExt*>& msgs)
-{
- bool suspend = false;
-
- if (!msgs.empty())
- {
- std::list<MessageExt*>::iterator it = msgs.begin();
- for (; it != msgs.end(); it++)
- {
- MessageExt* msg = *it;
- if (msg->getReconsumeTimes() >= m_pDefaultMQPushConsumer->getMaxReconsumeTimes())
- {
- msg->putProperty(Message::PROPERTY_RECONSUME_TIME, UtilAll::toString(msg->getReconsumeTimes()));
-
- if (!sendMessageBack(*msg))
- {
- suspend = true;
- msg->setReconsumeTimes(msg->getReconsumeTimes() + 1);
- }
- }
- else
- {
- suspend = true;
- msg->setReconsumeTimes(msg->getReconsumeTimes() + 1);
- }
- }
- }
-
- return suspend;
-}
-
-bool ConsumeMessageOrderlyService::sendMessageBack(MessageExt& msg)
-{
- try
- {
- Message newMsg(MixAll::getRetryTopic(m_pDefaultMQPushConsumer->getConsumerGroup()),
- msg.getBody(), msg.getBodyLen());
-
- std::string originMsgId = msg.getProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID);
- newMsg.putProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID, UtilAll::isBlank(originMsgId) ? msg.getMsgId()
- : originMsgId);
-
- newMsg.setFlag(msg.getFlag());
- newMsg.setProperties(msg.getProperties());
- newMsg.putProperty(Message::PROPERTY_RETRY_TOPIC, msg.getTopic());
-
- int reTimes = msg.getReconsumeTimes() + 1;
- newMsg.putProperty(Message::PROPERTY_RECONSUME_TIME, UtilAll::toString(reTimes));
- newMsg.putProperty(Message::PROPERTY_MAX_RECONSUME_TIMES, UtilAll::toString(m_pDefaultMQPushConsumer->getMaxReconsumeTimes()));
- newMsg.setDelayTimeLevel(3 + reTimes);
-
- m_pDefaultMQPushConsumerImpl->getmQClientFactory()->getDefaultMQProducer()->send(newMsg);
-
- return true;
- }
- catch (...)
- {
- RMQ_ERROR("sendMessageBack exception, group: %s, msg: %s",
- m_consumerGroup.c_str(), msg.toString().c_str());
- }
-
- return false;
-}
-
-
-MessageQueueLock& ConsumeMessageOrderlyService::getMessageQueueLock()
-{
- return m_messageQueueLock;
-}
-
-DefaultMQPushConsumer* ConsumeMessageOrderlyService::getDefaultMQPushConsumer()
-{
- return m_pDefaultMQPushConsumer;
-}
-
-ConsumeOrderlyRequest::ConsumeOrderlyRequest(ProcessQueue* pProcessQueue,
- const MessageQueue& messageQueue,
- ConsumeMessageOrderlyService* pService)
-{
- m_pProcessQueue = pProcessQueue;
- m_messageQueue = messageQueue;
- m_pService = pService;
-}
-
-ConsumeOrderlyRequest::~ConsumeOrderlyRequest()
-{
-}
-
-void ConsumeOrderlyRequest::Do()
-{
- if (m_pProcessQueue->isDropped())
- {
- RMQ_WARN("run, the message queue not be able to consume, because it's dropped, MQ: %s",
- m_messageQueue.toString().c_str());
- return;
- }
-
- try
- {
- kpr::Mutex* objLock = m_pService->getMessageQueueLock().fetchLockObject(m_messageQueue);
- {
- kpr::ScopedLock<kpr::Mutex> lock(*objLock);
-
- MessageModel messageModel = m_pService->getDefaultMQPushConsumerImpl()->messageModel();
- if (BROADCASTING == messageModel
- || (m_pProcessQueue->isLocked() || !m_pProcessQueue->isLockExpired()))
- {
- long long beginTime = KPRUtil::GetCurrentTimeMillis();
- for (bool continueConsume = true; continueConsume;)
- {
- if (m_pProcessQueue->isDropped())
- {
- RMQ_INFO("the message queue not be able to consume, because it's droped, MQ: %s",
- m_messageQueue.toString().c_str());
- break;
- }
-
- if (CLUSTERING == messageModel
- && !m_pProcessQueue->isLocked())
- {
- RMQ_WARN("the message queue not locked, so consume later, MQ: %s", m_messageQueue.toString().c_str());
- m_pService->tryLockLaterAndReconsume(m_messageQueue, m_pProcessQueue, 10);
- break;
- }
-
- if (CLUSTERING == messageModel
- && m_pProcessQueue->isLockExpired())
- {
- RMQ_WARN("the message queue lock expired, so consume later, MQ: %s", m_messageQueue.toString().c_str());
- m_pService->tryLockLaterAndReconsume(m_messageQueue, m_pProcessQueue, 10);
- break;
- }
-
- long interval = long(KPRUtil::GetCurrentTimeMillis() - beginTime);
- if (interval > ConsumeMessageOrderlyService::s_MaxTimeConsumeContinuously)
- {
- m_pService->submitConsumeRequestLater(m_pProcessQueue, m_messageQueue, 10);
- break;
- }
-
- int consumeBatchSize =
- m_pService->getDefaultMQPushConsumer()->getConsumeMessageBatchMaxSize();
-
- std::list<MessageExt*> msgs = m_pProcessQueue->takeMessages(consumeBatchSize);
- if (!msgs.empty())
- {
- ConsumeOrderlyContext context(m_messageQueue);
-
- ConsumeOrderlyStatus status = SUSPEND_CURRENT_QUEUE_A_MOMENT;
-
- ConsumeMessageContext consumeMessageContext;
- if (m_pService->getDefaultMQPushConsumerImpl()->hasHook())
- {
- consumeMessageContext.consumerGroup = m_pService->getConsumerGroup();
- consumeMessageContext.mq = m_messageQueue;
- consumeMessageContext.msgList = msgs;
- consumeMessageContext.success = false;
- m_pService->getDefaultMQPushConsumerImpl()->executeHookBefore(consumeMessageContext);
- }
-
- long long beginTimestamp = KPRUtil::GetCurrentTimeMillis();
- try
- {
- kpr::ScopedLock<kpr::Mutex> lock(m_pProcessQueue->getLockConsume());
- if (m_pProcessQueue->isDropped())
- {
- RMQ_WARN("consumeMessage, the message queue not be able to consume, because it's dropped, MQ: %s",
- m_messageQueue.toString().c_str());
- break;
- }
-
- status = m_pService->getMessageListener()->consumeMessage(msgs, context);
- }
- catch (...)
- {
- RMQ_WARN("consumeMessage exception, Group: {%s}, Msgs: {%u}, MQ: %s",//
- m_pService->getConsumerGroup().c_str(),
- (unsigned)msgs.size(),
- m_messageQueue.toString().c_str());
- }
-
- long long consumeRT = KPRUtil::GetCurrentTimeMillis() - beginTimestamp;
-
- if (SUSPEND_CURRENT_QUEUE_A_MOMENT == status
- || ROLLBACK == status)
- {
- RMQ_WARN("consumeMessage Orderly return not OK, Group: {%s} Msgs: {%u} MQ: %s",//
- m_pService->getConsumerGroup().c_str(),
- (unsigned)msgs.size(),
- m_messageQueue.toString().c_str());
- //status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
-
- if (m_pService->getDefaultMQPushConsumerImpl()->hasHook())
- {
- consumeMessageContext.success = (SUCCESS == status
- || COMMIT == status);
- m_pService->getDefaultMQPushConsumerImpl()->executeHookAfter(consumeMessageContext);
- }
-
- m_pService->getConsumerStat().consumeMsgRTTotal.fetchAndAdd(consumeRT);
- MixAll::compareAndIncreaseOnly(m_pService->getConsumerStat()
- .consumeMsgRTMax, consumeRT);
-
- continueConsume = m_pService->processConsumeResult(msgs, status, context, *this);
- }
- else
- {
- continueConsume = false;
- }
- }
- }
- else
- {
- if (m_pProcessQueue->isDropped())
- {
- RMQ_WARN("consumeMessage, the message queue not be able to consume, because it's dropped, MQ: %s",
- m_messageQueue.toString().c_str());
- return;
- }
-
- m_pService->tryLockLaterAndReconsume(m_messageQueue, m_pProcessQueue, 100);
- }
- }
- }
- catch(...)
- {
- RMQ_WARN("ConsumeOrderlyRequest exception");
- }
-
- return;
-}
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.h b/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.h
deleted file mode 100755
index 0f8628b..0000000
--- a/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.h
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __CONSUMEMESSAGEORDERLYSERVICE_H__
-#define __CONSUMEMESSAGEORDERLYSERVICE_H__
-
-#include "ConsumeMessageService.h"
-
-#include <list>
-#include <string>
-#include "RocketMQClient.h"
-#include "ConsumerStatManage.h"
-#include "MessageQueueLock.h"
-#include "MessageListener.h"
-#include "ThreadPool.h"
-#include "TimerThread.h"
-
-namespace rmq
-{
-class DefaultMQPushConsumerImpl;
-class MessageListenerOrderly;
-class DefaultMQPushConsumer;
-class ConsumeMessageOrderlyService;
-
-class ConsumeOrderlyRequest: public kpr::ThreadPoolWork
-{
-public:
- ConsumeOrderlyRequest(ProcessQueue *pProcessQueue,
- const MessageQueue &messageQueue,
- ConsumeMessageOrderlyService *pService);
- ~ConsumeOrderlyRequest();
-
- virtual void Do();
-
- ProcessQueue *getProcessQueue()
- {
- return m_pProcessQueue;
- }
-
- MessageQueue &getMessageQueue()
- {
- return m_messageQueue;
- }
-
-private:
- ProcessQueue *m_pProcessQueue;
- MessageQueue m_messageQueue;
- ConsumeMessageOrderlyService *m_pService;
-};
-
-
-class ConsumeMessageOrderlyService : public ConsumeMessageService
-{
-public:
- static const long s_MaxTimeConsumeContinuously = 60000;
-
-public:
- ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl
- *pDefaultMQPushConsumerImpl,
- MessageListenerOrderly *pMessageListener);
- ~ConsumeMessageOrderlyService();
-
- void start();
- void shutdown();
-
- void unlockAllMQ();
- void lockMQPeriodically();
- bool lockOneMQ(MessageQueue &mq);
- void tryLockLaterAndReconsume(MessageQueue &messageQueue,
- ProcessQueue *pProcessQueue,
- long long delayMills);
- bool processConsumeResult(std::list<MessageExt *> &msgs,
- ConsumeOrderlyStatus status,
- ConsumeOrderlyContext &context,
- ConsumeOrderlyRequest &consumeRequest);
- bool checkReconsumeTimes(std::list<MessageExt *> &msgs);
- bool sendMessageBack(MessageExt &msg);
- ConsumerStat& getConsumerStat();
-
- void submitConsumeRequestLater(ProcessQueue *pProcessQueue,
- const MessageQueue &messageQueue,
- long long suspendTimeMillis);
-
- void submitConsumeRequest(std::list<MessageExt *> &msgs,
- ProcessQueue *pProcessQueue,
- const MessageQueue &messageQueue,
- bool dispathToConsume);
-
- void updateCorePoolSize(int corePoolSize);
- MessageQueueLock &getMessageQueueLock();
- std::string &getConsumerGroup();
- MessageListenerOrderly *getMessageListener();
- DefaultMQPushConsumerImpl *getDefaultMQPushConsumerImpl();
- DefaultMQPushConsumer *getDefaultMQPushConsumer();
-
-private:
- volatile bool m_stoped;
- DefaultMQPushConsumerImpl *m_pDefaultMQPushConsumerImpl;
- DefaultMQPushConsumer *m_pDefaultMQPushConsumer;
- MessageListenerOrderly *m_pMessageListener;
- std::string m_consumerGroup;
- MessageQueueLock m_messageQueueLock;
-
- kpr::ThreadPoolPtr m_pConsumeExecutor;
- kpr::TimerThreadPtr m_scheduledExecutorService;
-};
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ConsumeMessageService.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageService.h b/rocketmq-client4cpp/src/consumer/ConsumeMessageService.h
deleted file mode 100755
index 57a9bee..0000000
--- a/rocketmq-client4cpp/src/consumer/ConsumeMessageService.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __CONSUMEMESSAGESERVICE_H__
-#define __CONSUMEMESSAGESERVICE_H__
-
-#include <list>
-
-namespace rmq
-{
- class MessageExt;
- class ProcessQueue;
- class MessageQueue;
-
- class ConsumeMessageService
- {
- public:
- virtual ~ConsumeMessageService() {}
- virtual void start() = 0;
- virtual void shutdown() = 0;
- virtual void updateCorePoolSize(int corePoolSize) = 0;
- virtual void submitConsumeRequest(std::list<MessageExt*>& msgs,
- ProcessQueue* pProcessQueue,
- const MessageQueue& messageQueue,
- bool dispathToConsume) = 0;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ConsumeType.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumeType.cpp b/rocketmq-client4cpp/src/consumer/ConsumeType.cpp
deleted file mode 100755
index 6ef5837..0000000
--- a/rocketmq-client4cpp/src/consumer/ConsumeType.cpp
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "ConsumeType.h"
-
-namespace rmq
-{
-
-const char* getConsumeTypeString(ConsumeType type)
-{
- switch (type)
- {
- case CONSUME_ACTIVELY:
- return "CONSUME_ACTIVELY";
- case CONSUME_PASSIVELY:
- return "CONSUME_PASSIVELY";
- }
-
- return "UnknowConsumeType";
-}
-
-const char* getConsumeFromWhereString(ConsumeFromWhere type)
-{
- switch (type)
- {
- case CONSUME_FROM_LAST_OFFSET:
- return "CONSUME_FROM_LAST_OFFSET";
- case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
- return "CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST";
- case CONSUME_FROM_MAX_OFFSET:
- return "CONSUME_FROM_MAX_OFFSET";
- case CONSUME_FROM_MIN_OFFSET:
- return "CONSUME_FROM_MIN_OFFSET";
- case CONSUME_FROM_FIRST_OFFSET:
- return "CONSUME_FROM_FIRST_OFFSET";
- case CONSUME_FROM_TIMESTAMP:
- return "CONSUME_FROM_TIMESTAMP";
- }
-
- return "UnknowConsumeFromWhere";
-}
-
-const char* getMessageModelString(MessageModel type)
-{
- switch (type)
- {
- case CLUSTERING:
- return "CLUSTERING";
- case BROADCASTING:
- return "BROADCASTING";
- }
-
- return "UnknowMessageModel";
-}
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.cpp b/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.cpp
deleted file mode 100755
index c9dc304..0000000
--- a/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.cpp
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "ConsumerInvokeCallback.h"
-#include "ResponseFuture.h"
-#include "PullResult.h"
-#include "MQClientAPIImpl.h"
-#include "PullCallback.h"
-#include "MQClientException.h"
-#include "RemotingCommand.h"
-
-namespace rmq
-{
-
-ConsumerInvokeCallback::ConsumerInvokeCallback(PullCallback* pPullCallback, MQClientAPIImpl* pMQClientAPIImpl)
- : m_pPullCallback(pPullCallback),
- m_pMQClientAPIImpl(pMQClientAPIImpl)
-{
-}
-
-ConsumerInvokeCallback::~ConsumerInvokeCallback()
-{
- if (m_pPullCallback != NULL)
- {
- delete m_pPullCallback;
- m_pPullCallback = NULL;
- }
-}
-
-void ConsumerInvokeCallback::operationComplete(ResponseFuturePtr pResponseFuture)
-{
- if (m_pPullCallback == NULL)
- {
- delete this;
- return;
- }
-
- RemotingCommand* response = pResponseFuture->getResponseCommand();
- if (response != NULL)
- {
- try
- {
- PullResult* pullResult = m_pMQClientAPIImpl->processPullResponse(response);
- response->setBody(NULL, 0, false);
-
- m_pPullCallback->onSuccess(*pullResult);
-
- pullResult->msgFoundList.clear();
- delete pullResult;
- }
- catch (MQException& e)
- {
- m_pPullCallback->onException(e);
- }
-
- delete response;
- }
- else
- {
- if (!pResponseFuture->isSendRequestOK())
- {
- std::string msg = "send request failed";
- MQClientException e(msg, -1, __FILE__, __LINE__);
- m_pPullCallback->onException(e);
- }
- else if (pResponseFuture->isTimeout())
- {
- std::string msg = "wait response timeout";
- MQClientException e(msg, -1, __FILE__, __LINE__);
- m_pPullCallback->onException(e);
- }
- else
- {
- std::string msg = "unknow reseaon";
- MQClientException e(msg, -1, __FILE__, __LINE__);
- m_pPullCallback->onException(e);
- }
- }
-
- delete this;
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.h b/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.h
deleted file mode 100755
index 675f2fd..0000000
--- a/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.h
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __CONSUMER_INVOKECALLBACK_H__
-#define __CONSUMER_INVOKECALLBACK_H__
-
-#include "InvokeCallback.h"
-
-namespace rmq
-{
- class PullCallback;
- class MQClientAPIImpl;
-
- class ConsumerInvokeCallback : public InvokeCallback
- {
- public:
- ConsumerInvokeCallback(PullCallback* pPullCallback, MQClientAPIImpl* pMQClientAPIImpl);
- virtual ~ConsumerInvokeCallback();
- virtual void operationComplete(ResponseFuturePtr pResponseFuture);
-
- private:
- PullCallback* m_pPullCallback;
- MQClientAPIImpl* m_pMQClientAPIImpl;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ConsumerStatManage.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumerStatManage.h b/rocketmq-client4cpp/src/consumer/ConsumerStatManage.h
deleted file mode 100755
index 92cf74c..0000000
--- a/rocketmq-client4cpp/src/consumer/ConsumerStatManage.h
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __CONSUMERSTAT_H__
-#define __CONSUMERSTAT_H__
-
-#include <list>
-#include <string>
-
-#include "AtomicValue.h"
-#include "KPRUtil.h"
-#include "Mutex.h"
-#include "ScopedLock.h"
-
-namespace rmq
-{
- struct ConsumerStat
- {
- long long createTimestamp;
- kpr::AtomicLong consumeMsgRTMax;
- kpr::AtomicLong consumeMsgRTTotal;
- kpr::AtomicLong consumeMsgOKTotal;
- kpr::AtomicLong consumeMsgFailedTotal;
- kpr::AtomicLong pullRTTotal;
- kpr::AtomicLong pullTimesTotal;
-
- ConsumerStat()
- {
- createTimestamp = KPRUtil::GetCurrentTimeMillis();
- consumeMsgRTMax = 0;
- consumeMsgRTTotal = 0;
- consumeMsgOKTotal = 0;
- consumeMsgFailedTotal = 0;
- pullRTTotal = 0;
- pullTimesTotal = 0;
- }
- };
-
-
- class ConsumerStatManager
- {
- public:
- ConsumerStat& getConsumertat()
- {
- return m_consumertat;
- }
-
- std::list<ConsumerStat>& getSnapshotList()
- {
- return m_snapshotList;
- }
-
- /**
- * every 1s
- */
- void recordSnapshotPeriodically()
- {
- kpr::ScopedWLock<kpr::RWMutex> lock(m_snapshotListLock);
- m_snapshotList.push_back(m_consumertat);
- if (m_snapshotList.size() > 60)
- {
- m_snapshotList.pop_front();
- }
- }
-
- /**
- * every 1m
- */
- void logStatsPeriodically(std::string& group, std::string& clientId)
- {
- kpr::ScopedRLock<kpr::RWMutex> lock(m_snapshotListLock);
- if (m_snapshotList.size() >= 60)
- {
- ConsumerStat& first = m_snapshotList.front();
- ConsumerStat& last = m_snapshotList.back();
-
- {
- double avgRT = (last.consumeMsgRTTotal.get() - first.consumeMsgRTTotal.get())
- /
- (double)((last.consumeMsgOKTotal.get() + last.consumeMsgFailedTotal.get())
- - (first.consumeMsgOKTotal.get() + first.consumeMsgFailedTotal.get()));
-
- double tps = ((last.consumeMsgOKTotal.get() + last.consumeMsgFailedTotal.get())
- - (first.consumeMsgOKTotal.get() + first.consumeMsgFailedTotal.get()))
- / (double)(last.createTimestamp - first.createTimestamp);
-
- tps *= 1000;
-
- RMQ_INFO(
- "Consumer, {%s} {%s}, ConsumeAvgRT: {%f} ConsumeMaxRT: {%lld} TotalOKMsg: {%lld} TotalFailedMsg: {%lld} consumeTPS: {%f}",
- group.c_str(),
- clientId.c_str(),
- avgRT,
- last.consumeMsgRTMax.get(),
- last.consumeMsgOKTotal.get(),
- last.consumeMsgFailedTotal.get(),
- tps);
- }
-
- {
- double avgRT = (last.pullRTTotal.get() - first.pullRTTotal.get())
- / (double)(last.pullTimesTotal.get() - first.pullTimesTotal.get());
-
- RMQ_INFO("Consumer, {%s} {%s}, PullAvgRT: {%f} PullTimesTotal: {%lld}",
- group.c_str(),
- clientId.c_str(),
- avgRT,
- last.pullTimesTotal.get());
- }
- }
- }
-
- private:
- ConsumerStat m_consumertat;
- std::list<ConsumerStat> m_snapshotList;
- kpr::RWMutex m_snapshotListLock;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumer.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumer.cpp b/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumer.cpp
deleted file mode 100755
index 67a8c8c..0000000
--- a/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumer.cpp
+++ /dev/null
@@ -1,309 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "DefaultMQPullConsumer.h"
-
-#include <list>
-#include <string>
-
-#include "MessageQueue.h"
-#include "MessageExt.h"
-#include "ClientConfig.h"
-#include "DefaultMQPullConsumerImpl.h"
-#include "MixAll.h"
-#include "AllocateMessageQueueStrategyInner.h"
-
-namespace rmq
-{
-
-/**
- * Builds a pull consumer that uses MixAll::DEFAULT_CONSUMER_GROUP as its group.
- *
- * Defaults set here: broker suspend 20 s, suspend-pull timeout 30 s, plain pull
- * timeout 10 s, CLUSTERING model, an AllocateMessageQueueAveragely strategy,
- * no listener, no offset store, unit mode off, 16 max reconsume times.
- */
-DefaultMQPullConsumer::DefaultMQPullConsumer()
-    : m_consumerGroup(MixAll::DEFAULT_CONSUMER_GROUP)
-    , m_brokerSuspendMaxTimeMillis(1000 * 20)
-    , m_consumerTimeoutMillisWhenSuspend(1000 * 30)
-    , m_consumerPullTimeoutMillis(1000 * 10)
-    , m_messageModel(CLUSTERING)
-    , m_pMessageQueueListener(NULL)
-    , m_pOffsetStore(NULL)
-    , m_pAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely())
-    , m_unitMode(false)
-    , m_maxReconsumeTimes(16)
-{
-    // Created in the body so the implementation receives a pointer to an
-    // object whose configuration members are already initialised.
-    m_pDefaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this);
-}
-
-/**
- * Builds a pull consumer for the given consumer group.
- *
- * Apart from the group name, the defaults are the same as in the default
- * constructor: 20 s broker suspend, 30 s suspend-pull timeout, 10 s pull
- * timeout, CLUSTERING model, averaging allocation strategy.
- *
- * @param consumerGroup name of the consumer group
- */
-DefaultMQPullConsumer::DefaultMQPullConsumer(const std::string& consumerGroup)
-    : m_consumerGroup(consumerGroup)
-    , m_brokerSuspendMaxTimeMillis(1000 * 20)
-    , m_consumerTimeoutMillisWhenSuspend(1000 * 30)
-    , m_consumerPullTimeoutMillis(1000 * 10)
-    , m_messageModel(CLUSTERING)
-    , m_pMessageQueueListener(NULL)
-    , m_pOffsetStore(NULL)
-    , m_pAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely())
-    , m_unitMode(false)
-    , m_maxReconsumeTimes(16)
-{
-    m_pDefaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this);
-}
-
-/**
- * Releases the allocation strategy and the implementation object.
- *
- * NOTE(review): the original comment here read "memleak or coredump". Whatever
- * strategy pointer is currently stored is deleted, including one installed
- * through setAllocateMessageQueueStrategy() - confirm the intended ownership.
- */
-DefaultMQPullConsumer::~DefaultMQPullConsumer()
-{
-    // delete on a null pointer is a no-op, so no explicit checks are needed.
-    delete m_pAllocateMessageQueueStrategy;
-    delete m_pDefaultMQPullConsumerImpl;
-}
-
-//MQAdmin
-// Each function in this section forwards its arguments unchanged to the
-// implementation object and returns whatever the implementation returns.
-
-/** Forwards a topic-creation request (key, new topic name, queue count). */
-void DefaultMQPullConsumer::createTopic(const std::string& key,
-                                        const std::string& newTopic,
-                                        int queueNum)
-{
-    m_pDefaultMQPullConsumerImpl->createTopic(key, newTopic, queueNum);
-}
-
-/** Forwards an offset lookup by timestamp for the given queue. */
-long long DefaultMQPullConsumer::searchOffset(const MessageQueue& mq,
-                                              long long timestamp)
-{
-    return m_pDefaultMQPullConsumerImpl->searchOffset(mq, timestamp);
-}
-
-/** Forwards a max-offset query for the given queue. */
-long long DefaultMQPullConsumer::maxOffset(const MessageQueue& mq)
-{
-    return m_pDefaultMQPullConsumerImpl->maxOffset(mq);
-}
-
-/** Forwards a min-offset query for the given queue. */
-long long DefaultMQPullConsumer::minOffset(const MessageQueue& mq)
-{
-    return m_pDefaultMQPullConsumerImpl->minOffset(mq);
-}
-
-/** Forwards an earliest-store-time query for the given queue. */
-long long DefaultMQPullConsumer::earliestMsgStoreTime(const MessageQueue& mq)
-{
-    return m_pDefaultMQPullConsumerImpl->earliestMsgStoreTime(mq);
-}
-
-/** Forwards a message lookup by message id. */
-MessageExt* DefaultMQPullConsumer::viewMessage(const std::string& msgId)
-{
-    return m_pDefaultMQPullConsumerImpl->viewMessage(msgId);
-}
-
-/** Forwards a message query (topic, key, max result count, begin, end). */
-QueryResult DefaultMQPullConsumer::queryMessage(const std::string& topic,
-                                                const std::string& key,
-                                                int maxNum,
-                                                long long begin,
-                                                long long end)
-{
-    return m_pDefaultMQPullConsumerImpl->queryMessage(topic, key, maxNum, begin, end);
-}
-// MQadmin end
-
-// ---- Plain configuration accessors: each reads or writes one member ----
-
-/** Returns the stored allocation-strategy pointer. */
-AllocateMessageQueueStrategy* DefaultMQPullConsumer::getAllocateMessageQueueStrategy()
-{
-    return m_pAllocateMessageQueueStrategy;
-}
-
-/**
- * Stores a new allocation-strategy pointer.
- * The previously stored pointer is overwritten without being released.
- */
-void DefaultMQPullConsumer::setAllocateMessageQueueStrategy(
-    AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy)
-{
-    m_pAllocateMessageQueueStrategy = pAllocateMessageQueueStrategy;
-}
-
-/** Returns the broker suspend limit (the constructors set 1000 * 20). */
-int DefaultMQPullConsumer::getBrokerSuspendMaxTimeMillis()
-{
-    return m_brokerSuspendMaxTimeMillis;
-}
-
-/** Sets the broker suspend limit. */
-void DefaultMQPullConsumer::setBrokerSuspendMaxTimeMillis(int brokerSuspendMaxTimeMillis)
-{
-    m_brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis;
-}
-
-/** Returns a copy of the consumer group name. */
-std::string DefaultMQPullConsumer::getConsumerGroup()
-{
-    return m_consumerGroup;
-}
-
-/** Sets the consumer group name. */
-void DefaultMQPullConsumer::setConsumerGroup(const std::string& consumerGroup)
-{
-    m_consumerGroup = consumerGroup;
-}
-
-/** Returns the pull timeout (the constructors set 1000 * 10). */
-int DefaultMQPullConsumer::getConsumerPullTimeoutMillis()
-{
-    return m_consumerPullTimeoutMillis;
-}
-
-/** Sets the pull timeout. */
-void DefaultMQPullConsumer::setConsumerPullTimeoutMillis(int consumerPullTimeoutMillis)
-{
-    m_consumerPullTimeoutMillis = consumerPullTimeoutMillis;
-}
-
-/** Returns the suspend-pull timeout (the constructors set 1000 * 30). */
-int DefaultMQPullConsumer::getConsumerTimeoutMillisWhenSuspend()
-{
-    return m_consumerTimeoutMillisWhenSuspend;
-}
-
-/** Sets the suspend-pull timeout. */
-void DefaultMQPullConsumer::setConsumerTimeoutMillisWhenSuspend(int consumerTimeoutMillisWhenSuspend)
-{
-    m_consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
-}
-
-/** Returns the configured message model. */
-MessageModel DefaultMQPullConsumer::getMessageModel()
-{
-    return m_messageModel;
-}
-
-/** Sets the message model. */
-void DefaultMQPullConsumer::setMessageModel(MessageModel messageModel)
-{
-    m_messageModel = messageModel;
-}
-
-/** Returns the stored message-queue listener pointer (may be NULL). */
-MessageQueueListener* DefaultMQPullConsumer::getMessageQueueListener()
-{
-    return m_pMessageQueueListener;
-}
-
-/** Stores the message-queue listener pointer. */
-void DefaultMQPullConsumer::setMessageQueueListener(MessageQueueListener* pMessageQueueListener)
-{
-    m_pMessageQueueListener = pMessageQueueListener;
-}
-
-/** Returns a copy of the registered topic set. */
-std::set<std::string> DefaultMQPullConsumer::getRegisterTopics()
-{
-    return m_registerTopics;
-}
-
-/** Replaces the registered topic set with the given one. */
-void DefaultMQPullConsumer::setRegisterTopics(std::set<std::string> registerTopics)
-{
-    m_registerTopics = registerTopics;
-}
-
-//MQConsumer
-// The functions in this section delegate to the implementation object.
-
-/** Sends the message back with an empty broker name. */
-void DefaultMQPullConsumer::sendMessageBack(MessageExt& msg, int delayLevel)
-{
-    m_pDefaultMQPullConsumerImpl->sendMessageBack(msg, delayLevel, "");
-}
-
-/** Sends the message back, naming the broker to use. */
-void DefaultMQPullConsumer::sendMessageBack(MessageExt& msg,
-                                            int delayLevel,
-                                            const std::string& brokerName)
-{
-    m_pDefaultMQPullConsumerImpl->sendMessageBack(msg, delayLevel, brokerName);
-}
-
-/** Forwards the subscribe-queue lookup for a topic; returns the implementation's pointer. */
-std::set<MessageQueue>* DefaultMQPullConsumer::fetchSubscribeMessageQueues(const std::string& topic)
-{
-    return m_pDefaultMQPullConsumerImpl->fetchSubscribeMessageQueues(topic);
-}
-
-/** Starts the implementation object. */
-void DefaultMQPullConsumer::start()
-{
-    m_pDefaultMQPullConsumerImpl->start();
-}
-
-/** Shuts the implementation object down. */
-void DefaultMQPullConsumer::shutdown()
-{
-    m_pDefaultMQPullConsumerImpl->shutdown();
-}
-//MQConsumer end
-
-//MQPullConsumer
-/**
- * Adds the topic to the registered topic set and, when a listener is supplied,
- * makes it the current message-queue listener.
- *
- * @param topic     topic to register
- * @param pListener listener to install; a NULL pointer leaves the current one in place
- */
-void DefaultMQPullConsumer::registerMessageQueueListener(const std::string& topic,
-                                                         MessageQueueListener* pListener)
-{
-    m_registerTopics.insert(topic);
-
-    if (!pListener)
-    {
-        return;
-    }
-
-    m_pMessageQueueListener = pListener;
-}
-
-// Pull and offset operations; every call is forwarded to the implementation.
-
-/** Synchronous pull; returns the implementation's result pointer. */
-PullResult* DefaultMQPullConsumer::pull(MessageQueue& mq,
-                                        const std::string& subExpression,
-                                        long long offset,
-                                        int maxNums)
-{
-    return m_pDefaultMQPullConsumerImpl->pull(mq, subExpression, offset, maxNums);
-}
-
-/** Pull variant that takes a callback instead of returning a result. */
-void DefaultMQPullConsumer::pull(MessageQueue& mq,
-                                 const std::string& subExpression,
-                                 long long offset,
-                                 int maxNums,
-                                 PullCallback* pPullCallback)
-{
-    m_pDefaultMQPullConsumerImpl->pull(mq, subExpression, offset, maxNums, pPullCallback);
-}
-
-/** Synchronous pullBlockIfNotFound; returns the implementation's result pointer. */
-PullResult* DefaultMQPullConsumer::pullBlockIfNotFound(MessageQueue& mq,
-                                                       const std::string& subExpression,
-                                                       long long offset,
-                                                       int maxNums)
-{
-    return m_pDefaultMQPullConsumerImpl->pullBlockIfNotFound(mq, subExpression, offset, maxNums);
-}
-
-/** pullBlockIfNotFound variant that takes a callback instead of returning a result. */
-void DefaultMQPullConsumer::pullBlockIfNotFound(MessageQueue& mq,
-                                                const std::string& subExpression,
-                                                long long offset,
-                                                int maxNums,
-                                                PullCallback* pPullCallback)
-{
-    m_pDefaultMQPullConsumerImpl->pullBlockIfNotFound(mq, subExpression, offset, maxNums, pPullCallback);
-}
-
-/** Forwards a consume-offset update for the queue. */
-void DefaultMQPullConsumer::updateConsumeOffset(MessageQueue& mq, long long offset)
-{
-    m_pDefaultMQPullConsumerImpl->updateConsumeOffset(mq, offset);
-}
-
-/** Forwards a consume-offset read for the queue, passing fromStore through. */
-long long DefaultMQPullConsumer::fetchConsumeOffset(MessageQueue& mq, bool fromStore)
-{
-    return m_pDefaultMQPullConsumerImpl->fetchConsumeOffset(mq, fromStore);
-}
-
-/** Forwards the in-balance queue lookup for a topic; returns the implementation's pointer. */
-std::set<MessageQueue>* DefaultMQPullConsumer::fetchMessageQueuesInBalance(const std::string& topic)
-{
-    return m_pDefaultMQPullConsumerImpl->fetchMessageQueuesInBalance(topic);
-}
-//MQPullConsumer end
-
-/** Returns the stored offset-store pointer (NULL unless one was set). */
-OffsetStore* DefaultMQPullConsumer::getOffsetStore()
-{
-    return m_pOffsetStore;
-}
-
-/** Stores the offset-store pointer. */
-void DefaultMQPullConsumer::setOffsetStore(OffsetStore* offsetStore)
-{
-    m_pOffsetStore = offsetStore;
-}
-
-/** Returns the implementation object created by the constructor. */
-DefaultMQPullConsumerImpl* DefaultMQPullConsumer::getDefaultMQPullConsumerImpl()
-{
-    return m_pDefaultMQPullConsumerImpl;
-}
-
-/** Returns the unit-mode flag. */
-bool DefaultMQPullConsumer::isUnitMode()
-{
-    return m_unitMode;
-}
-
-/** Sets the unit-mode flag. */
-void DefaultMQPullConsumer::setUnitMode(bool isUnitMode)
-{
-    m_unitMode = isUnitMode;
-}
-
-/** Returns the maximum reconsume count (the constructors set 16). */
-int DefaultMQPullConsumer::getMaxReconsumeTimes()
-{
-    return m_maxReconsumeTimes;
-}
-
-/** Sets the maximum reconsume count. */
-void DefaultMQPullConsumer::setMaxReconsumeTimes(int maxReconsumeTimes)
-{
-    m_maxReconsumeTimes = maxReconsumeTimes;
-}
-
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.cpp b/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.cpp
deleted file mode 100755
index d6465e9..0000000
--- a/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.cpp
+++ /dev/null
@@ -1,630 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "DefaultMQPullConsumerImpl.h"
-
-#include <iostream>
-#include <string>
-#include <set>
-#include "DefaultMQPullConsumer.h"
-#include "DefaultMQProducer.h"
-#include "MQClientFactory.h"
-#include "MQAdminImpl.h"
-#include "RebalancePullImpl.h"
-#include "MQClientAPIImpl.h"
-#include "OffsetStore.h"
-#include "MixAll.h"
-#include "MQClientManager.h"
-#include "LocalFileOffsetStore.h"
-#include "RemoteBrokerOffsetStore.h"
-#include "PullSysFlag.h"
-#include "FilterAPI.h"
-#include "PullAPIWrapper.h"
-#include "MQClientException.h"
-#include "Validators.h"
-#include "ScopedLock.h"
-
-namespace rmq
-{
-
-/**
- * Creates the implementation in the CREATE_JUST state.
- *
- * The client factory, pull API wrapper and offset store start out as NULL and
- * are filled in by start(); the rebalance object is created immediately.
- *
- * @param pDefaultMQPullConsumer the consumer object this implementation serves
- */
-DefaultMQPullConsumerImpl::DefaultMQPullConsumerImpl(DefaultMQPullConsumer* pDefaultMQPullConsumer)
-    : m_pDefaultMQPullConsumer(pDefaultMQPullConsumer)
-    , m_serviceState(CREATE_JUST)
-    , m_pMQClientFactory(NULL)
-    , m_pPullAPIWrapper(NULL)
-    , m_pOffsetStore(NULL)
-    , m_pRebalanceImpl(new RebalancePullImpl(this))
-{
-}
-
-/**
- * Deletes the rebalance object, the pull API wrapper and the offset store.
- *
- * The client factory is deliberately not deleted here (the original code has
- * that delete commented out).
- * NOTE(review): start() may store an offset store obtained from the consumer
- * object; it is deleted here as well - confirm that ownership is intended.
- */
-DefaultMQPullConsumerImpl::~DefaultMQPullConsumerImpl()
-{
-    // delete on a null pointer is a no-op, so no explicit checks are needed.
-    delete m_pRebalanceImpl;
-    delete m_pPullAPIWrapper;
-    delete m_pOffsetStore;
-    //delete m_pMQClientFactory;
-}
-
-/**
- * Starts the pull consumer.
- *
- * Only valid in the CREATE_JUST state. The state is set to START_FAILED up
- * front, so an exception thrown part-way through leaves the consumer in that
- * state; it becomes RUNNING once the client factory has been started.
- *
- * When the consumer group is already registered with the client factory the
- * state is reset to CREATE_JUST so start() can be retried. The pull API
- * wrapper and any offset store created here are released on that path;
- * otherwise the retry would overwrite the pointers and leak them.
- *
- * Throws MQClientException (via THROW_MQEXCEPTION) when the group is already
- * registered or when called in any state other than CREATE_JUST.
- */
-void DefaultMQPullConsumerImpl::start()
-{
-    RMQ_INFO("DefaultMQPullConsumerImpl::start()");
-    switch (m_serviceState)
-    {
-        case CREATE_JUST:
-        {
-            RMQ_INFO("the consumer [{%s}] start beginning. messageModel={%s}",
-                     m_pDefaultMQPullConsumer->getConsumerGroup().c_str(),
-                     getMessageModelString(m_pDefaultMQPullConsumer->getMessageModel()));
-
-            m_serviceState = START_FAILED;
-            checkConfig();
-            copySubscription();
-
-            if (m_pDefaultMQPullConsumer->getMessageModel() == CLUSTERING)
-            {
-                m_pDefaultMQPullConsumer->changeInstanceNameToPID();
-            }
-
-            m_pMQClientFactory = MQClientManager::getInstance()->getAndCreateMQClientFactory(*m_pDefaultMQPullConsumer);
-
-            m_pRebalanceImpl->setConsumerGroup(m_pDefaultMQPullConsumer->getConsumerGroup());
-            m_pRebalanceImpl->setMessageModel(m_pDefaultMQPullConsumer->getMessageModel());
-            m_pRebalanceImpl->setAllocateMessageQueueStrategy(m_pDefaultMQPullConsumer->getAllocateMessageQueueStrategy());
-            m_pRebalanceImpl->setmQClientFactory(m_pMQClientFactory);
-
-            m_pPullAPIWrapper = new PullAPIWrapper(m_pMQClientFactory, m_pDefaultMQPullConsumer->getConsumerGroup());
-
-            // Prefer an offset store configured on the consumer; otherwise pick
-            // one that matches the message model.
-            if (m_pDefaultMQPullConsumer->getOffsetStore() != NULL)
-            {
-                m_pOffsetStore = m_pDefaultMQPullConsumer->getOffsetStore();
-            }
-            else
-            {
-                switch (m_pDefaultMQPullConsumer->getMessageModel())
-                {
-                    case BROADCASTING:
-                        m_pOffsetStore = new LocalFileOffsetStore(m_pMQClientFactory, m_pDefaultMQPullConsumer->getConsumerGroup());
-                        break;
-                    case CLUSTERING:
-                        m_pOffsetStore = new RemoteBrokerOffsetStore(m_pMQClientFactory, m_pDefaultMQPullConsumer->getConsumerGroup());
-                        break;
-                    default:
-                        break;
-                }
-            }
-
-            m_pOffsetStore->load();
-
-            bool registerOK =
-                m_pMQClientFactory->registerConsumer(m_pDefaultMQPullConsumer->getConsumerGroup(), this);
-            if (!registerOK)
-            {
-                // A retry recreates both objects, so drop the ones made in
-                // this attempt. A store supplied through the consumer object
-                // is only forgotten, not deleted.
-                delete m_pPullAPIWrapper;
-                m_pPullAPIWrapper = NULL;
-                if (m_pOffsetStore != m_pDefaultMQPullConsumer->getOffsetStore())
-                {
-                    delete m_pOffsetStore;
-                }
-                m_pOffsetStore = NULL;
-
-                m_serviceState = CREATE_JUST;
-                std::string str = "The consumer group[" + m_pDefaultMQPullConsumer->getConsumerGroup();
-                str += "] has been created before, specify another name please.";
-                THROW_MQEXCEPTION(MQClientException, str, -1);
-            }
-
-            m_pMQClientFactory->start();
-
-            m_serviceState = RUNNING;
-        }
-        break;
-        case RUNNING:
-        case START_FAILED:
-        case SHUTDOWN_ALREADY:
-            THROW_MQEXCEPTION(MQClientException, "The PullConsumer service state not OK, maybe started once, ", -1);
-        default:
-            break;
-    }
-}
-
-
-/**
- * Shuts the consumer down.
- *
- * Only the RUNNING state does any work: offsets are persisted, the consumer
- * group is unregistered from the client factory, the factory is shut down and
- * the state becomes SHUTDOWN_ALREADY. Every other state is a no-op.
- */
-void DefaultMQPullConsumerImpl::shutdown()
-{
-    RMQ_DEBUG("DefaultMQPullConsumerImpl::shutdown()");
-
-    if (m_serviceState != RUNNING)
-    {
-        return;
-    }
-
-    persistConsumerOffset();
-    m_pMQClientFactory->unregisterConsumer(m_pDefaultMQPullConsumer->getConsumerGroup());
-    m_pMQClientFactory->shutdown();
-
-    m_serviceState = SHUTDOWN_ALREADY;
-}
-
-
-/**
- * Creates a topic through the client factory's admin object.
- * Requires the RUNNING state (makeSureStateOK() throws otherwise).
- */
-void DefaultMQPullConsumerImpl::createTopic(const std::string& key,
-                                            const std::string& newTopic,
-                                            int queueNum)
-{
-    makeSureStateOK();
-    m_pMQClientFactory->getMQAdminImpl()->createTopic(key, newTopic, queueNum);
-}
-
-/**
- * Reads the consume offset of a queue from the offset store.
- * Requires the RUNNING state.
- *
- * @param fromStore true selects READ_FROM_STORE, false selects MEMORY_FIRST_THEN_STORE
- */
-long long DefaultMQPullConsumerImpl::fetchConsumeOffset(MessageQueue& mq, bool fromStore)
-{
-    makeSureStateOK();
-
-    if (fromStore)
-    {
-        return m_pOffsetStore->readOffset(mq, READ_FROM_STORE);
-    }
-    return m_pOffsetStore->readOffset(mq, MEMORY_FIRST_THEN_STORE);
-}
-
-/**
- * Collects the queues of the given topic that are present in the rebalance
- * object's process-queue table.
- *
- * Requires the RUNNING state. The table is read under its read lock.
- *
- * @return a set allocated with new; it is never NULL and may be empty.
- *         Nothing in this function releases it, so the caller must.
- */
-std::set<MessageQueue>* DefaultMQPullConsumerImpl::fetchMessageQueuesInBalance(const std::string& topic)
-{
-    makeSureStateOK();
-    std::set<MessageQueue>* queuesOfTopic = new std::set<MessageQueue>;
-
-    kpr::ScopedRLock<kpr::RWMutex> lock(m_pRebalanceImpl->getProcessQueueTableLock());
-    std::map<MessageQueue, ProcessQueue*>& table = m_pRebalanceImpl->getProcessQueueTable();
-    for (std::map<MessageQueue, ProcessQueue*>::iterator entry = table.begin();
-         entry != table.end();
-         ++entry)
-    {
-        if (entry->first.getTopic() != topic)
-        {
-            continue;
-        }
-        queuesOfTopic->insert(entry->first);
-    }
-
-    return queuesOfTopic;
-}
-
-// The three queries below require the RUNNING state and are answered by the
-// client factory's admin object.
-
-/** Returns the admin object's publish-queue list for the topic. */
-std::vector<MessageQueue>* DefaultMQPullConsumerImpl::fetchPublishMessageQueues(const std::string& topic)
-{
-    makeSureStateOK();
-    return m_pMQClientFactory->getMQAdminImpl()->fetchPublishMessageQueues(topic);
-}
-
-/** Returns the admin object's subscribe-queue set for the topic. */
-std::set<MessageQueue>* DefaultMQPullConsumerImpl::fetchSubscribeMessageQueues(const std::string& topic)
-{
-    makeSureStateOK();
-    return m_pMQClientFactory->getMQAdminImpl()->fetchSubscribeMessageQueues(topic);
-}
-
-/** Returns the admin object's earliest message store time for the queue. */
-long long DefaultMQPullConsumerImpl::earliestMsgStoreTime(const MessageQueue& mq)
-{
-    makeSureStateOK();
-    return m_pMQClientFactory->getMQAdminImpl()->earliestMsgStoreTime(mq);
-}
-
-/** Returns the consumer group configured on the owning consumer object. */
-std::string DefaultMQPullConsumerImpl::groupName()
-{
-    return m_pDefaultMQPullConsumer->getConsumerGroup();
-}
-
-/** Returns the message model configured on the owning consumer object. */
-MessageModel DefaultMQPullConsumerImpl::messageModel()
-{
-    return m_pDefaultMQPullConsumer->getMessageModel();
-}
-
-/** Always CONSUME_ACTIVELY for this implementation. */
-ConsumeType DefaultMQPullConsumerImpl::consumeType()
-{
-    return CONSUME_ACTIVELY;
-}
-
-/** Always CONSUME_FROM_LAST_OFFSET for this implementation. */
-ConsumeFromWhere DefaultMQPullConsumerImpl::consumeFromWhere()
-{
-    return CONSUME_FROM_LAST_OFFSET;
-}
-
-/**
- * Returns an empty set.
- * TODO: not implemented yet - no subscription data is copied into the result.
- */
-std::set<SubscriptionData> DefaultMQPullConsumerImpl::subscriptions()
-{
-    return std::set<SubscriptionData>();
-}
-
-/** Runs a rebalance on the rebalance object; does nothing when it is NULL. */
-void DefaultMQPullConsumerImpl::doRebalance()
-{
-    if (m_pRebalanceImpl == NULL)
-    {
-        return;
-    }
-
-    m_pRebalanceImpl->doRebalance();
-}
-
-/**
- * Persists the offsets of every queue in the process-queue table.
- *
- * The queue keys are collected while holding the table's read lock and the
- * lock is released before the offset store is called. The table is bound by
- * reference, so only the keys are copied, not the whole map.
- *
- * Never throws: any exception (including the state check failing when the
- * consumer is not RUNNING) is logged and swallowed.
- */
-void DefaultMQPullConsumerImpl::persistConsumerOffset()
-{
-    try
-    {
-        makeSureStateOK();
-
-        std::set<MessageQueue> mqs;
-        {
-            kpr::ScopedRLock<kpr::RWMutex> lock(m_pRebalanceImpl->getProcessQueueTableLock());
-            std::map<MessageQueue, ProcessQueue*>& processQueueTable = m_pRebalanceImpl->getProcessQueueTable();
-            RMQ_FOR_EACH(processQueueTable, it)
-            {
-                mqs.insert(it->first);
-            }
-        }
-
-        m_pOffsetStore->persistAll(mqs);
-    }
-    catch (...)
-    {
-        RMQ_ERROR("group {%s} persistConsumerOffset exception",
-                  m_pDefaultMQPullConsumer->getConsumerGroup().c_str());
-    }
-}
-
-/**
- * Records the queue set of a subscribed topic in the rebalance object's
- * topic-subscribe-info table.
- *
- * Topics that are not in the subscription table are ignored. An existing entry
- * is overwritten: std::map::insert leaves an existing key untouched, which
- * would silently drop every update after the first one for a topic.
- *
- * @param topic topic whose queue set changed
- * @param info  the current queue set for that topic
- */
-void DefaultMQPullConsumerImpl::updateTopicSubscribeInfo(const std::string& topic, const std::set<MessageQueue>& info)
-{
-    std::map<std::string, SubscriptionData>& subTable = m_pRebalanceImpl->getSubscriptionInner();
-
-    if (subTable.find(topic) != subTable.end())
-    {
-        m_pRebalanceImpl->getTopicSubscribeInfoTable()[topic] = info;
-    }
-}
-
-/**
- * Tells whether queue information for a topic still has to be fetched.
- *
- * @return true only when the topic is in the subscription table and has no
- *         entry yet in the topic-subscribe-info table; false otherwise.
- */
-bool DefaultMQPullConsumerImpl::isSubscribeTopicNeedUpdate(const std::string& topic)
-{
-    std::map<std::string, SubscriptionData>& subscriptions = m_pRebalanceImpl->getSubscriptionInner();
-    if (subscriptions.find(topic) == subscriptions.end())
-    {
-        // Not subscribed: nothing to update.
-        return false;
-    }
-
-    std::map<std::string, std::set<MessageQueue> >& queueInfo =
-        m_pRebalanceImpl->getTopicSubscribeInfoTable();
-    const bool missing = (queueInfo.find(topic) == queueInfo.end());
-    return missing;
-}
-
-/** Returns the admin object's max offset for the queue; requires the RUNNING state. */
-long long DefaultMQPullConsumerImpl::maxOffset(const MessageQueue& mq)
-{
-    makeSureStateOK();
-    return m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq);
-}
-
-/** Returns the admin object's min offset for the queue; requires the RUNNING state. */
-long long DefaultMQPullConsumerImpl::minOffset(const MessageQueue& mq)
-{
-    makeSureStateOK();
-    return m_pMQClientFactory->getMQAdminImpl()->minOffset(mq);
-}
-
-// The four entry points below differ only in sync/async and in the value of
-// the "block" flag they hand to pullSyncImpl() / pullAsyncImpl().
-
-/** Synchronous pull with block = false. */
-PullResult* DefaultMQPullConsumerImpl::pull(MessageQueue& mq,
-                                            const std::string& subExpression,
-                                            long long offset,
-                                            int maxNums)
-{
-    const bool block = false;
-    return pullSyncImpl(mq, subExpression, offset, maxNums, block);
-}
-
-/** Asynchronous pull with block = false; the result goes to pPullCallback. */
-void DefaultMQPullConsumerImpl::pull(MessageQueue& mq,
-                                     const std::string& subExpression,
-                                     long long offset,
-                                     int maxNums,
-                                     PullCallback* pPullCallback)
-{
-    const bool block = false;
-    pullAsyncImpl(mq, subExpression, offset, maxNums, pPullCallback, block);
-}
-
-/** Synchronous pull with block = true. */
-PullResult* DefaultMQPullConsumerImpl::pullBlockIfNotFound(MessageQueue& mq,
-                                                           const std::string& subExpression,
-                                                           long long offset,
-                                                           int maxNums)
-{
-    const bool block = true;
-    return pullSyncImpl(mq, subExpression, offset, maxNums, block);
-}
-
-/** Asynchronous pull with block = true; the result goes to pPullCallback. */
-void DefaultMQPullConsumerImpl::pullBlockIfNotFound(MessageQueue& mq,
-                                                    const std::string& subExpression,
-                                                    long long offset,
-                                                    int maxNums,
-                                                    PullCallback* pPullCallback)
-{
-    const bool block = true;
-    pullAsyncImpl(mq, subExpression, offset, maxNums, pPullCallback, block);
-}
-
-/**
- * Queries messages through the client factory's admin object.
- * Requires the RUNNING state (makeSureStateOK() throws otherwise).
- *
- * @param topic  topic to search
- * @param key    message key to search for
- * @param maxNum passed through to the admin object
- * @param begin  passed through to the admin object
- * @param end    passed through to the admin object
- * @return the admin object's query result
- */
-QueryResult DefaultMQPullConsumerImpl::queryMessage(const std::string& topic,
-        const std::string& key,
-        int maxNum,
-        long long begin,
-        long long end)
-{
-    makeSureStateOK();
-
-    return m_pMQClientFactory->getMQAdminImpl()->queryMessage(topic, key, maxNum, begin, end);
-}
-
-/** Looks up an offset by timestamp via the admin object; requires the RUNNING state. */
-long long DefaultMQPullConsumerImpl::searchOffset(const MessageQueue& mq, long long timestamp)
-{
-    makeSureStateOK();
-    return m_pMQClientFactory->getMQAdminImpl()->searchOffset(mq, timestamp);
-}
-
-/** Sends the message back using the owning consumer's own consumer group. */
-void DefaultMQPullConsumerImpl::sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName)
-{
-    sendMessageBack(msg, delayLevel, brokerName, m_pDefaultMQPullConsumer->getConsumerGroup());
-}
-
-
-/**
- * Sends a message back to the broker, falling back to the retry topic.
- *
- * First attempt: consumerSendMessageBack() against the message's store host
- * (when brokerName is empty) or the publish address found for brokerName.
- * If that throws anything, a new message for the group's retry topic is built
- * from the original and sent through the factory's default producer; an
- * exception from that second send propagates to the caller.
- *
- * @param msg           message to send back
- * @param delayLevel    delay level passed to the broker call
- * @param brokerName    broker to use; empty means the message's store host
- * @param consumerGroup group to report; empty means the consumer's own group
- */
-void DefaultMQPullConsumerImpl::sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName,
-        const std::string& consumerGroup)
-{
-    try
-    {
-        std::string brokerAddr;
-        if (brokerName.empty())
-        {
-            brokerAddr = socketAddress2IPPort(msg.getStoreHost());
-        }
-        else
-        {
-            brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(brokerName);
-        }
-
-        const std::string group =
-            consumerGroup.empty() ? m_pDefaultMQPullConsumer->getConsumerGroup() : consumerGroup;
-
-        m_pMQClientFactory->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg,
-                group,
-                delayLevel,
-                3000);
-    }
-    catch (...)
-    {
-        RMQ_ERROR("sendMessageBack Exception, group: %s", m_pDefaultMQPullConsumer->getConsumerGroup().c_str());
-
-        // Rebuild the message on the retry topic of this consumer group.
-        Message retryMsg(MixAll::getRetryTopic(m_pDefaultMQPullConsumer->getConsumerGroup()),
-                         msg.getBody(), msg.getBodyLen());
-
-        // Keep the first message id as the origin id across repeated retries.
-        std::string originMsgId = msg.getProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID);
-        if (UtilAll::isBlank(originMsgId))
-        {
-            retryMsg.putProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID, msg.getMsgId());
-        }
-        else
-        {
-            retryMsg.putProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID, originMsgId);
-        }
-
-        retryMsg.setFlag(msg.getFlag());
-        // NOTE(review): setProperties() runs after the origin id was stored above;
-        // confirm it does not discard that property.
-        retryMsg.setProperties(msg.getProperties());
-        retryMsg.putProperty(Message::PROPERTY_RETRY_TOPIC, msg.getTopic());
-
-        const int reTimes = msg.getReconsumeTimes() + 1;
-        retryMsg.putProperty(Message::PROPERTY_RECONSUME_TIME, UtilAll::toString(reTimes));
-        retryMsg.putProperty(Message::PROPERTY_MAX_RECONSUME_TIMES,
-                             UtilAll::toString(m_pDefaultMQPullConsumer->getMaxReconsumeTimes()));
-        retryMsg.setDelayTimeLevel(3 + reTimes);
-
-        m_pMQClientFactory->getDefaultMQProducer()->send(retryMsg);
-    }
-}
-
-/**
- * Writes a new consume offset for the queue into the offset store, passing
- * false as the store's third argument. Requires the RUNNING state.
- */
-void DefaultMQPullConsumerImpl::updateConsumeOffset(MessageQueue& mq, long long offset)
-{
-    makeSureStateOK();
-    m_pOffsetStore->updateOffset(mq, offset, false);
-}
-
-/** Looks a message up by id via the admin object; requires the RUNNING state. */
-MessageExt* DefaultMQPullConsumerImpl::viewMessage(const std::string& msgId)
-{
-    makeSureStateOK();
-    return m_pMQClientFactory->getMQAdminImpl()->viewMessage(msgId);
-}
-
-/** Returns the consumer object this implementation was created for. */
-DefaultMQPullConsumer* DefaultMQPullConsumerImpl::getDefaultMQPullConsumer()
-{
-    return m_pDefaultMQPullConsumer;
-}
-
-/** Returns the offset store pointer (NULL until start() or setOffsetStore()). */
-OffsetStore* DefaultMQPullConsumerImpl::getOffsetStore()
-{
-    return m_pOffsetStore;
-}
-
-/** Stores the offset store pointer; the previous pointer is not released. */
-void DefaultMQPullConsumerImpl::setOffsetStore(OffsetStore* pOffsetStore)
-{
-    m_pOffsetStore = pOffsetStore;
-}
-
-/** Throws MQClientException (via THROW_MQEXCEPTION) unless the state is RUNNING. */
-void DefaultMQPullConsumerImpl::makeSureStateOK()
-{
-    if (m_serviceState == RUNNING)
-    {
-        return;
-    }
-
-    THROW_MQEXCEPTION(MQClientException, "The consumer service state not OK, ", -1);
-}
-
-/**
- * Shared implementation of the synchronous pull entry points.
- *
- * Requires the RUNNING state. Throws MQClientException when offset < 0, when
- * maxNums <= 0, or when the subscription expression cannot be turned into
- * subscription data. The topic of mq is subscribed automatically if needed.
- *
- * @param mq            queue to pull from
- * @param subExpression subscription expression for the pull
- * @param offset        offset to pull from; must be >= 0
- * @param maxNums       maximum number of messages; must be > 0
- * @param block         selects the suspend timeout instead of the plain pull
- *                      timeout and is passed into the sys-flag builder
- * @return the value returned by PullAPIWrapper::processPullResult()
- */
-PullResult* DefaultMQPullConsumerImpl::pullSyncImpl(MessageQueue& mq,
-        const std::string& subExpression,
-        long long offset,
-        int maxNums,
-        bool block)
-{
-    makeSureStateOK();
-
-    if (offset < 0)
-    {
-        THROW_MQEXCEPTION(MQClientException, "offset < 0", -1);
-    }
-
-    if (maxNums <= 0)
-    {
-        THROW_MQEXCEPTION(MQClientException, "maxNums <= 0", -1);
-    }
-
-    subscriptionAutomatically(mq.getTopic());
-
-    int sysFlag = PullSysFlag::buildSysFlag(false, block, true);
-
-    SubscriptionDataPtr subscriptionData = NULL;
-    try
-    {
-        subscriptionData = FilterAPI::buildSubscriptionData(mq.getTopic(), subExpression);
-    }
-    catch (...)
-    {
-        // Any failure while building the subscription is reported uniformly.
-        THROW_MQEXCEPTION(MQClientException, "parse subscription error", -1);
-    }
-
-    int timeoutMillis;
-    if (block)
-    {
-        timeoutMillis = m_pDefaultMQPullConsumer->getConsumerTimeoutMillisWhenSuspend();
-    }
-    else
-    {
-        timeoutMillis = m_pDefaultMQPullConsumer->getConsumerPullTimeoutMillis();
-    }
-
-    // Trailing numbers give the argument position in pullKernelImpl().
-    PullResult* rawResult = m_pPullAPIWrapper->pullKernelImpl(
-                                mq,                                                         // 1
-                                subscriptionData->getSubString(),                           // 2
-                                0L,                                                         // 3
-                                offset,                                                     // 4
-                                maxNums,                                                    // 5
-                                sysFlag,                                                    // 6
-                                0,                                                          // 7
-                                m_pDefaultMQPullConsumer->getBrokerSuspendMaxTimeMillis(),  // 8
-                                timeoutMillis,                                              // 9
-                                SYNC,                                                       // 10
-                                NULL                                                        // 11
-                            );
-
-    return m_pPullAPIWrapper->processPullResult(mq, *rawResult, *subscriptionData);
-}
-
-/**
- * Makes sure the rebalance object's subscription table has an entry for the
- * topic, adding a SUB_ALL subscription when it is missing.
- *
- * Failures while building the subscription are logged as a warning and
- * otherwise ignored; the table is left unchanged in that case.
- */
-void DefaultMQPullConsumerImpl::subscriptionAutomatically(const std::string& topic)
-{
-    std::map<std::string, SubscriptionData>& subscriptions = m_pRebalanceImpl->getSubscriptionInner();
-
-    if (subscriptions.find(topic) != subscriptions.end())
-    {
-        // Already subscribed.
-        return;
-    }
-
-    try
-    {
-        SubscriptionDataPtr built =
-            FilterAPI::buildSubscriptionData(topic, SubscriptionData::SUB_ALL);
-        subscriptions[topic] = *built;
-    }
-    catch (...)
-    {
-        RMQ_WARN("FilterAPI::buildSubscriptionData exception");
-    }
-}
-
-/**
- * Shared implementation of the asynchronous pull entry points.
- *
- * Requires the RUNNING state. Throws MQClientException when offset < 0, when
- * maxNums <= 0, when pPullCallback is NULL, when the subscription expression
- * cannot be parsed, or when the pull raises an MQBrokerException. Other
- * exception types raised by the pull are not translated.
- *
- * The user callback is wrapped in a DefaultMQPullConsumerImplCallback created
- * with new and handed to pullKernelImpl().
- * NOTE(review): nothing here deletes the wrapper - confirm who owns it,
- * especially when pullKernelImpl() throws.
- *
- * @param block selects the suspend timeout instead of the plain pull timeout
- *              and is passed into the sys-flag builder
- */
-void DefaultMQPullConsumerImpl::pullAsyncImpl(
-    MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums,
-    PullCallback* pPullCallback,
-    bool block)
-{
-    makeSureStateOK();
-
-    if (offset < 0)
-    {
-        THROW_MQEXCEPTION(MQClientException, "offset < 0", -1);
-    }
-
-    if (maxNums <= 0)
-    {
-        THROW_MQEXCEPTION(MQClientException, "maxNums <= 0", -1);
-    }
-
-    if (pPullCallback == NULL)
-    {
-        THROW_MQEXCEPTION(MQClientException, "pullCallback is null", -1);
-    }
-
-    subscriptionAutomatically(mq.getTopic());
-    try
-    {
-        int sysFlag = PullSysFlag::buildSysFlag(false, block, true);
-
-        SubscriptionDataPtr subscriptionData = NULL;
-        try
-        {
-            subscriptionData = FilterAPI::buildSubscriptionData(mq.getTopic(), subExpression);
-        }
-        catch (...)
-        {
-            THROW_MQEXCEPTION(MQClientException, "parse subscription error", -1);
-        }
-
-        int timeoutMillis;
-        if (block)
-        {
-            timeoutMillis = m_pDefaultMQPullConsumer->getConsumerTimeoutMillisWhenSuspend();
-        }
-        else
-        {
-            timeoutMillis = m_pDefaultMQPullConsumer->getConsumerPullTimeoutMillis();
-        }
-
-        DefaultMQPullConsumerImplCallback* wrappedCallback =
-            new DefaultMQPullConsumerImplCallback(*subscriptionData,
-                                                  mq, this, pPullCallback);
-
-        // Trailing numbers give the argument position in pullKernelImpl().
-        m_pPullAPIWrapper->pullKernelImpl(
-            mq,                                                         // 1
-            subscriptionData->getSubString(),                           // 2
-            0L,                                                         // 3
-            offset,                                                     // 4
-            maxNums,                                                    // 5
-            sysFlag,                                                    // 6
-            0,                                                          // 7
-            m_pDefaultMQPullConsumer->getBrokerSuspendMaxTimeMillis(),  // 8
-            timeoutMillis,                                              // 9
-            ASYNC,                                                      // 10
-            wrappedCallback                                             // 11
-        );
-    }
-    catch (const MQBrokerException&)
-    {
-        THROW_MQEXCEPTION(MQClientException, "pullAsync unknow exception", -1);
-    }
-}
-
-
-/**
- * Copies the consumer's registered topics into the rebalance object's
- * subscription table, each as a SUB_ALL subscription.
- *
- * Any exception raised while doing so is replaced by an MQClientException
- * with the message "subscription exception".
- */
-void DefaultMQPullConsumerImpl::copySubscription()
-{
-    try
-    {
-        std::set<std::string> topics = m_pDefaultMQPullConsumer->getRegisterTopics();
-
-        for (std::set<std::string>::iterator topic = topics.begin(); topic != topics.end(); ++topic)
-        {
-            SubscriptionDataPtr built =
-                FilterAPI::buildSubscriptionData(*topic, SubscriptionData::SUB_ALL);
-            m_pRebalanceImpl->getSubscriptionInner()[*topic] = *built;
-        }
-    }
-    catch (...)
-    {
-        THROW_MQEXCEPTION(MQClientException, "subscription exception", -1);
-    }
-}
-
-
-/**
- * Validates the consumer configuration before start-up.
- *
- * Throws MQClientException (via THROW_MQEXCEPTION) when
- *  - the consumer group is still MixAll::DEFAULT_CONSUMER_GROUP,
- *  - the message model is neither BROADCASTING nor CLUSTERING, or
- *  - no allocation strategy is configured.
- * Validators::checkGroup() is called first on the group name.
- */
-void DefaultMQPullConsumerImpl::checkConfig()
-{
-    // check consumerGroup
-    Validators::checkGroup(m_pDefaultMQPullConsumer->getConsumerGroup());
-
-    // The default group name is only a placeholder and must be replaced.
-    if (m_pDefaultMQPullConsumer->getConsumerGroup() == MixAll::DEFAULT_CONSUMER_GROUP)
-    {
-        THROW_MQEXCEPTION(MQClientException, "consumerGroup can not equal "
-                          + MixAll::DEFAULT_CONSUMER_GROUP //
-                          + ", please specify another one.", -1);
-    }
-
-    if (m_pDefaultMQPullConsumer->getMessageModel() != BROADCASTING
-        && m_pDefaultMQPullConsumer->getMessageModel() != CLUSTERING)
-    {
-        // The original text said "is valid", which contradicts the error it reports.
-        THROW_MQEXCEPTION(MQClientException, "messageModel is invalid ", -1);
-    }
-
-    // allocateMessageQueueStrategy
-    if (m_pDefaultMQPullConsumer->getAllocateMessageQueueStrategy() == NULL)
-    {
-        THROW_MQEXCEPTION(MQClientException, "allocateMessageQueueStrategy is null", -1);
-    }
-}
-
-/** Returns the current service state. */
-ServiceState DefaultMQPullConsumerImpl::getServiceState()
-{
-    return m_serviceState;
-}
-
-/** Overwrites the service state; no transition checks are made. */
-void DefaultMQPullConsumerImpl::setServiceState(ServiceState serviceState)
-{
-    m_serviceState = serviceState;
-}
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.h b/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.h
deleted file mode 100755
index 171565c..0000000
--- a/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.h
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __DEFAULTMQPULLCONSUMERIMPL_H__
-#define __DEFAULTMQPULLCONSUMERIMPL_H__
-
-#include <string>
-#include <set>
-#include <map>
-#include <vector>
-#include "MQConsumerInner.h"
-#include "MessageExt.h"
-#include "QueryResult.h"
-#include "ServiceState.h"
-#include "PullRequest.h"
-#include "MessageQueue.h"
-#include "PullResult.h"
-#include "PullCallback.h"
-#include "PullAPIWrapper.h"
-
-namespace rmq
-{
-class DefaultMQPullConsumer;
-class PullCallback;
-class OffsetStore;
-class RebalanceImpl;
-class MQClientFactory;
-class PullAPIWrapper;
-
-/**
- * Implementation object behind DefaultMQPullConsumer.
- *
- * Holds the service state plus pointers to the client factory, the pull API
- * wrapper, the offset store and the rebalance object.
- */
-class DefaultMQPullConsumerImpl : public MQConsumerInner
-{
-public:
-    DefaultMQPullConsumerImpl(DefaultMQPullConsumer* pDefaultMQPullConsumer);
-    ~DefaultMQPullConsumerImpl();
-
-    // ---- life cycle ----
-    void start();
-    void shutdown();
-    ServiceState getServiceState();
-    void setServiceState(ServiceState serviceState);
-
-    // ---- topic / queue administration ----
-    void createTopic(const std::string& key, const std::string& newTopic,
-                     int queueNum);
-    std::set<MessageQueue>* fetchMessageQueuesInBalance(const std::string& topic);
-    std::vector<MessageQueue>* fetchPublishMessageQueues(const std::string& topic);
-    std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic);
-    long long earliestMsgStoreTime(const MessageQueue& mq);
-    long long maxOffset(const MessageQueue& mq);
-    long long minOffset(const MessageQueue& mq);
-    long long searchOffset(const MessageQueue& mq, long long timestamp);
-    MessageExt* viewMessage(const std::string& msgId);
-    QueryResult queryMessage(const std::string& topic,
-                             const std::string& key,
-                             int maxNum,
-                             long long begin,
-                             long long end);
-
-    // ---- consumer identity, subscription and rebalance ----
-    std::string groupName();
-    MessageModel messageModel();
-    ConsumeType consumeType();
-    ConsumeFromWhere consumeFromWhere();
-    std::set<SubscriptionData> subscriptions();
-    void doRebalance();
-    void persistConsumerOffset();
-    void updateTopicSubscribeInfo(const std::string& topic,
-                                  const std::set<MessageQueue>& info);
-    bool isSubscribeTopicNeedUpdate(const std::string& topic);
-
-    // ---- pulling: overloads returning PullResult* or taking a callback ----
-    PullResult* pull(MessageQueue& mq,
-                     const std::string& subExpression,
-                     long long offset,
-                     int maxNums);
-
-    void pull(MessageQueue& mq,
-              const std::string& subExpression,
-              long long offset,
-              int maxNums,
-              PullCallback* pPullCallback);
-
-    PullResult* pullBlockIfNotFound(MessageQueue& mq,
-                                    const std::string& subExpression,
-                                    long long offset, int maxNums);
-
-    void pullBlockIfNotFound(MessageQueue& mq,
-                             const std::string& subExpression,
-                             long long offset, int maxNums,
-                             PullCallback* pPullCallback);
-
-    // ---- offsets and send-back ----
-    long long fetchConsumeOffset(MessageQueue& mq, bool fromStore);
-    void updateConsumeOffset(MessageQueue& mq, long long offset);
-    void sendMessageBack(MessageExt& msg, int delayLevel,
-                         const std::string& brokerName);
-    void sendMessageBack(MessageExt& msg, int delayLevel,
-                         const std::string& brokerName, const std::string& consumerGroup);
-
-    // ---- collaborators ----
-    DefaultMQPullConsumer* getDefaultMQPullConsumer();
-    OffsetStore* getOffsetStore();
-    void setOffsetStore(OffsetStore* pOffsetStore);
-
-private:
-    void makeSureStateOK();
-    void subscriptionAutomatically(const std::string& topic);
-    void copySubscription();
-    void checkConfig();
-
-    PullResult* pullSyncImpl(MessageQueue& mq,
-                             const std::string& subExpression,
-                             long long offset,
-                             int maxNums,
-                             bool block);
-    void pullAsyncImpl(MessageQueue& mq,
-                       const std::string& subExpression,
-                       long long offset,
-                       int maxNums,
-                       PullCallback* pPullCallback,
-                       bool block);
-
-private:
-    DefaultMQPullConsumer* m_pDefaultMQPullConsumer;
-    ServiceState m_serviceState;
-    MQClientFactory* m_pMQClientFactory;
-    PullAPIWrapper* m_pPullAPIWrapper;
-    OffsetStore* m_pOffsetStore;
-    RebalanceImpl* m_pRebalanceImpl;
-
-    // The callback wrapper reads m_pPullAPIWrapper directly.
-    friend class DefaultMQPullConsumerImplCallback;
-};
-
-/**
- * PullCallback wrapper that runs a pull result through
- * PullAPIWrapper::processPullResult() before handing it to the wrapped
- * callback. It stores copies of the subscription data and of the queue.
- */
-class DefaultMQPullConsumerImplCallback : public PullCallback
-{
-public:
-    DefaultMQPullConsumerImplCallback(SubscriptionData& subscriptionData,
-                                      MessageQueue& mq,
-                                      DefaultMQPullConsumerImpl* pDefaultMQPullConsumerImpl,
-                                      PullCallback* pCallback)
-        : m_subscriptionData(subscriptionData)
-        , m_mq(mq)
-        , m_pDefaultMQPullConsumerImpl(pDefaultMQPullConsumerImpl)
-        , m_pCallback(pCallback)
-    {
-    }
-
-    /**
-     * Processes the pull result and passes the processed result on.
-     * NOTE(review): the pointer returned by processPullResult() is dereferenced
-     * and never released here - confirm who owns it.
-     */
-    void onSuccess(PullResult& pullResult)
-    {
-        PullAPIWrapper* wrapper = m_pDefaultMQPullConsumerImpl->m_pPullAPIWrapper;
-        PullResult* processed = wrapper->processPullResult(m_mq, pullResult, m_subscriptionData);
-        m_pCallback->onSuccess(*processed);
-    }
-
-    /** Passes the exception to the wrapped callback unchanged. */
-    void onException(MQException& e)
-    {
-        m_pCallback->onException(e);
-    }
-
-private:
-    SubscriptionData m_subscriptionData;
-    MessageQueue m_mq;
-    DefaultMQPullConsumerImpl* m_pDefaultMQPullConsumerImpl;
-    PullCallback* m_pCallback;
-};
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumer.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumer.cpp b/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumer.cpp
deleted file mode 100755
index 45ee907..0000000
--- a/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumer.cpp
+++ /dev/null
@@ -1,399 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "DefaultMQPushConsumer.h"
-#include <list>
-#include <string>
-
-#include "DefaultMQPushConsumerImpl.h"
-#include "MessageQueue.h"
-#include "MessageExt.h"
-#include "ClientConfig.h"
-#include "ConsumerStatManage.h"
-#include "MixAll.h"
-#include "AllocateMessageQueueStrategyInner.h"
-
-namespace rmq
-{
-
-class AllocateMessageQueueStrategy;
-
-DefaultMQPushConsumer::DefaultMQPushConsumer()
-{
- m_consumerGroup = MixAll::DEFAULT_CONSUMER_GROUP;
- m_messageModel = CLUSTERING;
- m_consumeFromWhere = CONSUME_FROM_LAST_OFFSET;
- m_pAllocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
- m_pMessageListener = NULL;
- m_consumeThreadMin = 5;
- m_consumeThreadMax = 25;
- m_consumeConcurrentlyMaxSpan = 2000;
- m_pullThresholdForQueue = 1000;
- m_pullInterval = 0;
- m_consumeMessageBatchMaxSize = 1;
- m_pullBatchSize = 32;
- m_postSubscriptionWhenPull = false;
- m_unitMode = false;
- m_maxReconsumeTimes = 16;
- m_suspendCurrentQueueTimeMillis = 1000;
- m_consumeTimeout = 15;
- m_pOffsetStore = NULL;
- m_pDefaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this);
-}
-
-DefaultMQPushConsumer::DefaultMQPushConsumer(const std::string& consumerGroup)
-{
- m_consumerGroup = consumerGroup;
- m_messageModel = CLUSTERING;
- m_consumeFromWhere = CONSUME_FROM_LAST_OFFSET;
- m_pAllocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
- m_pMessageListener = NULL;
- m_consumeThreadMin = 5;
- m_consumeThreadMax = 25;
- m_consumeConcurrentlyMaxSpan = 2000;
- m_pullThresholdForQueue = 1000;
- m_pullInterval = 0;
- m_consumeMessageBatchMaxSize = 1;
- m_pullBatchSize = 32;
- m_postSubscriptionWhenPull = false;
- m_unitMode = false;
- m_maxReconsumeTimes = 16;
- m_suspendCurrentQueueTimeMillis = 1000;
- m_consumeTimeout = 15;
- m_pOffsetStore = NULL;
- m_pDefaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this);
-}
-
-DefaultMQPushConsumer::~DefaultMQPushConsumer()
-{
- delete m_pAllocateMessageQueueStrategy;
- delete m_pDefaultMQPushConsumerImpl;
-}
-
-//MQAdmin
-void DefaultMQPushConsumer::createTopic(const std::string& key, const std::string& newTopic, int queueNum)
-{
- m_pDefaultMQPushConsumerImpl->createTopic(key, newTopic, queueNum);
-}
-
-long long DefaultMQPushConsumer::searchOffset(const MessageQueue& mq, long long timestamp)
-{
- return m_pDefaultMQPushConsumerImpl->searchOffset(mq, timestamp);
-}
-
-long long DefaultMQPushConsumer::maxOffset(const MessageQueue& mq)
-{
- return m_pDefaultMQPushConsumerImpl->maxOffset(mq);
-}
-
-long long DefaultMQPushConsumer::minOffset(const MessageQueue& mq)
-{
- return m_pDefaultMQPushConsumerImpl->minOffset(mq);
-}
-
-long long DefaultMQPushConsumer::earliestMsgStoreTime(const MessageQueue& mq)
-{
- return m_pDefaultMQPushConsumerImpl->earliestMsgStoreTime(mq);
-}
-
-MessageExt* DefaultMQPushConsumer::viewMessage(const std::string& msgId)
-{
- return m_pDefaultMQPushConsumerImpl->viewMessage(msgId);
-}
-
-QueryResult DefaultMQPushConsumer::queryMessage(const std::string& topic,
- const std::string& key,
- int maxNum,
- long long begin,
- long long end)
-{
- return m_pDefaultMQPushConsumerImpl->queryMessage(topic, key, maxNum, begin, end);
-}
-// MQadmin end
-
-AllocateMessageQueueStrategy* DefaultMQPushConsumer::getAllocateMessageQueueStrategy()
-{
- return m_pAllocateMessageQueueStrategy;
-}
-
-void DefaultMQPushConsumer::setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy)
-{
- m_pAllocateMessageQueueStrategy = pAllocateMessageQueueStrategy;
-}
-
-int DefaultMQPushConsumer::getConsumeConcurrentlyMaxSpan()
-{
- return m_consumeConcurrentlyMaxSpan;
-}
-
-void DefaultMQPushConsumer::setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan)
-{
- m_consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan;
-}
-
-ConsumeFromWhere DefaultMQPushConsumer::getConsumeFromWhere()
-{
- return m_consumeFromWhere;
-}
-
-void DefaultMQPushConsumer::setConsumeFromWhere(ConsumeFromWhere consumeFromWhere)
-{
- m_consumeFromWhere = consumeFromWhere;
-}
-
-int DefaultMQPushConsumer::getConsumeMessageBatchMaxSize()
-{
- return m_consumeMessageBatchMaxSize;
-}
-
-void DefaultMQPushConsumer::setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize)
-{
- m_consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
-}
-
-std::string DefaultMQPushConsumer::getConsumerGroup()
-{
- return m_consumerGroup;
-}
-
-void DefaultMQPushConsumer::setConsumerGroup(const std::string& consumerGroup)
-{
- m_consumerGroup = consumerGroup;
-}
-
-int DefaultMQPushConsumer::getConsumeThreadMax()
-{
- return m_consumeThreadMax;
-}
-
-void DefaultMQPushConsumer::setConsumeThreadMax(int consumeThreadMax)
-{
- m_consumeThreadMax = consumeThreadMax;
-}
-
-int DefaultMQPushConsumer::getConsumeThreadMin()
-{
- return m_consumeThreadMin;
-}
-
-void DefaultMQPushConsumer::setConsumeThreadMin(int consumeThreadMin)
-{
- m_consumeThreadMin = consumeThreadMin;
-}
-
-DefaultMQPushConsumerImpl* DefaultMQPushConsumer::getDefaultMQPushConsumerImpl()
-{
- return m_pDefaultMQPushConsumerImpl;
-}
-
-MessageListener* DefaultMQPushConsumer::getMessageListener()
-{
- return m_pMessageListener;
-}
-
-void DefaultMQPushConsumer::setMessageListener(MessageListener* pMessageListener)
-{
- m_pMessageListener = pMessageListener;
-}
-
-MessageModel DefaultMQPushConsumer::getMessageModel()
-{
- return m_messageModel;
-}
-
-void DefaultMQPushConsumer::setMessageModel(MessageModel messageModel)
-{
- m_messageModel = messageModel;
-}
-
-int DefaultMQPushConsumer::getPullBatchSize()
-{
- return m_pullBatchSize;
-}
-
-void DefaultMQPushConsumer::setPullBatchSize(int pullBatchSize)
-{
- m_pullBatchSize = pullBatchSize;
-}
-
-long DefaultMQPushConsumer::getPullInterval()
-{
- return m_pullInterval;
-}
-
-void DefaultMQPushConsumer::setPullInterval(long pullInterval)
-{
- m_pullInterval = pullInterval;
-}
-
-int DefaultMQPushConsumer::getPullThresholdForQueue()
-{
- return m_pullThresholdForQueue;
-}
-
-void DefaultMQPushConsumer::setPullThresholdForQueue(int pullThresholdForQueue)
-{
- m_pullThresholdForQueue = pullThresholdForQueue;
-}
-
-std::map<std::string, std::string>& DefaultMQPushConsumer::getSubscription()
-{
- return m_subscription;
-}
-
-void DefaultMQPushConsumer::setSubscription(const std::map<std::string, std::string>& subscription)
-{
- m_subscription = subscription;
-}
-
-//MQConsumer
-void DefaultMQPushConsumer::sendMessageBack(MessageExt& msg, int delayLevel)
-{
- m_pDefaultMQPushConsumerImpl->sendMessageBack(msg, delayLevel, "");
-}
-
-void DefaultMQPushConsumer::sendMessageBack(MessageExt& msg, int delayLevel, const std::string brokerName)
-{
- m_pDefaultMQPushConsumerImpl->sendMessageBack(msg, delayLevel, brokerName);
-}
-
-
-std::set<MessageQueue>* DefaultMQPushConsumer::fetchSubscribeMessageQueues(const std::string& topic)
-{
- return m_pDefaultMQPushConsumerImpl->fetchSubscribeMessageQueues(topic);
-}
-
-void DefaultMQPushConsumer::start()
-{
- m_pDefaultMQPushConsumerImpl->start();
-}
-
-void DefaultMQPushConsumer::shutdown()
-{
- m_pDefaultMQPushConsumerImpl->shutdown();
-}
-//MQConsumer end
-
-//MQPushConsumer
-void DefaultMQPushConsumer::registerMessageListener(MessageListener* pMessageListener)
-{
- m_pMessageListener = pMessageListener;
- m_pDefaultMQPushConsumerImpl->registerMessageListener(pMessageListener);
-}
-
-void DefaultMQPushConsumer::subscribe(const std::string& topic, const std::string& subExpression)
-{
- m_pDefaultMQPushConsumerImpl->subscribe(topic, subExpression);
-}
-
-void DefaultMQPushConsumer::unsubscribe(const std::string& topic)
-{
- m_pDefaultMQPushConsumerImpl->unsubscribe(topic);
-}
-
-void DefaultMQPushConsumer::updateCorePoolSize(int corePoolSize)
-{
- m_pDefaultMQPushConsumerImpl->updateCorePoolSize(corePoolSize);
-}
-
-void DefaultMQPushConsumer::suspend()
-{
- m_pDefaultMQPushConsumerImpl->suspend();
-}
-
-void DefaultMQPushConsumer::resume()
-{
- m_pDefaultMQPushConsumerImpl->resume();
-}
-//MQPushConsumer end
-
-OffsetStore* DefaultMQPushConsumer::getOffsetStore()
-{
- return m_pOffsetStore;
-}
-
-void DefaultMQPushConsumer::setOffsetStore(OffsetStore* pOffsetStore)
-{
- m_pOffsetStore = pOffsetStore;
-}
-
-std::string DefaultMQPushConsumer::getConsumeTimestamp() {
- return m_consumeTimestamp;
-}
-
-void DefaultMQPushConsumer::setConsumeTimestamp(std::string consumeTimestamp) {
- m_consumeTimestamp = consumeTimestamp;
-}
-
-bool DefaultMQPushConsumer::isPostSubscriptionWhenPull()
-{
- return m_postSubscriptionWhenPull;
-}
-
-
-void DefaultMQPushConsumer::setPostSubscriptionWhenPull(bool postSubscriptionWhenPull)
-{
- m_postSubscriptionWhenPull = postSubscriptionWhenPull;
-}
-
-
-bool DefaultMQPushConsumer::isUnitMode()
-{
- return m_unitMode;
-}
-
-
-void DefaultMQPushConsumer::setUnitMode(bool isUnitMode)
-{
- m_unitMode = isUnitMode;
-}
-
-int DefaultMQPushConsumer::getMaxReconsumeTimes()
-{
- return m_maxReconsumeTimes;
-}
-
-
-void DefaultMQPushConsumer::setMaxReconsumeTimes(int maxReconsumeTimes)
-{
- m_maxReconsumeTimes = maxReconsumeTimes;
-}
-
-
-int DefaultMQPushConsumer::getSuspendCurrentQueueTimeMillis()
-{
- return m_suspendCurrentQueueTimeMillis;
-}
-
-
-void DefaultMQPushConsumer::setSuspendCurrentQueueTimeMillis(int suspendCurrentQueueTimeMillis)
-{
- m_suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
-}
-
-
-int DefaultMQPushConsumer::getConsumeTimeout()
-{
- return m_consumeTimeout;
-}
-
-void DefaultMQPushConsumer::setConsumeTimeout(int consumeTimeout)
-{
- m_consumeTimeout = consumeTimeout;
-}
-
-
-}