You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/09/04 05:04:52 UTC
[06/14] incubator-rocketmq-externals git commit: upload rocketmq-cpp
code
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/DefaultMQPushConsumer.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/DefaultMQPushConsumer.cpp b/rocketmq-cpp/src/consumer/DefaultMQPushConsumer.cpp
new file mode 100755
index 0000000..8407350
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/DefaultMQPushConsumer.cpp
@@ -0,0 +1,897 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DefaultMQPushConsumer.h"
+#include "CommunicationMode.h"
+#include "ConsumeMsgService.h"
+#include "ConsumerRunningInfo.h"
+#include "FilterAPI.h"
+#include "Logging.h"
+#include "MQClientAPIImpl.h"
+#include "MQClientFactory.h"
+#include "MQClientManager.h"
+#include "MQProtos.h"
+#include "OffsetStore.h"
+#include "PullAPIWrapper.h"
+#include "PullSysFlag.h"
+#include "Rebalance.h"
+#include "UtilAll.h"
+#include "Validators.h"
+#include "task_queue.h"
+
+namespace rocketmq {
+
+/**
+ * Callback object passed to the asynchronous pull path. It holds (but does
+ * not own) the consumer that issued the pull and the PullRequest being
+ * served, and turns each pull result into offset updates, consume requests
+ * and, when asked to, a follow-up pull task.
+ */
+class AsyncPullCallback : public PullCallback {
+ public:
+  AsyncPullCallback(DefaultMQPushConsumer* pushConsumer, PullRequest* request)
+      : m_callbackOwner(pushConsumer),
+        m_pullRequest(request),
+        m_bShutdown(false) {}
+
+  virtual ~AsyncPullCallback() {
+    // Both pointers are borrowed; they are only cleared here, never deleted.
+    m_pullRequest = NULL;
+    m_callbackOwner = NULL;
+  }
+
+  /**
+   * Handles a completed pull.
+   * @param mq                   queue the result belongs to (not used here).
+   * @param result               status, next offset and messages of the pull.
+   * @param bProducePullRequest  when true, the next pull task is scheduled.
+   */
+  virtual void onSuccess(MQMessageQueue& mq, PullResult& result,
+                         bool bProducePullRequest) {
+    if (m_bShutdown) {
+      LOG_INFO("pullrequest for:%s in shutdown, return",
+               (m_pullRequest->m_messageQueue).toString().c_str());
+      return;
+    }
+
+    switch (result.pullStatus) {
+      case FOUND:
+        handleFound(result, bProducePullRequest);
+        break;
+      case NO_NEW_MSG:
+      case NO_MATCHED_MSG:
+        // Both statuses are treated identically.
+        handleNothingPulled(result, bProducePullRequest);
+        break;
+      case OFFSET_ILLEGAL:
+        advanceAndRepull(result, bProducePullRequest);
+        break;
+      case BROKER_TIMEOUT:
+        // BROKER_TIMEOUT is a client-defined status, so the broker is not
+        // expected to ever return it (per the original author's note).
+        LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
+        advanceAndRepull(result, bProducePullRequest);
+        break;
+    }
+  }
+
+  /** A failed pull is simply re-queued unless shutdown has been flagged. */
+  virtual void onException(MQException& e) {
+    if (m_bShutdown) {
+      LOG_INFO("pullrequest for:%s in shutdown, return",
+               (m_pullRequest->m_messageQueue).toString().c_str());
+      return;
+    }
+    LOG_WARN("pullrequest for:%s occurs exception, reproduce it",
+             (m_pullRequest->m_messageQueue).toString().c_str());
+    m_callbackOwner->producePullMsgTask(m_pullRequest);
+  }
+
+  /** Makes every later onSuccess/onException call return immediately. */
+  void setShutdownStatus() { m_bShutdown = true; }
+
+ private:
+  // FOUND: cache the messages and hand them to the consume service.
+  void handleFound(PullResult& result, bool bProducePullRequest) {
+    // A request dropped by a concurrent rebalance must neither cache the
+    // late-arriving messages nor schedule another pull.
+    if (m_pullRequest->isDroped()) {
+      return;
+    }
+    m_pullRequest->setNextOffset(result.nextBeginOffset);
+    m_pullRequest->putMessage(result.msgFoundList);
+
+    m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(
+        m_pullRequest, result.msgFoundList);
+
+    if (bProducePullRequest) {
+      m_callbackOwner->producePullMsgTask(m_pullRequest);
+    }
+
+    LOG_DEBUG("FOUND:%s with size:%zu,nextBeginOffset:%lld",
+              (m_pullRequest->m_messageQueue).toString().c_str(),
+              result.msgFoundList.size(), result.nextBeginOffset);
+  }
+
+  // NO_NEW_MSG / NO_MATCHED_MSG: advance the pull offset and, when nothing
+  // is cached locally, also move the consume offset to the broker's value.
+  void handleNothingPulled(PullResult& result, bool bProducePullRequest) {
+    m_pullRequest->setNextOffset(result.nextBeginOffset);
+
+    vector<MQMessageExt> cached;
+    m_pullRequest->getMessage(cached);
+    // Original author's rationale: if the broker lost or cleared a queue's
+    // messages but kept its offset, the consumer starts from offset 0,
+    // keeps getting NO_NEW_MSG/NO_MATCHED_MSG with a growing nextBeginOffset
+    // and never caches anything, so the consume offset has to follow the
+    // broker-provided nextBeginOffset. The same branch is also taken when
+    // there is genuinely nothing new to pull.
+    if (cached.size() == 0 && result.nextBeginOffset > 0) {
+      m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,
+                                           result.nextBeginOffset);
+    }
+    if (bProducePullRequest) {
+      m_callbackOwner->producePullMsgTask(m_pullRequest);
+    }
+  }
+
+  // OFFSET_ILLEGAL / BROKER_TIMEOUT: take the broker's offset and pull again.
+  void advanceAndRepull(PullResult& result, bool bProducePullRequest) {
+    m_pullRequest->setNextOffset(result.nextBeginOffset);
+    if (bProducePullRequest) {
+      m_callbackOwner->producePullMsgTask(m_pullRequest);
+    }
+  }
+
+  DefaultMQPushConsumer* m_callbackOwner;
+  PullRequest* m_pullRequest;
+  bool m_bShutdown;
+};
+
+//<!***************************************************************************
+// File-scope mutex locked by getAsyncPullCallBack() and
+// shutdownAsyncPullCallBack() while they access m_PullCallback. Because it is
+// a namespace-scope static rather than a member, every consumer instance in
+// this translation unit shares the same lock.
+static boost::mutex m_asyncCallbackLock;
+/**
+ * Creates a push consumer in its default configuration: clustering message
+ * model, asynchronous pulling with a 30 s timeout, consuming from the last
+ * offset, batch size 1 and at most 1000 cached messages per queue. The asio
+ * service thread is started here; everything else is created by start().
+ *
+ * @param groupname consumer group; DEFAULT_CONSUMER_GROUP is used when empty.
+ */
+DefaultMQPushConsumer::DefaultMQPushConsumer(const string& groupname)
+    : m_consumeFromWhere(CONSUME_FROM_LAST_OFFSET),
+      m_pOffsetStore(NULL),
+      m_pPullAPIWrapper(NULL),
+      m_pMessageListener(NULL),
+      m_consumeMessageBatchMaxSize(1),
+      m_maxMsgCacheSize(1000) {
+  // These are only allocated by start(), yet the destructor deletes them
+  // unconditionally. Null them so destroying a never-started consumer does
+  // not delete indeterminate pointers. (Assigned in the body rather than the
+  // initializer list because the member declaration order is not visible
+  // from this file.)
+  m_pRebalance = NULL;
+  m_consumerServeice = NULL;
+  m_pullmsgQueue = NULL;
+
+  //<!set default group name;
+  string gname = groupname.empty() ? DEFAULT_CONSUMER_GROUP : groupname;
+  setGroupName(gname);
+  m_asyncPull = true;
+  m_asyncPullTimeout = 30 * 1000;
+  setMessageModel(CLUSTERING);
+
+  m_startTime = UtilAll::currentTimeMillis();
+
+  // hardware_concurrency() returns 0 when the core count cannot be
+  // determined; fall back to a single thread instead of sizing the pools
+  // to zero.
+  unsigned int hwThreads = boost::thread::hardware_concurrency();
+  if (hwThreads == 0) {
+    hwThreads = 1;
+  }
+  m_consumeThreadCount = hwThreads;
+  m_pullMsgThreadPoolNum = hwThreads;
+
+  m_async_service_thread.reset(new boost::thread(
+      boost::bind(&DefaultMQPushConsumer::boost_asio_work, this)));
+}
+
+/**
+ * Thread body for m_async_service_thread: runs m_async_ioService until it
+ * is stopped.
+ */
+void DefaultMQPushConsumer::boost_asio_work() {
+  LOG_INFO("DefaultMQPushConsumer::boost asio async service runing");
+
+  // The work object keeps run() from returning once the first timer
+  // callback has completed and the service has no handlers left.
+  boost::asio::io_service::work keepAlive(m_async_ioService);
+  m_async_ioService.run();
+}
+
+/**
+ * Releases everything the consumer owns. The message listener is supplied
+ * by the caller and is only detached, not deleted.
+ */
+DefaultMQPushConsumer::~DefaultMQPushConsumer() {
+  m_pMessageListener = NULL;
+
+  deleteAndZero(m_pullmsgQueue);
+  deleteAndZero(m_pRebalance);
+  deleteAndZero(m_pOffsetStore);
+  deleteAndZero(m_pPullAPIWrapper);
+  deleteAndZero(m_consumerServeice);
+
+  // The per-queue async pull callbacks are owned by the map.
+  for (PullMAP::iterator cb = m_PullCallback.begin();
+       cb != m_PullCallback.end(); ++cb) {
+    deleteAndZero(cb->second);
+  }
+  m_PullCallback.clear();
+
+  m_subTopics.clear();
+}
+
+/**
+ * Sends a message back to the broker through the client API on behalf of
+ * this consumer group.
+ *
+ * @param msg         message to send back.
+ * @param delayLevel  delay level forwarded to the broker.
+ *
+ * An MQException is logged and swallowed; the caller gets no failure signal.
+ */
+void DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
+  try {
+    MQClientFactory* factory = getFactory();
+    factory->getMQClientAPIImpl()->consumerSendMessageBack(
+        msg, getGroupName(), delayLevel, 3000, getSessionCredentials());
+  } catch (MQException& e) {
+    LOG_ERROR(e.what());
+  }
+}
+
+/**
+ * Fills `mqs` with the message queues of `topic` as reported by the client
+ * factory.
+ *
+ * @param topic  topic to look up.
+ * @param mqs    output; always cleared first, so it may come back empty when
+ *               the lookup throws (the MQException is only logged).
+ */
+void DefaultMQPushConsumer::fetchSubscribeMessageQueues(
+    const string& topic, vector<MQMessageQueue>& mqs) {
+  mqs.clear();
+  try {
+    MQClientFactory* factory = getFactory();
+    factory->fetchSubscribeMessageQueues(topic, mqs, getSessionCredentials());
+  } catch (MQException& e) {
+    LOG_ERROR(e.what());
+  }
+}
+
+/**
+ * Runs one rebalance pass when the service state is OK; otherwise does
+ * nothing. An MQException from the rebalance is logged, not propagated.
+ */
+void DefaultMQPushConsumer::doRebalance() {
+  if (!isServiceStateOk()) {
+    return;
+  }
+  try {
+    m_pRebalance->doRebalance();
+  } catch (MQException& e) {
+    LOG_ERROR(e.what());
+  }
+}
+
+/** Asks the rebalance object to persist consumer offsets, if the service state is OK. */
+void DefaultMQPushConsumer::persistConsumerOffset() {
+  if (!isServiceStateOk()) {
+    return;
+  }
+  m_pRebalance->persistConsumerOffset();
+}
+
+/**
+ * Forwards to Rebalance::persistConsumerOffsetByResetOffset(), if the
+ * service state is OK.
+ */
+void DefaultMQPushConsumer::persistConsumerOffsetByResetOffset() {
+  if (!isServiceStateOk()) {
+    return;
+  }
+  m_pRebalance->persistConsumerOffsetByResetOffset();
+}
+
+/**
+ * Starts the consumer.
+ *
+ * When the state is CREATE_JUST this validates the configuration, creates
+ * the rebalance object, pull API wrapper, consume service, pull task queue
+ * and offset store, registers the consumer with the client factory, starts
+ * the factory, refreshes route info, sends a heartbeat and moves to RUNNING.
+ * In every other state the set-up is skipped. An immediate rebalance is
+ * requested in all cases.
+ *
+ * The state is START_FAILED while set-up is in progress, so an exception
+ * from checkConfig() or a later step leaves it there; a failed registration
+ * resets it to CREATE_JUST before throwing.
+ *
+ * @throws MQClientException if the configuration is invalid or the consumer
+ *         group is already registered with the factory.
+ */
+void DefaultMQPushConsumer::start() {
+  /* Ignore the SIGPIPE */
+  struct sigaction sa;
+  // sa_mask must be initialised before the struct is handed to sigaction();
+  // it used to be passed with indeterminate contents.
+  sigemptyset(&sa.sa_mask);
+  sa.sa_handler = SIG_IGN;
+  sa.sa_flags = 0;
+  sigaction(SIGPIPE, &sa, 0);
+
+  switch (m_serviceState) {
+    case CREATE_JUST: {
+      // Pessimistic state: stays START_FAILED if anything below throws.
+      m_serviceState = START_FAILED;
+      MQClient::start();
+      LOG_INFO("DefaultMQPushConsumer:%s start", m_GroupName.c_str());
+
+      //<!data;
+      checkConfig();
+
+      //<!create rebalance;
+      m_pRebalance = new RebalancePush(this, getFactory());
+
+      string groupname = getGroupName();
+      m_pPullAPIWrapper = new PullAPIWrapper(getFactory(), groupname);
+
+      if (m_pMessageListener) {
+        if (m_pMessageListener->getMessageListenerType() ==
+            messageListenerOrderly) {
+          LOG_INFO("start orderly consume service:%s", getGroupName().c_str());
+          m_consumerServeice = new ConsumeMessageOrderlyService(
+              this, m_consumeThreadCount, m_pMessageListener);
+        } else {
+          // For backward compatibility, both the default and the
+          // concurrent listener types get the concurrent consume service.
+          LOG_INFO("start concurrently consume service:%s",
+                   getGroupName().c_str());
+          m_consumerServeice = new ConsumeMessageConcurrentlyService(
+              this, m_consumeThreadCount, m_pMessageListener);
+        }
+      }
+
+      m_pullmsgQueue = new TaskQueue(m_pullMsgThreadPoolNum);
+      m_pullmsgThread.reset(new boost::thread(boost::bind(
+          &DefaultMQPushConsumer::runPullMsgQueue, this, m_pullmsgQueue)));
+
+      copySubscription();
+
+      //<! registe;
+      bool registerOK = getFactory()->registerConsumer(this);
+      if (!registerOK) {
+        m_serviceState = CREATE_JUST;
+        THROW_MQEXCEPTION(
+            MQClientException,
+            "The cousumer group[" + getGroupName() +
+                "] has been created before, specify another name please.",
+            -1);
+      }
+
+      //<!msg model;
+      switch (getMessageModel()) {
+        case BROADCASTING:
+          m_pOffsetStore = new LocalFileOffsetStore(groupname, getFactory());
+          break;
+        case CLUSTERING:
+          m_pOffsetStore = new RemoteBrokerOffsetStore(groupname, getFactory());
+          break;
+      }
+      m_pOffsetStore->load();
+      m_consumerServeice->start();
+
+      getFactory()->start();
+
+      //<! refresh route info for every subscription, then heartbeat (the
+      //   original comment here was mis-encoded in this archive);
+      updateTopicSubscribeInfoWhenSubscriptionChanged();
+      getFactory()->sendHeartbeatToAllBroker();
+
+      m_serviceState = RUNNING;
+      break;
+    }
+    case RUNNING:
+    case START_FAILED:
+    case SHUTDOWN_ALREADY:
+      break;
+    default:
+      break;
+  }
+
+  getFactory()->rebalanceImmediately();
+}
+
+// Stops a RUNNING consumer; in any other state this is a no-op.
+// Teardown order as written: asio service and its thread, pull task queue
+// and its thread, consume service, offset persistence, async pull callbacks,
+// then unregistering from and shutting down the client factory.
+void DefaultMQPushConsumer::shutdown() {
+ switch (m_serviceState) {
+ case RUNNING: {
+ LOG_INFO("DefaultMQPushConsumer shutdown");
+ // Stop the timer/async service first and wait for its thread to exit.
+ m_async_ioService.stop();
+ m_async_service_thread->interrupt();
+ m_async_service_thread->join();
+ // Close the pull task queue and wait for the thread that runs it.
+ m_pullmsgQueue->close();
+ m_pullmsgThread->interrupt();
+ m_pullmsgThread->join();
+ m_consumerServeice->shutdown();
+ // Still in RUNNING state here, so persistConsumerOffset() is not skipped
+ // by its isServiceStateOk() guard -- presumably; verify isServiceStateOk().
+ persistConsumerOffset();
+ // Only flags the callbacks as shut down; they are deleted in the destructor.
+ shutdownAsyncPullCallBack(); // delete aync pullMsg resources
+ getFactory()->unregisterConsumer(this);
+ getFactory()->shutdown();
+ m_serviceState = SHUTDOWN_ALREADY;
+ break;
+ }
+ case CREATE_JUST:
+ case SHUTDOWN_ALREADY:
+ break;
+ default:
+ break;
+ }
+}
+
+/**
+ * Installs the listener that will receive consumed messages. A NULL argument
+ * is ignored and leaves any previously registered listener in place. The
+ * listener is not owned by the consumer.
+ */
+void DefaultMQPushConsumer::registerMessageListener(
+    MQMessageListener* pMessageListener) {
+  if (pMessageListener == NULL) {
+    return;
+  }
+  m_pMessageListener = pMessageListener;
+}
+
+/**
+ * Returns the type reported by the registered listener, or
+ * messageListenerDefaultly when no listener has been registered.
+ */
+MessageListenerType DefaultMQPushConsumer::getMessageListenerType() {
+  if (m_pMessageListener == NULL) {
+    return messageListenerDefaultly;
+  }
+  return m_pMessageListener->getMessageListenerType();
+}
+
+/**
+ * Returns the consume service created by start(). The consumer keeps
+ * ownership (it is deleted in the destructor).
+ */
+ConsumeMsgService* DefaultMQPushConsumer::getConsumerMsgService() const {
+  ConsumeMsgService* service = m_consumerServeice;
+  return service;
+}
+
+/**
+ * Returns the offset store; NULL until start() has created it. The consumer
+ * keeps ownership.
+ */
+OffsetStore* DefaultMQPushConsumer::getOffsetStore() const {
+  OffsetStore* store = m_pOffsetStore;
+  return store;
+}
+
+/** Returns the rebalance object created by start(); the consumer keeps ownership. */
+Rebalance* DefaultMQPushConsumer::getRebalance() const {
+  Rebalance* rebalance = m_pRebalance;
+  return rebalance;
+}
+
+/**
+ * Records a subscription. Subscribing to the same topic again replaces the
+ * earlier expression. The table is turned into SubscriptionData by
+ * copySubscription(), which start() calls.
+ *
+ * @param topic          topic name.
+ * @param subExpression  filter expression stored for the topic.
+ */
+void DefaultMQPushConsumer::subscribe(const string& topic,
+                                      const string& subExpression) {
+  string& slot = m_subTopics[topic];
+  slot = subExpression;
+}
+
+/**
+ * Validates the configuration before start-up.
+ *
+ * @throws MQClientException when the group name equals
+ *         DEFAULT_CONSUMER_GROUP, when the message model is neither
+ *         BROADCASTING nor CLUSTERING, or when no message listener is
+ *         registered. Validators::checkGroup() runs first on the group name.
+ */
+void DefaultMQPushConsumer::checkConfig() {
+  string groupname = getGroupName();
+  // check consumerGroup
+  Validators::checkGroup(groupname);
+
+  // The built-in default group name must be replaced by the application.
+  if (groupname.compare(DEFAULT_CONSUMER_GROUP) == 0) {
+    THROW_MQEXCEPTION(MQClientException,
+                      "consumerGroup can not equal DEFAULT_CONSUMER", -1);
+  }
+
+  if (getMessageModel() != BROADCASTING && getMessageModel() != CLUSTERING) {
+    // The message used to read "is valid", the opposite of the failure
+    // being reported.
+    THROW_MQEXCEPTION(MQClientException, "messageModel is invalid ", -1);
+  }
+
+  if (m_pMessageListener == NULL) {
+    THROW_MQEXCEPTION(MQClientException, "messageListener is null ", -1);
+  }
+}
+
+/**
+ * Converts every entry recorded by subscribe() into SubscriptionData and
+ * hands it to the rebalance object. In CLUSTERING mode the group's retry
+ * topic is subscribed as well, with SUB_ALL.
+ */
+void DefaultMQPushConsumer::copySubscription() {
+  for (map<string, string>::iterator sub = m_subTopics.begin();
+       sub != m_subTopics.end(); ++sub) {
+    const string& topic = sub->first;
+    const string& expression = sub->second;
+    LOG_INFO("buildSubscriptionData,:%s,%s", topic.c_str(),
+             expression.c_str());
+
+    unique_ptr<SubscriptionData> built(
+        FilterAPI::buildSubscriptionData(topic, expression));
+    // The raw pointer is released to the rebalance object.
+    m_pRebalance->setSubscriptionData(topic, built.release());
+  }
+
+  // BROADCASTING (and any other model) adds nothing further.
+  if (getMessageModel() == CLUSTERING) {
+    const string retryTopic = UtilAll::getRetryTopic(getGroupName());
+
+    unique_ptr<SubscriptionData> retrySub(
+        FilterAPI::buildSubscriptionData(retryTopic, SUB_ALL));
+    m_pRebalance->setSubscriptionData(retryTopic, retrySub.release());
+  }
+}
+
+/**
+ * Passes the message queues of `topic` on to the rebalance object.
+ *
+ * @param topic  topic the queue list belongs to.
+ * @param info   message queues for that topic.
+ */
+void DefaultMQPushConsumer::updateTopicSubscribeInfo(
+    const string& topic, vector<MQMessageQueue>& info) {
+  Rebalance* rebalance = m_pRebalance;
+  rebalance->setTopicSubscribeInfo(topic, info);
+}
+
+/**
+ * Refreshes route information from the name server for every topic in the
+ * rebalance object's subscription table. A topic whose refresh fails is
+ * only logged.
+ */
+void DefaultMQPushConsumer::updateTopicSubscribeInfoWhenSubscriptionChanged() {
+  map<string, SubscriptionData*>& subscriptions =
+      m_pRebalance->getSubscriptionInner();
+
+  for (map<string, SubscriptionData*>::iterator entry = subscriptions.begin();
+       entry != subscriptions.end(); ++entry) {
+    const string& topic = entry->first;
+    const bool updated = getFactory()->updateTopicRouteInfoFromNameServer(
+        topic, getSessionCredentials());
+    if (!updated) {
+      LOG_WARN("The topic:[%s] not exist", topic.c_str());
+    }
+  }
+}
+
+/** This consumer always reports CONSUME_PASSIVELY. */
+ConsumeType DefaultMQPushConsumer::getConsumeType() {
+  const ConsumeType type = CONSUME_PASSIVELY;
+  return type;
+}
+
+/** Returns the configured start position (CONSUME_FROM_LAST_OFFSET unless changed). */
+ConsumeFromWhere DefaultMQPushConsumer::getConsumeFromWhere() {
+  const ConsumeFromWhere where = m_consumeFromWhere;
+  return where;
+}
+
+/** Stores the given start position; no validation is performed. */
+void DefaultMQPushConsumer::setConsumeFromWhere(
+    ConsumeFromWhere consumeFromWhere) {
+  this->m_consumeFromWhere = consumeFromWhere;
+}
+
+/**
+ * Appends a copy of every SubscriptionData held by the rebalance object.
+ *
+ * @param result  output vector; existing elements are kept.
+ */
+void DefaultMQPushConsumer::getSubscriptions(vector<SubscriptionData>& result) {
+  map<string, SubscriptionData*>& subscriptions =
+      m_pRebalance->getSubscriptionInner();
+
+  for (map<string, SubscriptionData*>::iterator entry = subscriptions.begin();
+       entry != subscriptions.end(); ++entry) {
+    const SubscriptionData& data = *(entry->second);
+    result.push_back(data);
+  }
+}
+
+/**
+ * Records a new consume offset for `mq` in the offset store.
+ *
+ * @param mq      queue whose offset changes.
+ * @param offset  new offset; a negative value is rejected and logged.
+ */
+void DefaultMQPushConsumer::updateConsumeOffset(const MQMessageQueue& mq,
+                                                int64 offset) {
+  if (offset < 0) {
+    LOG_ERROR("updateConsumeOffset of mq:%s error", mq.toString().c_str());
+    return;
+  }
+  m_pOffsetStore->updateOffset(mq, offset);
+}
+/** Removes the offset entry for `mq` from the offset store. */
+void DefaultMQPushConsumer::removeConsumeOffset(const MQMessageQueue& mq) {
+  OffsetStore* store = m_pOffsetStore;
+  store->removeOffset(mq);
+}
+
+/**
+ * Completion handler for the back-off timers armed in pullMessage() and
+ * pullMessageAsync(): re-queues the pull request, then frees the timer.
+ *
+ * @param t        heap-allocated timer that fired; deleted here.
+ * @param request  pull request to schedule again.
+ */
+void DefaultMQPushConsumer::triggerNextPullRequest(
+    boost::asio::deadline_timer* t, PullRequest* request) {
+  producePullMsgTask(request);
+
+  // The timer was allocated with new by whoever armed it.
+  deleteAndZero(t);
+}
+
+/**
+ * Queues a pull for `request` on the pull task queue, bound to
+ * pullMessageAsync() or pullMessage() according to m_asyncPull. Nothing is
+ * queued when the task queue or the service state is not OK.
+ */
+void DefaultMQPushConsumer::producePullMsgTask(PullRequest* request) {
+  if (!m_pullmsgQueue->bTaskQueueStatusOK() || !isServiceStateOk()) {
+    return;
+  }
+
+  if (!m_asyncPull) {
+    m_pullmsgQueue->produce(
+        TaskBinder::gen(&DefaultMQPushConsumer::pullMessage, this, request));
+  } else {
+    m_pullmsgQueue->produce(TaskBinder::gen(
+        &DefaultMQPushConsumer::pullMessageAsync, this, request));
+  }
+}
+
+/** Thread body for m_pullmsgThread: runs the given task queue. */
+void DefaultMQPushConsumer::runPullMsgQueue(TaskQueue* pTaskQueue) {
+  TaskQueue& queue = *pTaskQueue;
+  queue.run();
+}
+
+// Synchronous pull for one PullRequest (the path used when m_asyncPull is
+// false). It skips NULL/dropped requests, takes the queue lock for orderly
+// consumption, backs off for one second when too many messages are cached,
+// then pulls with ComMode_SYNC and handles the result inline. Apart from a
+// dropped request and the back-off timer, every exit path re-queues the
+// request through producePullMsgTask().
+void DefaultMQPushConsumer::pullMessage(PullRequest* request) {
+ if (request == NULL || request->isDroped()) {
+ LOG_WARN("Pull request is set drop, return");
+ return;
+ }
+
+ MQMessageQueue& messageQueue = request->m_messageQueue;
+ // Orderly consumption: the queue must be locked (and the lock unexpired)
+ // before pulling; if the lock cannot be taken, try again later.
+ if (m_consumerServeice->getConsumeMsgSerivceListenerType() ==
+ messageListenerOrderly) {
+ if (!request->isLocked() || request->isLockExpired()) {
+ if (!m_pRebalance->lock(messageQueue)) {
+ producePullMsgTask(request);
+ return;
+ }
+ }
+ }
+
+ // Flow control: with more than m_maxMsgCacheSize cached messages, retry in
+ // one second via a heap timer that triggerNextPullRequest() deletes.
+ if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
+ // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
+ // than:%d", (request->m_messageQueue).toString().c_str(),
+ // request->getCacheMsgCount(), m_maxMsgCacheSize);
+ boost::asio::deadline_timer* t = new boost::asio::deadline_timer(
+ m_async_ioService, boost::posix_time::milliseconds(1 * 1000));
+ t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,
+ this, t, request));
+ return;
+ }
+
+ // In CLUSTERING mode a positive in-memory offset is sent along with the
+ // pull (commit flag set in sysFlag below).
+ bool commitOffsetEnable = false;
+ int64 commitOffsetValue = 0;
+ if (CLUSTERING == getMessageModel()) {
+ commitOffsetValue = m_pOffsetStore->readOffset(
+ messageQueue, READ_FROM_MEMORY, getSessionCredentials());
+ if (commitOffsetValue > 0) {
+ commitOffsetEnable = true;
+ }
+ }
+
+ string subExpression;
+ SubscriptionData* pSdata =
+ m_pRebalance->getSubscriptionData(messageQueue.getTopic());
+ // No subscription data for the topic: just try again later.
+ if (pSdata == NULL) {
+ producePullMsgTask(request);
+ return;
+ }
+ subExpression = pSdata->getSubString();
+
+ int sysFlag =
+ PullSysFlag::buildSysFlag(commitOffsetEnable, // commitOffset
+ false, // suspend
+ !subExpression.empty(), // subscription
+ false); // class filter
+
+ try {
+ request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+ // NOTE(review): arguments 5, 8 and 9 (32, 15 s, 30 s) are presumably the
+ // max batch size, broker suspend time and client timeout -- confirm
+ // against PullAPIWrapper::pullKernelImpl.
+ unique_ptr<PullResult> result(
+ m_pPullAPIWrapper->pullKernelImpl(messageQueue, // 1
+ subExpression, // 2
+ pSdata->getSubVersion(), // 3
+ request->getNextOffset(), // 4
+ 32, // 5
+ sysFlag, // 6
+ commitOffsetValue, // 7
+ 1000 * 15, // 8
+ 1000 * 30, // 9
+ ComMode_SYNC, // 10
+ NULL, getSessionCredentials()));
+
+ PullResult pullResult = m_pPullAPIWrapper->processPullResult(
+ messageQueue, result.get(), pSdata);
+
+ switch (pullResult.pullStatus) {
+ case FOUND: {
+ if (!request->isDroped()) // if request is setted to dropped, don't add
+ // msgFoundList to m_msgTreeMap and don't
+ // call producePullMsgTask
+ { // avoid issue: pullMsg is sent out, rebalance is doing concurrently
+ // and this request is dropped, and then received pulled msgs.
+ request->setNextOffset(pullResult.nextBeginOffset);
+ request->putMessage(pullResult.msgFoundList);
+
+ m_consumerServeice->submitConsumeRequest(request,
+ pullResult.msgFoundList);
+ producePullMsgTask(request);
+
+ LOG_DEBUG("FOUND:%s with size:%zu,nextBeginOffset:%lld",
+ messageQueue.toString().c_str(),
+ pullResult.msgFoundList.size(), pullResult.nextBeginOffset);
+ }
+ break;
+ }
+ case NO_NEW_MSG: {
+ request->setNextOffset(pullResult.nextBeginOffset);
+ vector<MQMessageExt> msgs;
+ request->getMessage(msgs);
+ if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+ /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
+ is kept, then consumer will enter following situation:
+ 1>. get pull offset with 0 when do rebalance, and set
+ m_offsetTable[mq] to 0;
+ 2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
+ offset increase by 800
+ 3>. request->getMessage(msgs) always NULL
+ 4>. we need update consumerOffset to nextBeginOffset indicated by
+ broker
+ but if really no new msg could be pulled, also go to this CASE
+ */
+ // LOG_DEBUG("maybe misMatch between broker and client happens, update
+ // consumerOffset to nextBeginOffset indicated by broker");
+ updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
+ }
+ producePullMsgTask(request);
+ LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld",
+ messageQueue.toString().c_str(), pullResult.nextBeginOffset);
+ break;
+ }
+ // Same handling as NO_NEW_MSG; only the log text differs.
+ case NO_MATCHED_MSG: {
+ request->setNextOffset(pullResult.nextBeginOffset);
+ vector<MQMessageExt> msgs;
+ request->getMessage(msgs);
+ if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+ // LOG_DEBUG("maybe misMatch between broker and client happens, update
+ // consumerOffset to nextBeginOffset indicated by broker");
+ updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
+ }
+ producePullMsgTask(request);
+
+ LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",
+ messageQueue.toString().c_str(), pullResult.nextBeginOffset);
+ break;
+ }
+ case OFFSET_ILLEGAL: {
+ request->setNextOffset(pullResult.nextBeginOffset);
+ producePullMsgTask(request);
+
+ LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",
+ messageQueue.toString().c_str(), pullResult.nextBeginOffset);
+ break;
+ }
+ case BROKER_TIMEOUT: { // as BROKER_TIMEOUT is defined by client, broker
+ // will not returns this status, so this case
+ // could not be entered.
+ LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
+ request->setNextOffset(pullResult.nextBeginOffset);
+ producePullMsgTask(request);
+ break;
+ }
+ }
+ } catch (MQException& e) {
+ // A failed pull is logged and simply retried.
+ LOG_ERROR(e.what());
+ producePullMsgTask(request);
+ }
+}
+
+/**
+ * Returns the async pull callback for `msgQueue`, creating and caching one
+ * (bound to `request`) on first use. Runs under m_asyncCallbackLock.
+ *
+ * @param request   pull request a newly created callback is bound to.
+ * @param msgQueue  key into m_PullCallback.
+ * @return the cached callback, or NULL when async pull is disabled or
+ *         `request` is NULL.
+ *
+ * NOTE(review): an existing callback keeps the PullRequest it was created
+ * with, even when a different `request` is passed later for the same queue.
+ */
+AsyncPullCallback* DefaultMQPushConsumer::getAsyncPullCallBack(
+    PullRequest* request, MQMessageQueue msgQueue) {
+  boost::lock_guard<boost::mutex> guard(m_asyncCallbackLock);
+
+  if (!m_asyncPull || !request) {
+    return NULL;
+  }
+
+  PullMAP::iterator found = m_PullCallback.find(msgQueue);
+  if (found != m_PullCallback.end()) {
+    return found->second;
+  }
+
+  LOG_INFO("new pull callback for mq:%s", msgQueue.toString().c_str());
+  AsyncPullCallback* created = new AsyncPullCallback(this, request);
+  m_PullCallback[msgQueue] = created;
+  return created;
+}
+
+/**
+ * Flags every cached async pull callback as shut down so that later
+ * completions return immediately. Nothing is deleted here; the destructor
+ * frees the callbacks. Runs under m_asyncCallbackLock.
+ */
+void DefaultMQPushConsumer::shutdownAsyncPullCallBack() {
+  boost::lock_guard<boost::mutex> guard(m_asyncCallbackLock);
+
+  if (!m_asyncPull) {
+    return;
+  }
+
+  for (PullMAP::iterator entry = m_PullCallback.begin();
+       entry != m_PullCallback.end(); ++entry) {
+    if (!entry->second) {
+      LOG_ERROR("could not find asyncPullCallback for:%s",
+                entry->first.toString().c_str());
+      continue;
+    }
+    entry->second->setShutdownStatus();
+  }
+}
+
+/**
+ * Asynchronous pull for one PullRequest (the path used when m_asyncPull is
+ * true). It skips NULL/dropped requests, takes the queue lock for orderly
+ * consumption, backs off for one second when too many messages are cached,
+ * then issues a ComMode_ASYNC pull whose result is delivered to the
+ * per-queue AsyncPullCallback.
+ *
+ * @param request pull request to serve; NULL is tolerated and ignored.
+ */
+void DefaultMQPushConsumer::pullMessageAsync(PullRequest* request) {
+  // The NULL and dropped cases are checked separately: the dropped-request
+  // log line dereferences `request`, which crashed when it was NULL.
+  if (request == NULL) {
+    LOG_WARN("Pull request is NULL, return");
+    return;
+  }
+  if (request->isDroped()) {
+    LOG_WARN("Pull request is set drop with mq:%s, return",
+             (request->m_messageQueue).toString().c_str());
+    return;
+  }
+
+  MQMessageQueue& messageQueue = request->m_messageQueue;
+  // Orderly consumption: the queue must be locked (and the lock unexpired)
+  // before pulling; if the lock cannot be taken, try again later.
+  if (m_consumerServeice->getConsumeMsgSerivceListenerType() ==
+      messageListenerOrderly) {
+    if (!request->isLocked() || request->isLockExpired()) {
+      if (!m_pRebalance->lock(messageQueue)) {
+        producePullMsgTask(request);
+        return;
+      }
+    }
+  }
+
+  // Flow control: with more than m_maxMsgCacheSize cached messages, retry
+  // in one second via a heap timer that triggerNextPullRequest() deletes.
+  if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
+    boost::asio::deadline_timer* t = new boost::asio::deadline_timer(
+        m_async_ioService, boost::posix_time::milliseconds(1 * 1000));
+    t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,
+                              this, t, request));
+    return;
+  }
+
+  // In CLUSTERING mode a positive in-memory offset is sent along with the
+  // pull (commit flag set in sysFlag below).
+  bool commitOffsetEnable = false;
+  int64 commitOffsetValue = 0;
+  if (CLUSTERING == getMessageModel()) {
+    commitOffsetValue = m_pOffsetStore->readOffset(
+        messageQueue, READ_FROM_MEMORY, getSessionCredentials());
+    if (commitOffsetValue > 0) {
+      commitOffsetEnable = true;
+    }
+  }
+
+  string subExpression;
+  SubscriptionData* pSdata =
+      (m_pRebalance->getSubscriptionData(messageQueue.getTopic()));
+  // No subscription data for the topic: just try again later.
+  if (pSdata == NULL) {
+    producePullMsgTask(request);
+    return;
+  }
+  subExpression = pSdata->getSubString();
+
+  int sysFlag =
+      PullSysFlag::buildSysFlag(commitOffsetEnable,      // commitOffset
+                                true,                    // suspend
+                                !subExpression.empty(),  // subscription
+                                false);                  // class filter
+
+  // NOTE(review): `arg` lives on this stack frame and is passed by address;
+  // presumably pullKernelImpl copies it before returning -- confirm.
+  AsyncArg arg;
+  arg.mq = messageQueue;
+  arg.subData = *pSdata;
+  arg.pPullWrapper = m_pPullAPIWrapper;
+
+  try {
+    request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+    m_pPullAPIWrapper->pullKernelImpl(
+        messageQueue,                                 // 1
+        subExpression,                                // 2
+        pSdata->getSubVersion(),                      // 3
+        request->getNextOffset(),                     // 4
+        32,                                           // 5
+        sysFlag,                                      // 6
+        commitOffsetValue,                            // 7
+        1000 * 15,                                    // 8
+        m_asyncPullTimeout,                           // 9
+        ComMode_ASYNC,                                // 10
+        getAsyncPullCallBack(request, messageQueue),  // 11
+        getSessionCredentials(),                      // 12
+        &arg);                                        // 13
+  } catch (MQException& e) {
+    // A failed pull is logged and simply retried.
+    LOG_ERROR(e.what());
+    producePullMsgTask(request);
+  }
+}
+
+/**
+ * Chooses between asynchronous (true, the default) and synchronous (false)
+ * pulling, and logs the choice.
+ */
+void DefaultMQPushConsumer::setAsyncPull(bool asyncFlag) {
+  if (!asyncFlag) {
+    LOG_INFO("set pushConsumer:%s to sync pull mode", getGroupName().c_str());
+  } else {
+    LOG_INFO("set pushConsumer:%s to async default pull mode",
+             getGroupName().c_str());
+  }
+  m_asyncPull = asyncFlag;
+}
+
+/**
+ * Sets the thread count passed to the consume service that start() creates.
+ * A non-positive value is rejected with an error log and the previous count
+ * is kept.
+ */
+void DefaultMQPushConsumer::setConsumeThreadCount(int threadCount) {
+  if (threadCount <= 0) {
+    LOG_ERROR("setConsumeThreadCount with invalid value");
+    return;
+  }
+  m_consumeThreadCount = threadCount;
+}
+
+/** Returns the configured consume thread count. */
+int DefaultMQPushConsumer::getConsumeThreadCount() const {
+  const int count = m_consumeThreadCount;
+  return count;
+}
+
+/**
+ * Sets the thread count passed to the pull TaskQueue that start() creates.
+ * In line with setConsumeThreadCount(), a non-positive value is rejected
+ * with an error log and the previous count is kept; it used to be stored
+ * unchecked.
+ */
+void DefaultMQPushConsumer::setPullMsgThreadPoolCount(int threadCount) {
+  if (threadCount > 0) {
+    m_pullMsgThreadPoolNum = threadCount;
+  } else {
+    LOG_ERROR("setPullMsgThreadPoolCount with invalid value");
+  }
+}
+
+/** Returns the configured pull thread pool size. */
+int DefaultMQPushConsumer::getPullMsgThreadPoolCount() const {
+  const int count = m_pullMsgThreadPoolNum;
+  return count;
+}
+
+/** Returns the configured consume batch maximum (1 by default). */
+int DefaultMQPushConsumer::getConsumeMessageBatchMaxSize() const {
+  const int batchMax = m_consumeMessageBatchMaxSize;
+  return batchMax;
+}
+
+/**
+ * Sets the consume batch maximum. Values below 1 are silently ignored.
+ */
+void DefaultMQPushConsumer::setConsumeMessageBatchMaxSize(
+    int consumeMessageBatchMaxSize) {
+  if (consumeMessageBatchMaxSize < 1) {
+    return;
+  }
+  m_consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
+}
+
+/**
+ * Sets the cached-message threshold above which pulling for a queue is
+ * delayed. Only values in the open range (0, 65535) are accepted; anything
+ * else is silently ignored.
+ */
+void DefaultMQPushConsumer::setMaxCacheMsgSizePerQueue(int maxCacheSize) {
+  if (maxCacheSize <= 0 || maxCacheSize >= 65535) {
+    return;
+  }
+  LOG_INFO("set maxCacheSize to:%d for consumer:%s", maxCacheSize,
+           getGroupName().c_str());
+  m_maxMsgCacheSize = maxCacheSize;
+}
+
+/** Returns the per-queue cached-message threshold (1000 by default). */
+int DefaultMQPushConsumer::getMaxCacheMsgSizePerQueue() const {
+  const int limit = m_maxMsgCacheSize;
+  return limit;
+}
+
+/**
+ * Builds a snapshot of the consumer's runtime state: orderly flag, consume
+ * thread count, start timestamp, the subscription set, and one
+ * ProcessQueueInfo per pull request that has not been dropped.
+ *
+ * @return a heap-allocated ConsumerRunningInfo; the caller owns it.
+ */
+ConsumerRunningInfo* DefaultMQPushConsumer::getConsumerRunningInfo() {
+  // Held in a unique_ptr while it is filled in so it is not leaked if one
+  // of the calls below throws.
+  unique_ptr<ConsumerRunningInfo> info(new ConsumerRunningInfo());
+
+  if (m_consumerServeice->getConsumeMsgSerivceListenerType() ==
+      messageListenerOrderly) {
+    info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true");
+  } else {
+    // Was misspelled "flase", which no reader of the property would match.
+    info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "false");
+  }
+  info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE,
+                    UtilAll::to_string(m_consumeThreadCount));
+  info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP,
+                    UtilAll::to_string(m_startTime));
+
+  vector<SubscriptionData> result;
+  getSubscriptions(result);
+  info->setSubscriptionSet(result);
+
+  // Work on a copy of the pull request table, as the original code did.
+  map<MQMessageQueue, PullRequest*> requestTable =
+      m_pRebalance->getPullRequestTable();
+  map<MQMessageQueue, PullRequest*>::iterator it = requestTable.begin();
+
+  for (; it != requestTable.end(); ++it) {
+    if (!it->second->isDroped()) {
+      MessageQueue queue((it->first).getTopic(), (it->first).getBrokerName(),
+                         (it->first).getQueueId());
+      ProcessQueueInfo processQueue;
+      processQueue.cachedMsgMinOffset = it->second->getCacheMinOffset();
+      processQueue.cachedMsgMaxOffset = it->second->getCacheMaxOffset();
+      processQueue.cachedMsgCount = it->second->getCacheMsgCount();
+      processQueue.setCommitOffset(m_pOffsetStore->readOffset(
+          it->first, MEMORY_FIRST_THEN_STORE, getSessionCredentials()));
+      processQueue.setDroped(it->second->isDroped());
+      processQueue.setLocked(it->second->isLocked());
+      processQueue.lastLockTimestamp = it->second->getLastLockTimestamp();
+      processQueue.lastPullTimestamp = it->second->getLastPullTimestamp();
+      processQueue.lastConsumeTimestamp =
+          it->second->getLastConsumeTimestamp();
+      info->setMqTable(queue, processQueue);
+    }
+  }
+
+  return info.release();
+}
+
+//<!************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/FindBrokerResult.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/FindBrokerResult.h b/rocketmq-cpp/src/consumer/FindBrokerResult.h
new file mode 100755
index 0000000..a224b14
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/FindBrokerResult.h
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __FINDBROKERRESULT_H__
+#define __FINDBROKERRESULT_H__
+
+namespace rocketmq {
+//<!************************************************************************
+/** Result of a broker lookup: the broker's address and its slave flag. */
+struct FindBrokerResult {
+  /**
+   * @param sbrokerAddr  broker address, stored as given.
+   * @param bslave       value stored in the slave flag.
+   */
+  FindBrokerResult(const std::string& sbrokerAddr, bool bslave)
+      : brokerAddr(sbrokerAddr),
+        slave(bslave) {}
+
+  std::string brokerAddr;  // broker address
+  bool slave;              // slave flag as passed to the constructor
+};
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/OffsetStore.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/OffsetStore.cpp b/rocketmq-cpp/src/consumer/OffsetStore.cpp
new file mode 100755
index 0000000..33cf9ed
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/OffsetStore.cpp
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "OffsetStore.h"
+#include "Logging.h"
+#include "MQClientFactory.h"
+#include "MessageQueue.h"
+
+#include <fstream>
+#include <sstream>
+
+#include <boost/archive/binary_iarchive.hpp>
+#include <boost/archive/binary_oarchive.hpp>
+#include <boost/archive/text_iarchive.hpp>
+#include <boost/archive/text_oarchive.hpp>
+#include <boost/serialization/map.hpp>
+
+namespace rocketmq {
+
+//<!***************************************************************************
+// Stores the consumer group name and the client factory pointer used by the
+// concrete offset stores. The factory pointer is only kept, never deleted by
+// this class.
+OffsetStore::OffsetStore(const string& groupName, MQClientFactory* pfactory)
+ : m_groupName(groupName), m_pClientFactory(pfactory) {}
+
+// Discards every cached offset and forgets the factory pointer (the factory
+// itself is not destroyed here).
+OffsetStore::~OffsetStore() {
+  m_offsetTable.clear();
+  m_pClientFactory = NULL;
+}
+
+//<!***************************************************************************
+// Computes where this consumer group's offsets are kept on disk:
+//   <home>/.rocketmq_offsets/<localAddress>@<instanceName>/<group>/offsets.Json
+// and makes sure the containing directory exists. When the factory has no
+// consumer registered under groupName the paths are left empty.
+LocalFileOffsetStore::LocalFileOffsetStore(const string& groupName,
+                                           MQClientFactory* pfactory)
+    : OffsetStore(groupName, pfactory) {
+  MQConsumer* pConsumer = pfactory->selectConsumer(groupName);
+  if (pConsumer) {
+    LOG_INFO("new LocalFileOffsetStore");
+    string directoryName =
+        UtilAll::getLocalAddress() + "@" + pConsumer->getInstanceName();
+    m_storePath = ".rocketmq_offsets/" + directoryName + "/" + groupName + "/";
+    string homeDir(UtilAll::getHomeDirectory());
+    m_storeFile = homeDir + "/" + m_storePath + "offsets.Json";
+
+    // Build the directory with the same "<home>/<relative>" layout used for
+    // m_storeFile; without the separator the directory that gets checked and
+    // created is not the one the offsets file lives in.
+    string storePath = homeDir + "/" + m_storePath;
+    if (access(storePath.c_str(), F_OK) != 0) {
+      // mkdir() creates a single level only, so create each missing path
+      // component in turn. storePath ends with '/', hence the final iteration
+      // covers the complete directory.
+      for (string::size_type pos = storePath.find('/', 1);
+           pos != string::npos; pos = storePath.find('/', pos + 1)) {
+        string prefix = storePath.substr(0, pos);
+        if (access(prefix.c_str(), F_OK) == 0) {
+          continue;
+        }
+        if (mkdir(prefix.c_str(), S_IRWXU | S_IRWXG | S_IRWXO) != 0) {
+          LOG_ERROR("create data dir:%s error", prefix.c_str());
+          break;
+        }
+      }
+    }
+  }
+}
+
+// Empty: this class adds no cleanup of its own on top of ~OffsetStore().
+LocalFileOffsetStore::~LocalFileOffsetStore() {}
+
+// Reads the persisted offsets from m_storeFile into m_offsetTable.
+//
+// The file is a boost text archive holding a map<string, int64>; each key is
+// the JSON form of a message queue (topic / brokerName / queueId) and each
+// value its offset.
+//
+// - File cannot be opened: only a warning is logged and nothing is loaded.
+// - File opened but empty: MQClientException is raised.
+// - A key that is not valid JSON is skipped with a warning instead of being
+//   turned into an empty message queue entry.
+void LocalFileOffsetStore::load() {
+  boost::lock_guard<boost::mutex> lock(m_lock);
+
+  std::ifstream ifs(m_storeFile.c_str(), std::ios::in);
+  if (!ifs.good()) {
+    LOG_WARN(
+        "offsets.Json file not exist, maybe this is the first time "
+        "consumation");
+    return;
+  }
+
+  // Defensive check: a stream that reports good() right after construction
+  // should also be open.
+  if (!ifs.is_open()) {
+    LOG_ERROR(
+        "open offset store file failed, please check whether file:%s is "
+        "deleted by operator and then restart consumer",
+        m_storeFile.c_str());
+    THROW_MQEXCEPTION(MQClientException,
+                      "open offset store file failed, please check "
+                      "directory:%s is deleted by operator or offset.Json "
+                      "file is cleared by operator, and then restart "
+                      "consumer",
+                      -1);
+  }
+
+  if (ifs.peek() == std::ifstream::traits_type::eof()) {
+    LOG_ERROR(
+        "open offset store file failed, please check whether file: %s is "
+        "cleared by operator, if so, delete this offsets.Json file and "
+        "then restart consumer",
+        m_storeFile.c_str());
+    THROW_MQEXCEPTION(MQClientException,
+                      "open offset store file failed, please check whether "
+                      "offsets.Json is cleared by operator, if so, delete "
+                      "this offsets.Json file and then restart consumer",
+                      -1);
+  }
+
+  map<string, int64> persisted;
+  boost::archive::text_iarchive ia(ifs);
+  ia >> persisted;
+  ifs.close();
+
+  for (map<string, int64>::iterator it = persisted.begin();
+       it != persisted.end(); ++it) {
+    Json::Reader reader;
+    Json::Value object;
+    if (!reader.parse(it->first.c_str(), object)) {
+      LOG_WARN("skip malformed message queue key in offset store file:%s",
+               m_storeFile.c_str());
+      continue;
+    }
+    MQMessageQueue mq(object["topic"].asString(),
+                      object["brokerName"].asString(),
+                      object["queueId"].asInt());
+    m_offsetTable[mq] = it->second;
+  }
+}
+
+// Records offset for mq in the in-memory table while holding m_lock. Nothing
+// is written to the offsets file here.
+void LocalFileOffsetStore::updateOffset(const MQMessageQueue& mq,
+ int64 offset) {
+ boost::lock_guard<boost::mutex> lock(m_lock);
+ m_offsetTable[mq] = offset;
+}
+
+// Returns the offset known for mq, or -1 when none is available.
+//
+//   READ_FROM_MEMORY        - in-memory table only.
+//   READ_FROM_STORE         - reload the offsets file, then look mq up.
+//   MEMORY_FIRST_THEN_STORE - in-memory table first, the file on a miss.
+//
+// session_credentials is not used by the local-file implementation.
+//
+// m_lock is taken only around the table lookups: load() acquires the same
+// non-recursive mutex itself, so it must not be called with the lock held.
+int64 LocalFileOffsetStore::readOffset(
+    const MQMessageQueue& mq, ReadOffsetType type,
+    const SessionCredentials& session_credentials) {
+  switch (type) {
+    case MEMORY_FIRST_THEN_STORE:
+    case READ_FROM_MEMORY: {
+      boost::lock_guard<boost::mutex> lock(m_lock);
+      MQ2OFFSET::iterator it = m_offsetTable.find(mq);
+      if (it != m_offsetTable.end()) {
+        return it->second;
+      } else if (READ_FROM_MEMORY == type) {
+        return -1;
+      }
+    }
+    // MEMORY_FIRST_THEN_STORE missed in memory: fall through to the file.
+    case READ_FROM_STORE: {
+      try {
+        load();
+      } catch (MQException& e) {
+        LOG_ERROR("catch exception when load local file");
+        return -1;
+      }
+      boost::lock_guard<boost::mutex> lock(m_lock);
+      MQ2OFFSET::iterator it = m_offsetTable.find(mq);
+      if (it != m_offsetTable.end()) {
+        return it->second;
+      }
+    }
+    default:
+      break;
+  }
+  LOG_ERROR(
+      "can not readOffset from offsetStore.json, maybe first time consumation");
+  return -1;
+}
+
+// No-op in the local-file store; persistAll() is the method that writes the
+// offsets file.
+void LocalFileOffsetStore::persist(
+ const MQMessageQueue& mq, const SessionCredentials& session_credentials) {}
+
+// Writes the offsets of the given queues to m_storeFile as a boost text
+// archive of map<string, int64>, keyed by the JSON form of each queue.
+//
+// Queues that have no recorded offset are left out; looking them up with
+// operator[] would insert and persist a made-up offset of 0.
+//
+// Raises MQClientException when the file cannot be opened for writing.
+void LocalFileOffsetStore::persistAll(const std::vector<MQMessageQueue>& mqs) {
+  boost::lock_guard<boost::mutex> lock(m_lock);
+
+  map<string, int64> snapshot;
+  for (vector<MQMessageQueue>::const_iterator it = mqs.begin();
+       it != mqs.end(); ++it) {
+    MQ2OFFSET::const_iterator known = m_offsetTable.find(*it);
+    if (known == m_offsetTable.end()) {
+      continue;
+    }
+    MessageQueue mq_tmp(it->getTopic(), it->getBrokerName(), it->getQueueId());
+    snapshot[mq_tmp.toJson().toStyledString()] = known->second;
+  }
+
+  std::ofstream s;
+  s.open(m_storeFile.c_str(), std::ios::out);
+  if (!s.is_open()) {
+    LOG_ERROR("open offset store file failed");
+    THROW_MQEXCEPTION(MQClientException,
+                      "persistAll:open offset store file failed", -1);
+  }
+
+  boost::archive::text_oarchive oa(s);
+  // Boost refuses to archive non-const instances because object tracking
+  // could be confused when different tracked objects share an address.
+  oa << const_cast<const map<string, int64>&>(snapshot);
+  s.close();
+}
+
+// No-op: the entry for mq stays in m_offsetTable.
+// NOTE(review): RemoteBrokerOffsetStore::removeOffset erases the entry;
+// confirm whether the local store is meant to keep it.
+void LocalFileOffsetStore::removeOffset(const MQMessageQueue& mq) {}
+
+//<!***************************************************************************
+// Forwards the group name and client factory to the OffsetStore base; no
+// other set-up is done.
+RemoteBrokerOffsetStore::RemoteBrokerOffsetStore(const string& groupName,
+ MQClientFactory* pfactory)
+ : OffsetStore(groupName, pfactory) {}
+
+// Empty: this class adds no cleanup of its own on top of ~OffsetStore().
+RemoteBrokerOffsetStore::~RemoteBrokerOffsetStore() {}
+
+// No-op: offsets are fetched from the broker on demand in readOffset().
+void RemoteBrokerOffsetStore::load() {}
+
+// Records offset for mq in the in-memory table while holding m_lock. The
+// broker is not contacted here; persist() sends the value.
+void RemoteBrokerOffsetStore::updateOffset(const MQMessageQueue& mq,
+ int64 offset) {
+ boost::lock_guard<boost::mutex> lock(m_lock);
+ m_offsetTable[mq] = offset;
+}
+
+// Returns the consume offset for mq.
+//
+//   READ_FROM_MEMORY        - in-memory table only; -1 on a miss.
+//   READ_FROM_STORE         - query the broker and cache the answer.
+//   MEMORY_FIRST_THEN_STORE - in-memory table first, the broker on a miss.
+//
+// Returns -1 when the broker query raises MQBrokerException and -2 for any
+// other MQException.
+int64 RemoteBrokerOffsetStore::readOffset(
+    const MQMessageQueue& mq, ReadOffsetType type,
+    const SessionCredentials& session_credentials) {
+  switch (type) {
+    case MEMORY_FIRST_THEN_STORE:
+    case READ_FROM_MEMORY: {
+      boost::lock_guard<boost::mutex> lock(m_lock);
+
+      MQ2OFFSET::iterator it = m_offsetTable.find(mq);
+      if (it != m_offsetTable.end()) {
+        return it->second;
+      } else if (READ_FROM_MEMORY == type) {
+        return -1;
+      }
+    }
+    // MEMORY_FIRST_THEN_STORE missed in memory: fall through to the broker.
+    case READ_FROM_STORE: {
+      try {
+        int64 brokerOffset =
+            fetchConsumeOffsetFromBroker(mq, session_credentials);
+        //<!update;
+        updateOffset(mq, brokerOffset);
+        return brokerOffset;
+      } catch (MQBrokerException& e) {
+        // Pass the message as an argument, never as the format string: it
+        // may contain '%' characters.
+        LOG_ERROR("%s", e.what());
+        return -1;
+      } catch (MQException& e) {
+        LOG_ERROR("%s", e.what());
+        return -2;
+      }
+    }
+    default:
+      break;
+  }
+  return -1;
+}
+
+// Sends the cached offset of mq to the broker. Does nothing when no offset is
+// cached for mq; a failure while sending is logged and swallowed.
+void RemoteBrokerOffsetStore::persist(
+    const MQMessageQueue& mq, const SessionCredentials& session_credentials) {
+  // Copy just the one value out under the lock (rather than the whole table)
+  // so the broker call below runs without m_lock held.
+  int64 offset = 0;
+  bool found = false;
+  {
+    boost::lock_guard<boost::mutex> lock(m_lock);
+    MQ2OFFSET::iterator it = m_offsetTable.find(mq);
+    if (it != m_offsetTable.end()) {
+      offset = it->second;
+      found = true;
+    }
+  }
+
+  if (!found) {
+    return;
+  }
+
+  try {
+    updateConsumeOffsetToBroker(mq, offset, session_credentials);
+  } catch (MQException& e) {
+    LOG_ERROR("updateConsumeOffsetToBroker error");
+  }
+}
+
+// No-op in the remote store; offsets are sent one queue at a time through
+// persist().
+void RemoteBrokerOffsetStore::persistAll(
+ const std::vector<MQMessageQueue>& mq) {}
+
+// Forgets the cached offset of mq, if there is one.
+void RemoteBrokerOffsetStore::removeOffset(const MQMessageQueue& mq) {
+  boost::lock_guard<boost::mutex> lock(m_lock);
+  // Erasing by key leaves the table untouched when mq is absent.
+  m_offsetTable.erase(mq);
+}
+
+// Sends a one-way "update consumer offset" request for mq to its broker.
+//
+// The broker address is looked up by name; on a miss the topic route is
+// refreshed from the name server and the lookup is retried once. When the
+// broker still cannot be found a warning is logged and nothing is sent.
+// An MQException raised while sending is logged and swallowed.
+void RemoteBrokerOffsetStore::updateConsumeOffsetToBroker(
+    const MQMessageQueue& mq, int64 offset,
+    const SessionCredentials& session_credentials) {
+  unique_ptr<FindBrokerResult> pFindBrokerResult(
+      m_pClientFactory->findBrokerAddressInAdmin(mq.getBrokerName()));
+
+  if (pFindBrokerResult == NULL) {
+    m_pClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic(),
+                                                         session_credentials);
+    pFindBrokerResult.reset(
+        m_pClientFactory->findBrokerAddressInAdmin(mq.getBrokerName()));
+  }
+
+  if (pFindBrokerResult == NULL) {
+    LOG_WARN("The broker not exist");
+    return;
+  }
+
+  // NOTE(review): the header is handed to updateConsumerOffsetOneway as a raw
+  // pointer; presumably the callee takes ownership - confirm.
+  UpdateConsumerOffsetRequestHeader* pRequestHeader =
+      new UpdateConsumerOffsetRequestHeader();
+  pRequestHeader->topic = mq.getTopic();
+  pRequestHeader->consumerGroup = m_groupName;
+  pRequestHeader->queueId = mq.getQueueId();
+  pRequestHeader->commitOffset = offset;
+
+  try {
+    LOG_INFO(
+        "oneway updateConsumeOffsetToBroker of mq:%s, its offset is:%lld",
+        mq.toString().c_str(), offset);
+    m_pClientFactory->getMQClientAPIImpl()->updateConsumerOffsetOneway(
+        pFindBrokerResult->brokerAddr, pRequestHeader, 1000 * 5,
+        session_credentials);
+  } catch (MQException& e) {
+    // Log the message as data, not as the format string, and do not report
+    // a send failure as a missing broker.
+    LOG_ERROR("%s", e.what());
+  }
+}
+
+// Asks the broker that hosts mq for this group's consume offset and returns
+// it. The broker is resolved by name, with one retry after refreshing the
+// topic route from the name server; MQClientException is raised when it still
+// cannot be found.
+int64 RemoteBrokerOffsetStore::fetchConsumeOffsetFromBroker(
+    const MQMessageQueue& mq, const SessionCredentials& session_credentials) {
+  unique_ptr<FindBrokerResult> broker(
+      m_pClientFactory->findBrokerAddressInAdmin(mq.getBrokerName()));
+
+  if (broker == NULL) {
+    m_pClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic(),
+                                                         session_credentials);
+    broker.reset(
+        m_pClientFactory->findBrokerAddressInAdmin(mq.getBrokerName()));
+  }
+
+  if (broker == NULL) {
+    LOG_ERROR("The broker not exist when fetchConsumeOffsetFromBroker");
+    THROW_MQEXCEPTION(MQClientException, "The broker not exist", -1);
+  }
+
+  QueryConsumerOffsetRequestHeader* header =
+      new QueryConsumerOffsetRequestHeader();
+  header->topic = mq.getTopic();
+  header->consumerGroup = m_groupName;
+  header->queueId = mq.getQueueId();
+
+  return m_pClientFactory->getMQClientAPIImpl()->queryConsumerOffset(
+      broker->brokerAddr, header, 1000 * 5, session_credentials);
+}
+//<!***************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/OffsetStore.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/OffsetStore.h b/rocketmq-cpp/src/consumer/OffsetStore.h
new file mode 100755
index 0000000..269198f
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/OffsetStore.h
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __OFFSETSTORE_H__
+#define __OFFSETSTORE_H__
+
+#include <boost/asio.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/thread.hpp>
+#include <map>
+#include "MQMessageQueue.h"
+#include "RocketMQClient.h"
+#include "SessionCredentials.h"
+
+namespace rocketmq {
+class MQClientFactory;
+//<!***************************************************************************
+// Selects where OffsetStore::readOffset looks for a queue's offset.
+enum ReadOffsetType {
+ // Look only in the in-memory offset table.
+ READ_FROM_MEMORY,
+ // Read from the backing store (the offsets file for LocalFileOffsetStore,
+ // the broker for RemoteBrokerOffsetStore).
+ READ_FROM_STORE,
+ // Try the in-memory table first and fall back to the backing store.
+ MEMORY_FIRST_THEN_STORE,
+};
+
+//<!***************************************************************************
+// Abstract interface for tracking the consume offset of each message queue
+// for one consumer group. Concrete stores decide where offsets are kept.
+class OffsetStore {
+ public:
+ OffsetStore(const std::string& groupName, MQClientFactory*);
+ virtual ~OffsetStore();
+
+ // Loads previously saved offsets, if the store has any.
+ virtual void load() = 0;
+ // Records offset for mq in the in-memory table.
+ virtual void updateOffset(const MQMessageQueue& mq, int64 offset) = 0;
+ // Returns the offset of mq according to type; implementations return a
+ // negative value when no offset could be obtained.
+ virtual int64 readOffset(const MQMessageQueue& mq, ReadOffsetType type,
+ const SessionCredentials& session_credentials) = 0;
+ // Saves the offset of a single queue.
+ virtual void persist(const MQMessageQueue& mq,
+ const SessionCredentials& session_credentials) = 0;
+ // Saves the offsets of all given queues.
+ virtual void persistAll(const std::vector<MQMessageQueue>& mq) = 0;
+ // Drops the cached offset of mq.
+ virtual void removeOffset(const MQMessageQueue& mq) = 0;
+
+ protected:
+ std::string m_groupName;
+ typedef std::map<MQMessageQueue, int64> MQ2OFFSET;
+ // In-memory offsets; the implementations guard access with m_lock.
+ MQ2OFFSET m_offsetTable;
+ // Not owned: the destructor only resets the pointer.
+ MQClientFactory* m_pClientFactory;
+ boost::mutex m_lock;
+};
+
+//<!***************************************************************************
+// OffsetStore that keeps offsets in a file under the user's home directory.
+// persist() and removeOffset() are no-ops; persistAll() writes the file.
+class LocalFileOffsetStore : public OffsetStore {
+ public:
+ LocalFileOffsetStore(const std::string& groupName, MQClientFactory*);
+ virtual ~LocalFileOffsetStore();
+
+ virtual void load();
+ virtual void updateOffset(const MQMessageQueue& mq, int64 offset);
+ virtual int64 readOffset(const MQMessageQueue& mq, ReadOffsetType type,
+ const SessionCredentials& session_credentials);
+ virtual void persist(const MQMessageQueue& mq,
+ const SessionCredentials& session_credentials);
+ virtual void persistAll(const std::vector<MQMessageQueue>& mq);
+ virtual void removeOffset(const MQMessageQueue& mq);
+
+ private:
+ // Directory of the offsets file, relative to the home directory.
+ std::string m_storePath;
+ // Full path of the offsets file.
+ std::string m_storeFile;
+};
+
+//<!***************************************************************************
+// OffsetStore that keeps offsets on the broker. load() and persistAll() are
+// no-ops; readOffset() may query the broker and persist() sends one queue's
+// offset to it.
+class RemoteBrokerOffsetStore : public OffsetStore {
+ public:
+ RemoteBrokerOffsetStore(const std::string& groupName, MQClientFactory*);
+ virtual ~RemoteBrokerOffsetStore();
+
+ virtual void load();
+ virtual void updateOffset(const MQMessageQueue& mq, int64 offset);
+ virtual int64 readOffset(const MQMessageQueue& mq, ReadOffsetType type,
+ const SessionCredentials& session_credentials);
+ virtual void persist(const MQMessageQueue& mq,
+ const SessionCredentials& session_credentials);
+ virtual void persistAll(const std::vector<MQMessageQueue>& mq);
+ virtual void removeOffset(const MQMessageQueue& mq);
+
+ private:
+ // Sends a one-way offset update for mq to its broker.
+ void updateConsumeOffsetToBroker(
+ const MQMessageQueue& mq, int64 offset,
+ const SessionCredentials& session_credentials);
+ // Queries the broker for the consume offset of mq; raises when the broker
+ // cannot be resolved.
+ int64 fetchConsumeOffsetFromBroker(
+ const MQMessageQueue& mq, const SessionCredentials& session_credentials);
+};
+//<!***************************************************************************
+} //<!end namespace;
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/PullAPIWrapper.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/PullAPIWrapper.cpp b/rocketmq-cpp/src/consumer/PullAPIWrapper.cpp
new file mode 100755
index 0000000..6a4b507
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/PullAPIWrapper.cpp
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PullAPIWrapper.h"
+#include "CommunicationMode.h"
+#include "MQClientFactory.h"
+#include "PullResultExt.h"
+#include "PullSysFlag.h"
+namespace rocketmq {
+//<!************************************************************************
+// Remembers the client factory (not owned) and the consumer group this
+// wrapper pulls for.
+PullAPIWrapper::PullAPIWrapper(MQClientFactory* mQClientFactory,
+                               const string& consumerGroup)
+    : m_MQClientFactory(mQClientFactory), m_consumerGroup(consumerGroup) {}
+
+// Empties the per-queue broker table and forgets the factory pointer without
+// destroying the factory.
+PullAPIWrapper::~PullAPIWrapper() {
+  m_pullFromWhichNodeTable.clear();
+  m_MQClientFactory = NULL;
+}
+
+// Remembers, under m_lock, which broker id should be used for the next pull
+// from mq.
+void PullAPIWrapper::updatePullFromWhichNode(const MQMessageQueue& mq,
+ int brokerId) {
+ boost::lock_guard<boost::mutex> lock(m_lock);
+ m_pullFromWhichNodeTable[mq] = brokerId;
+}
+
+// Returns the broker id recorded for mq, or MASTER_ID when none has been
+// recorded yet.
+int PullAPIWrapper::recalculatePullFromWhichNode(const MQMessageQueue& mq) {
+  boost::lock_guard<boost::mutex> lock(m_lock);
+  map<MQMessageQueue, int>::iterator recorded =
+      m_pullFromWhichNodeTable.find(mq);
+  if (recorded == m_pullFromWhichNodeTable.end()) {
+    return MASTER_ID;
+  }
+  return recorded->second;
+}
+
+// Turns the raw pull result into the PullResult handed to the consumer.
+//
+// Records the broker id suggested by the result for the next pull from mq.
+// When the status is FOUND the message block is decoded and, if
+// subscriptionData carries a non-empty tag set, only messages whose tag is
+// contained in it are kept.
+//
+// Raises MQClientException when pullResult is NULL.
+PullResult PullAPIWrapper::processPullResult(
+    const MQMessageQueue& mq, PullResult* pullResult,
+    SubscriptionData* subscriptionData) {
+  PullResultExt* pResultExt = static_cast<PullResultExt*>(pullResult);
+  if (pResultExt == NULL) {
+    string errMsg("The pullResult NULL of");
+    errMsg.append(mq.toString());
+    THROW_MQEXCEPTION(MQClientException, errMsg, -1);
+  }
+
+  updatePullFromWhichNode(mq, pResultExt->suggestWhichBrokerId);
+
+  vector<MQMessageExt> msgFilterList;
+  if (pResultExt->pullStatus == FOUND) {
+    vector<MQMessageExt> decoded;
+    MQDecoder::decodes(&pResultExt->msgMemBlock, decoded);
+
+    const bool filterByTag =
+        subscriptionData != NULL && !subscriptionData->getTagsSet().empty();
+    if (!filterByTag) {
+      // No tag restriction: hand over everything that was decoded.
+      msgFilterList.swap(decoded);
+    } else {
+      msgFilterList.reserve(decoded.size());
+      for (vector<MQMessageExt>::iterator msg = decoded.begin();
+           msg != decoded.end(); ++msg) {
+        string msgTag = msg->getTags();
+        if (subscriptionData->containTag(msgTag)) {
+          msgFilterList.push_back(*msg);
+        }
+      }
+    }
+  }
+
+  return PullResult(pResultExt->pullStatus, pResultExt->nextBeginOffset,
+                    pResultExt->minOffset, pResultExt->maxOffset,
+                    msgFilterList);
+}
+
+// Builds a pull request for mq and sends it to the broker currently selected
+// for that queue.
+//
+// The broker is resolved by name and recorded broker id; on a miss the topic
+// route is refreshed from the name server and the lookup retried once.
+// MQClientException is raised when the broker still cannot be found. When the
+// resolved broker is a slave the commit-offset flag is cleared from sysFlag.
+//
+// Returns whatever MQClientAPIImpl::pullMessage returns.
+PullResult* PullAPIWrapper::pullKernelImpl(
+    const MQMessageQueue& mq,        // 1
+    string subExpression,            // 2
+    int64 subVersion,                // 3
+    int64 offset,                    // 4
+    int maxNums,                     // 5
+    int sysFlag,                     // 6
+    int64 commitOffset,              // 7
+    int brokerSuspendMaxTimeMillis,  // 8
+    int timeoutMillis,               // 9
+    int communicationMode,           // 10
+    PullCallback* pullCallback, const SessionCredentials& session_credentials,
+    void* pArg /*= NULL*/) {
+  unique_ptr<FindBrokerResult> broker(
+      m_MQClientFactory->findBrokerAddressInSubscribe(
+          mq.getBrokerName(), recalculatePullFromWhichNode(mq), false));
+  if (broker == NULL) {
+    // Unknown broker: refresh the route from the name server and retry.
+    m_MQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic(),
+                                                          session_credentials);
+    broker.reset(m_MQClientFactory->findBrokerAddressInSubscribe(
+        mq.getBrokerName(), recalculatePullFromWhichNode(mq), false));
+  }
+
+  if (broker == NULL) {
+    THROW_MQEXCEPTION(MQClientException, "The broker not exist", -1);
+  }
+
+  int sysFlagInner = sysFlag;
+  if (broker->slave) {
+    sysFlagInner = PullSysFlag::clearCommitOffsetFlag(sysFlagInner);
+  }
+
+  PullMessageRequestHeader* header = new PullMessageRequestHeader();
+  header->consumerGroup = m_consumerGroup;
+  header->topic = mq.getTopic();
+  header->queueId = mq.getQueueId();
+  header->queueOffset = offset;
+  header->maxMsgNums = maxNums;
+  header->sysFlag = sysFlagInner;
+  header->commitOffset = commitOffset;
+  header->suspendTimeoutMillis = brokerSuspendMaxTimeMillis;
+  header->subscription = subExpression;
+  header->subVersion = subVersion;
+
+  return m_MQClientFactory->getMQClientAPIImpl()->pullMessage(
+      broker->brokerAddr, header, timeoutMillis, communicationMode,
+      pullCallback, pArg, session_credentials);
+}
+
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/PullAPIWrapper.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/PullAPIWrapper.h b/rocketmq-cpp/src/consumer/PullAPIWrapper.h
new file mode 100755
index 0000000..e3d0a1e
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/PullAPIWrapper.h
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _PULLAPIWRAPPER_H_
+#define _PULLAPIWRAPPER_H_
+
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/thread.hpp>
+#include "AsyncCallback.h"
+#include "MQMessageQueue.h"
+#include "SessionCredentials.h"
+#include "SubscriptionData.h"
+
+namespace rocketmq {
+class MQClientFactory;
+//<!***************************************************************************
+// Helper that issues pull requests for one consumer group and post-processes
+// the results.
+class PullAPIWrapper {
+ public:
+ PullAPIWrapper(MQClientFactory* mQClientFactory, const string& consumerGroup);
+ ~PullAPIWrapper();
+
+ // Decodes and tag-filters the messages of pullResult and records the broker
+ // id it suggests for the next pull from mq.
+ PullResult processPullResult(const MQMessageQueue& mq, PullResult* pullResult,
+ SubscriptionData* subscriptionData);
+
+ // Sends a pull request for mq to the broker selected for that queue; raises
+ // MQClientException when the broker cannot be resolved.
+ PullResult* pullKernelImpl(const MQMessageQueue& mq, // 1
+ string subExpression, // 2
+ int64 subVersion, // 3
+ int64 offset, // 4
+ int maxNums, // 5
+ int sysFlag, // 6
+ int64 commitOffset, // 7
+ int brokerSuspendMaxTimeMillis, // 8
+ int timeoutMillis, // 9
+ int communicationMode, // 10
+ PullCallback* pullCallback,
+ const SessionCredentials& session_credentials,
+ void* pArg = NULL);
+
+ private:
+ // Records the broker id to use for the next pull from mq.
+ void updatePullFromWhichNode(const MQMessageQueue& mq, int brokerId);
+
+ // Returns the recorded broker id for mq, or MASTER_ID when none is recorded.
+ int recalculatePullFromWhichNode(const MQMessageQueue& mq);
+
+ private:
+ // Not owned: the destructor only resets the pointer.
+ MQClientFactory* m_MQClientFactory;
+ string m_consumerGroup;
+ // Guards m_pullFromWhichNodeTable.
+ boost::mutex m_lock;
+ map<MQMessageQueue, int /* brokerId */> m_pullFromWhichNodeTable;
+};
+
+//<!***************************************************************************
+} //<!end namespace;
+
+#endif //<! _PULLAPIWRAPPER_H_
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/PullRequest.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/PullRequest.cpp b/rocketmq-cpp/src/consumer/PullRequest.cpp
new file mode 100755
index 0000000..d9b953f
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/PullRequest.cpp
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PullRequest.h"
+#include "Logging.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+// Lock timing constants. Presumably milliseconds: isLockExpired() compares
+// RebalanceLockMaxLiveTime against a UtilAll::currentTimeMillis() difference.
+const uint64 PullRequest::RebalanceLockInterval = 20 * 1000;
+const uint64 PullRequest::RebalanceLockMaxLiveTime = 30 * 1000;
+
+// Creates a request for the given consumer group with both offsets at 0 and
+// the dropped / locked flags cleared.
+// NOTE(review): m_lastLockTimestamp, m_lastPullTimestamp and
+// m_lastConsumeTimestamp are not initialised here; unless the header gives
+// them default initialisers they must be set before they are read.
+PullRequest::PullRequest(const string& groupname)
+ : m_groupname(groupname), m_nextOffset(0), m_queueOffsetMax(0), m_bDroped(false), m_bLocked(false) {}
+
+// Releases every cached message, both pending and taken-but-uncommitted.
+PullRequest::~PullRequest() {
+  m_msgTreeMap.clear();
+  m_msgTreeMapTemp.clear();
+}
+
+// Copies group name, offsets, dropped flag, message queue and both message
+// caches from other. Only this object's mutex is held while copying; the
+// locked flag and the timestamps are not copied.
+PullRequest& PullRequest::operator=(const PullRequest& other) {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+  if (this == &other) {
+    return *this;
+  }
+
+  m_groupname = other.m_groupname;
+  m_nextOffset = other.m_nextOffset;
+  m_bDroped.store(other.m_bDroped.load());
+  m_queueOffsetMax = other.m_queueOffsetMax;
+  m_messageQueue = other.m_messageQueue;
+  m_msgTreeMap = other.m_msgTreeMap;
+  m_msgTreeMapTemp = other.m_msgTreeMapTemp;
+  return *this;
+}
+
+// Adds msgs to the cache, keyed by queue offset, and raises m_queueOffsetMax
+// to the largest offset seen so far.
+void PullRequest::putMessage(vector<MQMessageExt>& msgs) {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+
+  for (size_t i = 0; i < msgs.size(); ++i) {
+    MQMessageExt& msg = msgs[i];
+    m_msgTreeMap[msg.getQueueOffset()] = msg;
+    m_queueOffsetMax = std::max(m_queueOffsetMax, msg.getQueueOffset());
+  }
+  LOG_DEBUG("PullRequest: putMessage m_queueOffsetMax:%lld ", m_queueOffsetMax);
+}
+
+// Appends a copy of every cached message to msgs in ascending offset order.
+// The cache itself is left unchanged.
+void PullRequest::getMessage(vector<MQMessageExt>& msgs) {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+
+  map<int64, MQMessageExt>::iterator cached = m_msgTreeMap.begin();
+  while (cached != m_msgTreeMap.end()) {
+    msgs.push_back(cached->second);
+    ++cached;
+  }
+}
+
+// Returns the queue offset of the first cached message, or 0 when the cache
+// is empty.
+int64 PullRequest::getCacheMinOffset() {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+  if (m_msgTreeMap.empty()) {
+    return 0;
+  }
+  // Read the offset in place; copying the whole message just to query one
+  // field is wasted work while the lock is held.
+  return m_msgTreeMap.begin()->second.getQueueOffset();
+}
+
+// Returns the largest queue offset recorded so far. The value is read under
+// m_pullRequestLock, the same lock putMessage() holds while updating it.
+int64 PullRequest::getCacheMaxOffset() {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+  return m_queueOffsetMax;
+}
+
+// Returns how many messages are currently cached.
+int PullRequest::getCacheMsgCount() {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+  const int cached = static_cast<int>(m_msgTreeMap.size());
+  return cached;
+}
+
+// Appends one message to msgs for every offset in
+// [minQueueOffset, maxQueueOffset]. An offset that is not cached yields a
+// default-constructed message, as before, but is no longer inserted into the
+// cache as a side effect (operator[] used to add such empty entries).
+void PullRequest::getMessageByQueueOffset(vector<MQMessageExt>& msgs,
+                                          int64 minQueueOffset,
+                                          int64 maxQueueOffset) {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+
+  for (int64 offset = minQueueOffset; offset <= maxQueueOffset; ++offset) {
+    map<int64, MQMessageExt>::iterator cached = m_msgTreeMap.find(offset);
+    if (cached != m_msgTreeMap.end()) {
+      msgs.push_back(cached->second);
+    } else {
+      msgs.push_back(MQMessageExt());
+    }
+  }
+}
+
+// Removes msgs from the cache and returns the offset to report afterwards:
+//   -1                    when the cache was already empty,
+//   first remaining key   when messages are still cached after the removal,
+//   m_queueOffsetMax + 1  when the removal emptied the cache.
+int64 PullRequest::removeMessage(vector<MQMessageExt>& msgs) {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+
+  int64 result = -1;
+  LOG_DEBUG("m_queueOffsetMax is:%lld", m_queueOffsetMax);
+  if (m_msgTreeMap.empty()) {
+    return result;
+  }
+
+  result = m_queueOffsetMax + 1;
+  LOG_DEBUG(" offset result is:%lld, m_queueOffsetMax is:%lld, msgs size:%zu",
+            result, m_queueOffsetMax, msgs.size());
+  for (size_t i = 0; i < msgs.size(); ++i) {
+    LOG_DEBUG("remove these msg from m_msgTreeMap, its offset:%lld",
+              msgs[i].getQueueOffset());
+    m_msgTreeMap.erase(msgs[i].getQueueOffset());
+  }
+
+  if (!m_msgTreeMap.empty()) {
+    result = m_msgTreeMap.begin()->first;
+    LOG_INFO("cache msg size:%zu of pullRequest:%s, return offset result is:%lld",
+             m_msgTreeMap.size(), m_messageQueue.toString().c_str(), result);
+  }
+  return result;
+}
+
+// Empties both message caches, but only once the request has been marked as
+// dropped; otherwise nothing happens.
+void PullRequest::clearAllMsgs() {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+
+  if (!isDroped()) {
+    return;
+  }
+  LOG_DEBUG("clear m_msgTreeMap as PullRequest had been dropped.");
+  m_msgTreeMap.clear();
+  m_msgTreeMapTemp.clear();
+}
+
+// Overwrites m_queueOffsetMax with queueOffset, even when that lowers it.
+// Lowering happens in two cases:
+// 1. a resetOffset command;
+// 2. during rebalance with CONSUMER_FROM_FIRST_OFFSET configured, when the
+//    readOffset call made by computePullFromWhere failed and m_nextOffset
+//    is set to 0.
+// The write is done under m_pullRequestLock, the lock putMessage() and
+// removeMessage() hold while using the same field.
+void PullRequest::updateQueueMaxOffset(int64 queueOffset) {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+  m_queueOffsetMax = queueOffset;
+}
+
+// Sets or clears the dropped flag.
+//
+// m_queueOffsetMax and m_nextOffset are deliberately NOT reset here, because
+// ConsumeMsgService and dropping a queue run concurrently. Otherwise:
+//   1. ConsumeMsgService is running;
+//   2. a rebalance drops the queue and resets both offsets;
+//   3. ConsumeMsgService calls removeMessage; with no other messages cached
+//      it returns m_queueOffsetMax(0) + 1;
+//   4. the offset is updated to 1, far below the correct value.
+void PullRequest::setDroped(bool droped) {
+  m_bDroped.store(droped ? 1 : 0);
+}
+
+// Returns true when the dropped flag is set.
+bool PullRequest::isDroped() const { return m_bDroped.load() == 1; }
+
+// Returns m_nextOffset, read under m_pullRequestLock.
+int64 PullRequest::getNextOffset() {
+ boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+ return m_nextOffset;
+}
+
+// Sets or clears the locked flag.
+void PullRequest::setLocked(bool Locked) {
+  m_bLocked.store(Locked ? 1 : 0);
+}
+// Returns true when the locked flag is set.
+bool PullRequest::isLocked() const { return m_bLocked.load() == 1; }
+
+// Returns true when the time elapsed since m_lastLockTimestamp, measured
+// with UtilAll::currentTimeMillis(), exceeds RebalanceLockMaxLiveTime.
+bool PullRequest::isLockExpired() const {
+ return (UtilAll::currentTimeMillis() - m_lastLockTimestamp) >
+ RebalanceLockMaxLiveTime;
+}
+
+// Stores the time of the last lock; isLockExpired() measures from it.
+void PullRequest::setLastLockTimestamp(int64 time) {
+ m_lastLockTimestamp = time;
+}
+
+// Returns the stored last-lock timestamp.
+int64 PullRequest::getLastLockTimestamp() const { return m_lastLockTimestamp; }
+
+// Stores the time of the last pull.
+void PullRequest::setLastPullTimestamp(uint64 time) {
+ m_lastPullTimestamp = time;
+}
+
+// Returns the stored last-pull timestamp.
+uint64 PullRequest::getLastPullTimestamp() const { return m_lastPullTimestamp; }
+
+// Stores the time of the last consume.
+void PullRequest::setLastConsumeTimestamp(uint64 time) {
+ m_lastConsumeTimestamp = time;
+}
+
+// Returns the stored last-consume timestamp.
+uint64 PullRequest::getLastConsumeTimestamp() const {
+ return m_lastConsumeTimestamp;
+}
+
+// NOTE(review): despite its name this writes m_lastLockTimestamp, the field
+// isLockExpired() measures from. This looks like a copy-paste slip; confirm
+// which member was intended before relying on it.
+void PullRequest::setTryUnlockTimes(int time) { m_lastLockTimestamp = time; }
+
+// NOTE(review): returns m_lastLockTimestamp (converted to int), not a
+// dedicated unlock-attempt counter; likely a copy-paste slip - confirm.
+int PullRequest::getTryUnlockTimes() const { return m_lastLockTimestamp; }
+
+// Stores nextoffset in m_nextOffset under m_pullRequestLock.
+void PullRequest::setNextOffset(int64 nextoffset) {
+ boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+ m_nextOffset = nextoffset;
+}
+
+// Returns a copy of the consumer group name given at construction.
+string PullRequest::getGroupName() const { return m_groupname; }
+
+// Exposes m_consumeLock by reference so callers can lock it themselves. This
+// is a different mutex from m_pullRequestLock, which guards the caches.
+boost::timed_mutex& PullRequest::getPullRequestCriticalSection() {
+ return m_consumeLock;
+}
+
+// Moves up to batchSize messages, lowest offset first, from the pending cache
+// into the taken cache (m_msgTreeMapTemp) and appends a copy of each to msgs.
+// Stops early when the pending cache runs out; a batchSize of zero or less
+// takes nothing (the previous `i != batchSize` test never terminated for a
+// negative value).
+void PullRequest::takeMessages(vector<MQMessageExt>& msgs, int batchSize) {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+  for (int i = 0; i < batchSize && !m_msgTreeMap.empty(); ++i) {
+    map<int64, MQMessageExt>::iterator it = m_msgTreeMap.begin();
+    msgs.push_back(it->second);
+    m_msgTreeMapTemp[it->first] = it->second;
+    m_msgTreeMap.erase(it);
+  }
+}
+
+// Puts msgs back into the pending cache and removes them from the taken
+// cache, so that they are handed out again by takeMessages().
+void PullRequest::makeMessageToCosumeAgain(vector<MQMessageExt>& msgs) {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+  for (vector<MQMessageExt>::iterator msg = msgs.begin(); msg != msgs.end();
+       ++msg) {
+    m_msgTreeMap[msg->getQueueOffset()] = *msg;
+    m_msgTreeMapTemp.erase(msg->getQueueOffset());
+  }
+}
+
+// Clears the taken cache and returns the offset following its highest key,
+// or -1 when nothing had been taken.
+int64 PullRequest::commit() {
+  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+  if (m_msgTreeMapTemp.empty()) {
+    return -1;
+  }
+  const int64 lastTaken = m_msgTreeMapTemp.rbegin()->first;
+  m_msgTreeMapTemp.clear();
+  return lastTaken + 1;
+}
+
+//<!***************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/PullRequest.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/PullRequest.h b/rocketmq-cpp/src/consumer/PullRequest.h
new file mode 100755
index 0000000..6cd2180
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/PullRequest.h
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PULLREQUEST_H__
+#define __PULLREQUEST_H__
+
+#include <boost/atomic.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/thread.hpp>
+#include "ByteOrder.h"
+#include "MQMessageExt.h"
+#include "MQMessageQueue.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+// State kept for one message queue (m_messageQueue) of a consumer group:
+// the group name, the next offset, two int64-keyed maps of MQMessageExt, the
+// dropped/locked flags and several timestamps. m_pullRequestLock is the mutex
+// the visible member functions take around m_nextOffset and the two maps.
+class PullRequest {
+ public:
+ PullRequest(const string& groupname);
+ virtual ~PullRequest();
+
+ // Operations on the cached messages.
+ void putMessage(vector<MQMessageExt>& msgs);
+ void getMessage(vector<MQMessageExt>& msgs);
+ int64 getCacheMinOffset();
+ int64 getCacheMaxOffset();
+ int getCacheMsgCount();
+ void getMessageByQueueOffset(vector<MQMessageExt>& msgs, int64 minQueueOffset,
+ int64 maxQueueOffset);
+ int64 removeMessage(vector<MQMessageExt>& msgs);
+ void clearAllMsgs();
+
+ PullRequest& operator=(const PullRequest& other);
+
+ void setDroped(bool droped);
+ bool isDroped() const;
+
+ int64 getNextOffset();
+ // Stores nextoffset in m_nextOffset while holding m_pullRequestLock.
+ void setNextOffset(int64 nextoffset);
+
+ // Returns a copy of m_groupname.
+ string getGroupName() const;
+
+ void updateQueueMaxOffset(int64 queueOffset);
+
+ void setLocked(bool Locked);
+ bool isLocked() const;
+ bool isLockExpired() const;
+ void setLastLockTimestamp(int64 time);
+ int64 getLastLockTimestamp() const;
+ void setLastPullTimestamp(uint64 time);
+ uint64 getLastPullTimestamp() const;
+ void setLastConsumeTimestamp(uint64 time);
+ // Returns m_lastConsumeTimestamp.
+ uint64 getLastConsumeTimestamp() const;
+ void setTryUnlockTimes(int time);
+ int getTryUnlockTimes() const;
+ // Moves up to batchSize entries, smallest key first, from m_msgTreeMap to
+ // m_msgTreeMapTemp and appends copies of the moved messages to msgs.
+ void takeMessages(vector<MQMessageExt>& msgs, int batchSize);
+ // Clears m_msgTreeMapTemp; returns its largest key + 1, or -1 if it was empty.
+ int64 commit();
+ // Re-inserts msgs into m_msgTreeMap (keyed by getQueueOffset()) and erases
+ // the same keys from m_msgTreeMapTemp.
+ void makeMessageToCosumeAgain(vector<MQMessageExt>& msgs);
+ // Returns a reference to m_consumeLock; the caller does the locking.
+ boost::timed_mutex& getPullRequestCriticalSection();
+
+ public:
+ MQMessageQueue m_messageQueue;
+ static const uint64 RebalanceLockInterval; // ms
+ static const uint64 RebalanceLockMaxLiveTime; // ms
+
+ private:
+ string m_groupname;
+ int64 m_nextOffset;
+ int64 m_queueOffsetMax;
+ boost::atomic<bool> m_bDroped;
+ boost::atomic<bool> m_bLocked;
+ // Cached messages that takeMessages() has not handed out yet.
+ map<int64, MQMessageExt> m_msgTreeMap;
+ // Messages handed out by takeMessages() and not yet cleared by commit() or
+ // returned by makeMessageToCosumeAgain().
+ map<int64, MQMessageExt> m_msgTreeMapTemp;
+ // Taken by setNextOffset(), takeMessages(), makeMessageToCosumeAgain() and
+ // commit().
+ boost::mutex m_pullRequestLock;
+ uint64 m_lastLockTimestamp; // ms
+ // Presumably the counter behind set/getTryUnlockTimes(); verify against the
+ // member function definitions.
+ uint64 m_tryUnlockTimes;
+ uint64 m_lastPullTimestamp;
+ uint64 m_lastConsumeTimestamp;
+ // Timed mutex handed out by getPullRequestCriticalSection().
+ boost::timed_mutex m_consumeLock;
+};
+//<!************************************************************************
+} //<!end namespace;
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/PullResult.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/PullResult.cpp b/rocketmq-cpp/src/consumer/PullResult.cpp
new file mode 100755
index 0000000..8648abe
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/PullResult.cpp
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PullResult.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+//<!************************************************************************
+// Default constructor: status NO_MATCHED_MSG and all three offsets set to 0.
+PullResult::PullResult()
+    : pullStatus(NO_MATCHED_MSG), nextBeginOffset(0), minOffset(0), maxOffset(0) {
+}
+
+// Builds a result that carries only the given status; nextBeginOffset,
+// minOffset and maxOffset are all set to 0.
+PullResult::PullResult(PullStatus status)
+    : pullStatus(status),
+      nextBeginOffset(0),
+      minOffset(0),
+      maxOffset(0) {}
+
+// Builds a result from a status and the three offsets. Each parameter has the
+// same name as the member it initializes; in the initializer list the name
+// outside the parentheses is the member and the one inside is the parameter.
+PullResult::PullResult(PullStatus pullStatus, int64 nextBeginOffset,
+                       int64 minOffset, int64 maxOffset)
+    : pullStatus(pullStatus), nextBeginOffset(nextBeginOffset),
+      minOffset(minOffset), maxOffset(maxOffset) {
+}
+
+// Builds a result from a status, the three offsets and a list of messages.
+// Every element of src is copied, in order, into msgFoundList; src itself is
+// left untouched.
+PullResult::PullResult(PullStatus pullStatus, int64 nextBeginOffset,
+                       int64 minOffset, int64 maxOffset,
+                       const vector<MQMessageExt>& src)
+    : pullStatus(pullStatus), nextBeginOffset(nextBeginOffset),
+      minOffset(minOffset), maxOffset(maxOffset) {
+  // msgFoundList starts out empty, so assigning the range yields the same
+  // contents as reserving src.size() and pushing the elements one by one.
+  msgFoundList.assign(src.begin(), src.end());
+}
+
+// Destructor: explicitly empties msgFoundList.
+PullResult::~PullResult() {
+  msgFoundList.clear();
+}
+
+//<!***************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/PullResultExt.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/PullResultExt.h b/rocketmq-cpp/src/consumer/PullResultExt.h
new file mode 100755
index 0000000..ac6b8e9
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/PullResultExt.h
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PULLRESULTEXT_H__
+#define __PULLRESULTEXT_H__
+
+#include "PullResult.h"
+#include "UtilAll.h"
+#include "dataBlock.h"
+
+namespace rocketmq {
+/**
+ * Internal use only; not intended as part of the public API.
+ *
+ * PullResult extended with two extra fields: suggestWhichBrokerId and
+ * msgMemBlock, a MemoryBlock initialized from the messageBinary argument.
+ */
+//<!***************************************************************************
+class PullResultExt : public PullResult {
+ public:
+  // Forwards the status and offsets to PullResult and initializes msgMemBlock
+  // from messageBinary.
+  PullResultExt(PullStatus pullStatus, int64 nextBeginOffset, int64 minOffset,
+                int64 maxOffset, int suggestWhichBrokerId,
+                const MemoryBlock& messageBinary)
+      : PullResult(pullStatus, nextBeginOffset, minOffset, maxOffset),
+        suggestWhichBrokerId(suggestWhichBrokerId),
+        msgMemBlock(messageBinary) {}
+  // Same as above without message data; msgMemBlock is default-constructed.
+  PullResultExt(PullStatus pullStatus, int64 nextBeginOffset, int64 minOffset,
+                int64 maxOffset, int suggestWhichBrokerId)
+      : PullResult(pullStatus, nextBeginOffset, minOffset, maxOffset),
+        suggestWhichBrokerId(suggestWhichBrokerId) {}
+  virtual ~PullResultExt() {}
+
+ public:
+  int suggestWhichBrokerId;
+  MemoryBlock msgMemBlock;
+};
+
+}  //<!end namespace;
+
+#endif