You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/09/04 05:04:51 UTC
[05/14] incubator-rocketmq-externals git commit: upload rocketmq-cpp
code
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/Rebalance.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/Rebalance.cpp b/rocketmq-cpp/src/consumer/Rebalance.cpp
new file mode 100755
index 0000000..a19e7a7
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/Rebalance.cpp
@@ -0,0 +1,677 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Rebalance.h"
+#include "DefaultMQPushConsumer.h"
+#include "LockBatchBody.h"
+#include "Logging.h"
+#include "MQClientAPIImpl.h"
+#include "MQClientFactory.h"
+#include "OffsetStore.h"
+
+namespace rocketmq {
+//<!************************************************************************
+Rebalance::Rebalance(MQConsumer* consumer, MQClientFactory* pfactory)
+ : m_pConsumer(consumer), m_pClientFactory(pfactory) {
+ m_pAllocateMQStrategy = new AllocateMQAveragely();
+}
+
+Rebalance::~Rebalance() {
+ {
+ map<string, SubscriptionData*>::iterator it = m_subscriptionData.begin();
+ for (; it != m_subscriptionData.end(); ++it) deleteAndZero(it->second);
+ m_subscriptionData.clear();
+ }
+ {
+ MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
+ for (; it != m_requestQueueTable.end(); ++it) {
+ delete it->second;
+ it->second = NULL;
+ }
+ m_requestQueueTable.clear();
+ }
+ m_topicSubscribeInfoTable.clear();
+ m_pConsumer = NULL;
+ m_pClientFactory = NULL;
+ deleteAndZero(m_pAllocateMQStrategy);
+}
+
+void Rebalance::doRebalance() {
+ LOG_DEBUG("start doRebalance");
+ try {
+ map<string, SubscriptionData*>::iterator it = m_subscriptionData.begin();
+ for (; it != m_subscriptionData.end(); ++it) {
+ string topic = (it->first);
+ LOG_INFO("current topic is:%s", topic.c_str());
+ //<!topic -> mqs
+ vector<MQMessageQueue> mqAll;
+ if (!getTopicSubscribeInfo(topic, mqAll)) {
+ continue;
+ }
+ if (mqAll.empty()) {
+ if (!UtilAll::startsWith_retry(topic))
+ THROW_MQEXCEPTION(MQClientException, "doRebalance the topic is empty",
+ -1);
+ }
+
+ //<!msg model;
+ switch (m_pConsumer->getMessageModel()) {
+ case BROADCASTING: {
+ bool changed = updateRequestTableInRebalance(topic, mqAll);
+ if (changed) {
+ messageQueueChanged(topic, mqAll, mqAll);
+ }
+ break;
+ }
+ case CLUSTERING: {
+ vector<string> cidAll;
+ m_pClientFactory->findConsumerIds(
+ topic, m_pConsumer->getGroupName(), cidAll,
+ m_pConsumer->getSessionCredentials());
+
+ if (cidAll.empty()) {
+ /*remove the droping pullRequest changes for recovery consume fastly
+ from network broken
+ //drop all pullRequest
+ MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
+ for (; it != m_requestQueueTable.end(); ++it)
+ {
+ if(!(it->second->isDroped()))
+ {
+ MQMessageQueue mqtemp = it->first;
+ it->second->setDroped(true);
+ removeUnnecessaryMessageQueue(mqtemp);
+ it->second->clearAllMsgs();//add clear operation to
+ avoid bad
+ state when dropped pullRequest returns normal
+ LOG_INFO("find consumer failed, drop undropped mq:%s",
+ mqtemp.toString().c_str());
+ }
+ }*/
+
+ THROW_MQEXCEPTION(MQClientException,
+ "doRebalance the cidAll is empty", -1);
+ }
+ // log
+ for (int i = 0; i < (int)cidAll.size(); ++i) {
+ LOG_INFO("client id:%s of topic:%s", cidAll[i].c_str(),
+ topic.c_str());
+ }
+ //<! sort;
+ sort(mqAll.begin(), mqAll.end());
+ sort(cidAll.begin(), cidAll.end());
+
+ //<! allocate;
+ vector<MQMessageQueue> allocateResult;
+ try {
+ m_pAllocateMQStrategy->allocate(m_pConsumer->getMQClientId(), mqAll,
+ cidAll, allocateResult);
+ } catch (MQException& e) {
+ THROW_MQEXCEPTION(MQClientException, "allocate error", -1);
+ }
+
+ // log
+ for (int i = 0; i < (int)allocateResult.size(); ++i) {
+ LOG_INFO("allocate mq:%s", allocateResult[i].toString().c_str());
+ }
+
+ //<!update local;
+ bool changed = updateRequestTableInRebalance(topic, allocateResult);
+ if (changed) {
+ messageQueueChanged(topic, mqAll, allocateResult);
+ break;
+ }
+ }
+ default:
+ break;
+ }
+ }
+ } catch (MQException& e) {
+ LOG_ERROR(e.what());
+ }
+}
+
+void Rebalance::persistConsumerOffset() {
+ DefaultMQPushConsumer* pConsumer =
+ static_cast<DefaultMQPushConsumer*>(m_pConsumer);
+ OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
+ vector<MQMessageQueue> mqs;
+ {
+ boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
+ MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
+ for (; it != m_requestQueueTable.end(); ++it) {
+ if (it->second && (!it->second->isDroped())) {
+ mqs.push_back(it->first);
+ }
+ }
+ }
+
+ if (pConsumer->getMessageModel() == BROADCASTING) {
+ pOffsetStore->persistAll(mqs);
+ } else {
+ vector<MQMessageQueue>::iterator it2 = mqs.begin();
+ for (; it2 != mqs.end(); ++it2) {
+ pOffsetStore->persist(*it2, m_pConsumer->getSessionCredentials());
+ }
+ }
+}
+
+void Rebalance::persistConsumerOffsetByResetOffset() {
+ boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
+ DefaultMQPushConsumer* pConsumer =
+ static_cast<DefaultMQPushConsumer*>(m_pConsumer);
+ OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
+ vector<MQMessageQueue> mqs;
+ {
+ boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
+ MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
+ for (; it != m_requestQueueTable.end(); ++it) {
+ if (it->second) { // even if it was dropped, also need update offset when
+ // rcv resetOffset cmd
+ mqs.push_back(it->first);
+ }
+ }
+ }
+ vector<MQMessageQueue>::iterator it2 = mqs.begin();
+ for (; it2 != mqs.end(); ++it2) {
+ pOffsetStore->persist(*it2, m_pConsumer->getSessionCredentials());
+ }
+}
+
+SubscriptionData* Rebalance::getSubscriptionData(const string& topic) {
+ if (m_subscriptionData.find(topic) != m_subscriptionData.end()) {
+ return m_subscriptionData[topic];
+ }
+ return NULL;
+}
+
+map<string, SubscriptionData*>& Rebalance::getSubscriptionInner() {
+ return m_subscriptionData;
+}
+
+void Rebalance::setSubscriptionData(const string& topic,
+ SubscriptionData* pdata) {
+ if (pdata != NULL &&
+ m_subscriptionData.find(topic) == m_subscriptionData.end())
+ m_subscriptionData[topic] = pdata;
+}
+
+void Rebalance::setTopicSubscribeInfo(const string& topic,
+ vector<MQMessageQueue>& mqs) {
+ if (m_subscriptionData.find(topic) != m_subscriptionData.end()) {
+ {
+ boost::lock_guard<boost::mutex> lock(m_topicSubscribeInfoTableMutex);
+ if (m_topicSubscribeInfoTable.find(topic) !=
+ m_topicSubscribeInfoTable.end())
+ m_topicSubscribeInfoTable.erase(topic);
+ m_topicSubscribeInfoTable[topic] = mqs;
+ }
+ // log
+ vector<MQMessageQueue>::iterator it = mqs.begin();
+ for (; it != mqs.end(); ++it) {
+ LOG_DEBUG("topic [%s] has :%s", topic.c_str(), (*it).toString().c_str());
+ }
+ }
+}
+
+bool Rebalance::getTopicSubscribeInfo(const string& topic,
+ vector<MQMessageQueue>& mqs) {
+ boost::lock_guard<boost::mutex> lock(m_topicSubscribeInfoTableMutex);
+ if (m_topicSubscribeInfoTable.find(topic) !=
+ m_topicSubscribeInfoTable.end()) {
+ mqs = m_topicSubscribeInfoTable[topic];
+ return true;
+ }
+ return false;
+}
+
+void Rebalance::addPullRequest(MQMessageQueue mq, PullRequest* pPullRequest) {
+ boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
+ m_requestQueueTable[mq] = pPullRequest;
+}
+
+PullRequest* Rebalance::getPullRequest(MQMessageQueue mq) {
+ boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
+ if (m_requestQueueTable.find(mq) != m_requestQueueTable.end()) {
+ return m_requestQueueTable[mq];
+ }
+ return NULL;
+}
+
+map<MQMessageQueue, PullRequest*> Rebalance::getPullRequestTable() {
+ boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
+ return m_requestQueueTable;
+}
+
+void Rebalance::unlockAll(bool oneway) {
+ map<string, vector<MQMessageQueue>*> brokerMqs;
+ MQ2PULLREQ requestQueueTable = getPullRequestTable();
+ for (MQ2PULLREQ::iterator it = requestQueueTable.begin();
+ it != requestQueueTable.end(); ++it) {
+ if (!(it->second->isDroped())) {
+ if (brokerMqs.find(it->first.getBrokerName()) == brokerMqs.end()) {
+ vector<MQMessageQueue>* mqs = new vector<MQMessageQueue>;
+ brokerMqs[it->first.getBrokerName()] = mqs;
+ } else {
+ brokerMqs[it->first.getBrokerName()]->push_back(it->first);
+ }
+ }
+ }
+ LOG_INFO("unLockAll %zu broker mqs", brokerMqs.size());
+ for (map<string, vector<MQMessageQueue>*>::iterator itb = brokerMqs.begin();
+ itb != brokerMqs.end(); ++itb) {
+ unique_ptr<FindBrokerResult> pFindBrokerResult(
+ m_pClientFactory->findBrokerAddressInSubscribe(itb->first, MASTER_ID,
+ true));
+ unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(
+ new UnlockBatchRequestBody());
+ vector<MQMessageQueue> mqs(*(itb->second));
+ unlockBatchRequest->setClientId(m_pConsumer->getMQClientId());
+ unlockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
+ unlockBatchRequest->setMqSet(mqs);
+
+ try {
+ m_pClientFactory->getMQClientAPIImpl()->unlockBatchMQ(
+ pFindBrokerResult->brokerAddr, unlockBatchRequest.get(), 1000,
+ m_pConsumer->getSessionCredentials());
+ for (unsigned int i = 0; i != mqs.size(); ++i) {
+ PullRequest* pullreq = getPullRequest(mqs[i]);
+ if (pullreq) {
+ LOG_INFO("unlockBatchMQ success of mq:%s", mqs[i].toString().c_str());
+ pullreq->setLocked(true);
+ } else {
+ LOG_ERROR("unlockBatchMQ fails of mq:%s", mqs[i].toString().c_str());
+ }
+ }
+ } catch (MQException& e) {
+ LOG_ERROR("unlockBatchMQ fails");
+ }
+ deleteAndZero(itb->second);
+ }
+ brokerMqs.clear();
+}
+
+void Rebalance::unlock(MQMessageQueue mq) {
+ unique_ptr<FindBrokerResult> pFindBrokerResult(
+ m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
+ MASTER_ID, true));
+ unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(
+ new UnlockBatchRequestBody());
+ vector<MQMessageQueue> mqs;
+ mqs.push_back(mq);
+ unlockBatchRequest->setClientId(m_pConsumer->getMQClientId());
+ unlockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
+ unlockBatchRequest->setMqSet(mqs);
+
+ try {
+ m_pClientFactory->getMQClientAPIImpl()->unlockBatchMQ(
+ pFindBrokerResult->brokerAddr, unlockBatchRequest.get(), 1000,
+ m_pConsumer->getSessionCredentials());
+ for (unsigned int i = 0; i != mqs.size(); ++i) {
+ PullRequest* pullreq = getPullRequest(mqs[i]);
+ if (pullreq) {
+ LOG_INFO("unlock success of mq:%s", mqs[i].toString().c_str());
+ pullreq->setLocked(true);
+ } else {
+ LOG_ERROR("unlock fails of mq:%s", mqs[i].toString().c_str());
+ }
+ }
+ } catch (MQException& e) {
+ LOG_ERROR("unlock fails of mq:%s", mq.toString().c_str());
+ }
+}
+
+void Rebalance::lockAll() {
+ map<string, vector<MQMessageQueue>*> brokerMqs;
+ MQ2PULLREQ requestQueueTable = getPullRequestTable();
+ for (MQ2PULLREQ::iterator it = requestQueueTable.begin();
+ it != requestQueueTable.end(); ++it) {
+ if (!(it->second->isDroped())) {
+ string brokerKey = it->first.getBrokerName() + it->first.getTopic();
+ if (brokerMqs.find(brokerKey) == brokerMqs.end()) {
+ vector<MQMessageQueue>* mqs = new vector<MQMessageQueue>;
+ brokerMqs[brokerKey] = mqs;
+ brokerMqs[brokerKey]->push_back(it->first);
+ } else {
+ brokerMqs[brokerKey]->push_back(it->first);
+ }
+ }
+ }
+ LOG_INFO("LockAll %zu broker mqs", brokerMqs.size());
+ for (map<string, vector<MQMessageQueue>*>::iterator itb = brokerMqs.begin();
+ itb != brokerMqs.end(); ++itb) {
+ unique_ptr<FindBrokerResult> pFindBrokerResult(
+ m_pClientFactory->findBrokerAddressInSubscribe(
+ (*(itb->second))[0].getBrokerName(), MASTER_ID, true));
+ unique_ptr<LockBatchRequestBody> lockBatchRequest(
+ new LockBatchRequestBody());
+ lockBatchRequest->setClientId(m_pConsumer->getMQClientId());
+ lockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
+ lockBatchRequest->setMqSet(*(itb->second));
+ LOG_INFO("try to lock:%zu mqs of broker:%s", itb->second->size(),
+ itb->first.c_str());
+ try {
+ vector<MQMessageQueue> messageQueues;
+ m_pClientFactory->getMQClientAPIImpl()->lockBatchMQ(
+ pFindBrokerResult->brokerAddr, lockBatchRequest.get(), messageQueues,
+ 1000, m_pConsumer->getSessionCredentials());
+ for (unsigned int i = 0; i != messageQueues.size(); ++i) {
+ PullRequest* pullreq = getPullRequest(messageQueues[i]);
+ if (pullreq) {
+ LOG_INFO("lockBatchMQ success of mq:%s",
+ messageQueues[i].toString().c_str());
+ pullreq->setLocked(true);
+ pullreq->setLastLockTimestamp(UtilAll::currentTimeMillis());
+ } else {
+ LOG_ERROR("lockBatchMQ fails of mq:%s",
+ messageQueues[i].toString().c_str());
+ }
+ }
+ messageQueues.clear();
+ } catch (MQException& e) {
+ LOG_ERROR("lockBatchMQ fails");
+ }
+ deleteAndZero(itb->second);
+ }
+ brokerMqs.clear();
+}
+bool Rebalance::lock(MQMessageQueue mq) {
+ unique_ptr<FindBrokerResult> pFindBrokerResult(
+ m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
+ MASTER_ID, true));
+ unique_ptr<LockBatchRequestBody> lockBatchRequest(new LockBatchRequestBody());
+ lockBatchRequest->setClientId(m_pConsumer->getMQClientId());
+ lockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
+ vector<MQMessageQueue> in_mqSet;
+ in_mqSet.push_back(mq);
+ lockBatchRequest->setMqSet(in_mqSet);
+ bool lockResult = false;
+
+ try {
+ vector<MQMessageQueue> messageQueues;
+ LOG_DEBUG("try to lock mq:%s", mq.toString().c_str());
+ m_pClientFactory->getMQClientAPIImpl()->lockBatchMQ(
+ pFindBrokerResult->brokerAddr, lockBatchRequest.get(), messageQueues,
+ 1000, m_pConsumer->getSessionCredentials());
+ if (messageQueues.size() == 0) {
+ LOG_ERROR("lock mq on broker:%s failed",
+ pFindBrokerResult->brokerAddr.c_str());
+ return false;
+ }
+ for (unsigned int i = 0; i != messageQueues.size(); ++i) {
+ PullRequest* pullreq = getPullRequest(messageQueues[i]);
+ if (pullreq) {
+ LOG_INFO("lock success of mq:%s", messageQueues[i].toString().c_str());
+ pullreq->setLocked(true);
+ pullreq->setLastLockTimestamp(UtilAll::currentTimeMillis());
+ lockResult = true;
+ } else {
+ LOG_ERROR("lock fails of mq:%s", messageQueues[i].toString().c_str());
+ }
+ }
+ messageQueues.clear();
+ return lockResult;
+ } catch (MQException& e) {
+ LOG_ERROR("lock fails of mq:%s", mq.toString().c_str());
+ return false;
+ }
+}
+
+//<!************************************************************************
+RebalancePull::RebalancePull(MQConsumer* consumer, MQClientFactory* pfactory)
+ : Rebalance(consumer, pfactory) {}
+
+bool RebalancePull::updateRequestTableInRebalance(
+ const string& topic, vector<MQMessageQueue>& mqsSelf) {
+ return false;
+}
+
+int64 RebalancePull::computePullFromWhere(const MQMessageQueue& mq) {
+ return 0;
+}
+
+void RebalancePull::messageQueueChanged(const string& topic,
+ vector<MQMessageQueue>& mqAll,
+ vector<MQMessageQueue>& mqDivided) {}
+
+void RebalancePull::removeUnnecessaryMessageQueue(const MQMessageQueue& mq) {}
+
+//<!***************************************************************************
+RebalancePush::RebalancePush(MQConsumer* consumer, MQClientFactory* pfactory)
+ : Rebalance(consumer, pfactory) {}
+
+bool RebalancePush::updateRequestTableInRebalance(
+ const string& topic, vector<MQMessageQueue>& mqsSelf) {
+ LOG_DEBUG("updateRequestTableInRebalance Enter");
+ if (mqsSelf.empty()) {
+ LOG_WARN("allocated queue is empty for topic:%s", topic.c_str());
+ }
+
+ bool changed = false;
+
+ //<!remove
+ MQ2PULLREQ requestQueueTable(getPullRequestTable());
+ MQ2PULLREQ::iterator it = requestQueueTable.begin();
+ for (; it != requestQueueTable.end(); ++it) {
+ MQMessageQueue mqtemp = it->first;
+ if (mqtemp.getTopic().compare(topic) == 0) {
+ if (mqsSelf.empty() ||
+ (find(mqsSelf.begin(), mqsSelf.end(), mqtemp) == mqsSelf.end())) {
+ if (!(it->second->isDroped())) {
+ it->second->setDroped(true);
+ removeUnnecessaryMessageQueue(mqtemp);
+ it->second->clearAllMsgs(); // add clear operation to avoid bad state
+ // when dropped pullRequest returns
+ // normal
+ LOG_INFO("drop mq:%s", mqtemp.toString().c_str());
+ }
+ changed = true;
+ }
+ }
+ }
+
+ //<!add
+ vector<PullRequest*> pullrequestAdd;
+ DefaultMQPushConsumer* pConsumer =
+ static_cast<DefaultMQPushConsumer*>(m_pConsumer);
+ vector<MQMessageQueue>::iterator it2 = mqsSelf.begin();
+ for (; it2 != mqsSelf.end(); ++it2) {
+ PullRequest* pPullRequest(getPullRequest(*it2));
+ if (pPullRequest && pPullRequest->isDroped()) {
+ LOG_DEBUG(
+ "before resume the pull handle of this pullRequest, its mq is:%s, "
+ "its offset is:%lld",
+ (it2->toString()).c_str(), pPullRequest->getNextOffset());
+ pConsumer->getOffsetStore()->removeOffset(
+ *it2); // remove dirty offset which maybe update to
+ // OffsetStore::m_offsetTable by consuming After last
+ // drop
+ int64 nextOffset = computePullFromWhere(*it2);
+ if (nextOffset >= 0) {
+ pPullRequest->setDroped(false);
+ pPullRequest->clearAllMsgs(); // avoid consume accumulation and consume
+ // dumplication issues
+ pPullRequest->setNextOffset(nextOffset);
+ pPullRequest->updateQueueMaxOffset(nextOffset);
+ LOG_INFO(
+ "after resume the pull handle of this pullRequest, its mq is:%s, "
+ "its offset is:%lld",
+ (it2->toString()).c_str(), pPullRequest->getNextOffset());
+ changed = true;
+ pConsumer->producePullMsgTask(pPullRequest);
+ } else {
+ LOG_ERROR(
+ "get fatel error QueryOffset of mq:%s, do not reconsume this queue",
+ (it2->toString()).c_str());
+ }
+ }
+
+ if (!pPullRequest) {
+ LOG_INFO("updateRequestTableInRebalance Doesn't find old mq");
+ PullRequest* pullRequest = new PullRequest(m_pConsumer->getGroupName());
+ pullRequest->m_messageQueue = *it2;
+
+ int64 nextOffset = computePullFromWhere(*it2);
+ if (nextOffset >= 0) {
+ pullRequest->setNextOffset(nextOffset);
+ pullRequest->clearAllMsgs(); // avoid consume accumulation and consume
+ // dumplication issues
+ changed = true;
+ //<! mq-> pq;
+ addPullRequest(*it2, pullRequest);
+ pullrequestAdd.push_back(pullRequest);
+ LOG_INFO("add mq:%s, request initiall offset:%lld",
+ (*it2).toString().c_str(), nextOffset);
+ }
+ }
+ }
+
+ vector<PullRequest*>::iterator it3 = pullrequestAdd.begin();
+ for (; it3 != pullrequestAdd.end(); ++it3) {
+ LOG_DEBUG("start pull request");
+ pConsumer->producePullMsgTask(*it3);
+ }
+
+ LOG_DEBUG("updateRequestTableInRebalance exit");
+ return changed;
+}
+
+int64 RebalancePush::computePullFromWhere(const MQMessageQueue& mq) {
+ int64 result = -1;
+ DefaultMQPushConsumer* pConsumer =
+ static_cast<DefaultMQPushConsumer*>(m_pConsumer);
+ ConsumeFromWhere consumeFromWhere = pConsumer->getConsumeFromWhere();
+ OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
+ switch (consumeFromWhere) {
+ case CONSUME_FROM_LAST_OFFSET: {
+ int64 lastOffset = pOffsetStore->readOffset(
+ mq, READ_FROM_STORE, m_pConsumer->getSessionCredentials());
+ if (lastOffset >= 0) {
+ LOG_INFO("CONSUME_FROM_LAST_OFFSET, lastOffset of mq:%s is:%lld",
+ mq.toString().c_str(), lastOffset);
+ result = lastOffset;
+ }
+ else if (-1 == lastOffset) {
+ LOG_WARN("CONSUME_FROM_LAST_OFFSET, lastOffset of mq:%s is -1",
+ mq.toString().c_str());
+ if (UtilAll::startsWith_retry(mq.getTopic())) {
+ LOG_INFO("CONSUME_FROM_LAST_OFFSET, lastOffset of mq:%s is 0",
+ mq.toString().c_str());
+ result = 0;
+ }
+ else {
+ try {
+ result = pConsumer->maxOffset(mq);
+ LOG_INFO("CONSUME_FROM_LAST_OFFSET, maxOffset of mq:%s is:%lld",
+ mq.toString().c_str(), result);
+ } catch (MQException& e) {
+ LOG_ERROR(
+ "CONSUME_FROM_LAST_OFFSET error, lastOffset of mq:%s is -1",
+ mq.toString().c_str());
+ result = -1;
+ }
+ }
+ }
+ else {
+ LOG_ERROR("CONSUME_FROM_LAST_OFFSET error, lastOffset of mq:%s is -1",
+ mq.toString().c_str());
+ result = -1;
+ }
+ break;
+ }
+ case CONSUME_FROM_FIRST_OFFSET: {
+ int64 lastOffset = pOffsetStore->readOffset(
+ mq, READ_FROM_STORE, m_pConsumer->getSessionCredentials());
+ if (lastOffset >= 0) {
+ LOG_INFO("CONSUME_FROM_FIRST_OFFSET, lastOffset of mq:%s is:%lld",
+ mq.toString().c_str(), lastOffset);
+ result = lastOffset;
+ } else if (-1 == lastOffset)
+ {
+ LOG_INFO("CONSUME_FROM_FIRST_OFFSET, lastOffset of mq:%s, return 0",
+ mq.toString().c_str());
+ result = 0;
+ }
+ else {
+ LOG_ERROR("CONSUME_FROM_FIRST_OFFSET, lastOffset of mq:%s, return -1",
+ mq.toString().c_str());
+ result = -1;
+ }
+ break;
+ }
+ case CONSUME_FROM_TIMESTAMP: {
+ int64 lastOffset = pOffsetStore->readOffset(
+ mq, READ_FROM_STORE, m_pConsumer->getSessionCredentials());
+ if (lastOffset >= 0) {
+ LOG_INFO("CONSUME_FROM_TIMESTAMP, lastOffset of mq:%s is:%lld",
+ mq.toString().c_str(), lastOffset);
+ result = lastOffset;
+ }
+ else if (-1 == lastOffset) {
+ if (UtilAll::startsWith_retry(mq.getTopic())) {
+ try {
+ result = pConsumer->maxOffset(mq);
+ LOG_INFO("CONSUME_FROM_TIMESTAMP, maxOffset of mq:%s is:%lld",
+ mq.toString().c_str(), result);
+ } catch (MQException& e) {
+ LOG_ERROR(
+ "CONSUME_FROM_TIMESTAMP error, lastOffset of mq:%s is -1",
+ mq.toString().c_str());
+ result = -1;
+ }
+ }
+ else {
+ try {
+ } catch (MQException& e) {
+ LOG_ERROR(
+ "CONSUME_FROM_TIMESTAMP error, lastOffset of mq:%s, return 0",
+ mq.toString().c_str());
+ result = -1;
+ }
+ }
+ }
+ else {
+ LOG_ERROR(
+ "CONSUME_FROM_TIMESTAMP error, lastOffset of mq:%s, return -1",
+ mq.toString().c_str());
+ result = -1;
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ return result;
+}
+
+void RebalancePush::messageQueueChanged(const string& topic,
+ vector<MQMessageQueue>& mqAll,
+ vector<MQMessageQueue>& mqDivided) {}
+
+void RebalancePush::removeUnnecessaryMessageQueue(const MQMessageQueue& mq) {
+ DefaultMQPushConsumer* pConsumer =
+ static_cast<DefaultMQPushConsumer*>(m_pConsumer);
+ OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
+
+ pOffsetStore->persist(mq, m_pConsumer->getSessionCredentials());
+ pOffsetStore->removeOffset(mq);
+ if (pConsumer->getMessageListenerType() == messageListenerOrderly) {
+ unlock(mq);
+ }
+}
+
+//<!************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/Rebalance.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/Rebalance.h b/rocketmq-cpp/src/consumer/Rebalance.h
new file mode 100755
index 0000000..42f8667
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/Rebalance.h
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REBALANCEIMPL_H__
+#define __REBALANCEIMPL_H__
+
+#include "AllocateMQStrategy.h"
+#include "ConsumeType.h"
+#include "MQConsumer.h"
+#include "MQMessageQueue.h"
+#include "PullRequest.h"
+#include "SubscriptionData.h"
+
+#include <boost/thread/mutex.hpp>
+
+namespace rocketmq {
+class MQClientFactory;
+//<!************************************************************************
+class Rebalance {
+ public:
+ Rebalance(MQConsumer*, MQClientFactory*);
+ virtual ~Rebalance();
+
+ virtual void messageQueueChanged(const string& topic,
+ vector<MQMessageQueue>& mqAll,
+ vector<MQMessageQueue>& mqDivided) = 0;
+
+ virtual void removeUnnecessaryMessageQueue(const MQMessageQueue& mq) = 0;
+
+ virtual int64 computePullFromWhere(const MQMessageQueue& mq) = 0;
+
+ virtual bool updateRequestTableInRebalance(
+ const string& topic, vector<MQMessageQueue>& mqsSelf) = 0;
+
+ public:
+ void doRebalance();
+ void persistConsumerOffset();
+ void persistConsumerOffsetByResetOffset();
+ //<!m_subscriptionInner;
+ SubscriptionData* getSubscriptionData(const string& topic);
+ void setSubscriptionData(const string& topic, SubscriptionData* pdata);
+
+ map<string, SubscriptionData*>& getSubscriptionInner();
+
+ //<!m_topicSubscribeInfoTable;
+ void setTopicSubscribeInfo(const string& topic, vector<MQMessageQueue>& mqs);
+ bool getTopicSubscribeInfo(const string& topic, vector<MQMessageQueue>& mqs);
+
+ void addPullRequest(MQMessageQueue mq, PullRequest* pPullRequest);
+ PullRequest* getPullRequest(MQMessageQueue mq);
+ map<MQMessageQueue, PullRequest*> getPullRequestTable();
+ void lockAll();
+ bool lock(MQMessageQueue mq);
+ void unlockAll(bool oneway = false);
+ void unlock(MQMessageQueue mq);
+
+ protected:
+ map<string, SubscriptionData*> m_subscriptionData;
+
+ boost::mutex m_topicSubscribeInfoTableMutex;
+ map<string, vector<MQMessageQueue>> m_topicSubscribeInfoTable;
+ typedef map<MQMessageQueue, PullRequest*> MQ2PULLREQ;
+ MQ2PULLREQ m_requestQueueTable;
+ boost::mutex m_requestTableMutex;
+
+ AllocateMQStrategy* m_pAllocateMQStrategy;
+ MQConsumer* m_pConsumer;
+ MQClientFactory* m_pClientFactory;
+};
+
+//<!************************************************************************
+class RebalancePull : public Rebalance {
+ public:
+ RebalancePull(MQConsumer*, MQClientFactory*);
+ virtual ~RebalancePull(){};
+
+ virtual void messageQueueChanged(const string& topic,
+ vector<MQMessageQueue>& mqAll,
+ vector<MQMessageQueue>& mqDivided);
+
+ virtual void removeUnnecessaryMessageQueue(const MQMessageQueue& mq);
+
+ virtual int64 computePullFromWhere(const MQMessageQueue& mq);
+
+ virtual bool updateRequestTableInRebalance(const string& topic,
+ vector<MQMessageQueue>& mqsSelf);
+};
+
+//<!***************************************************************************
+class RebalancePush : public Rebalance {
+ public:
+ RebalancePush(MQConsumer*, MQClientFactory*);
+ virtual ~RebalancePush(){};
+
+ virtual void messageQueueChanged(const string& topic,
+ vector<MQMessageQueue>& mqAll,
+ vector<MQMessageQueue>& mqDivided);
+
+ virtual void removeUnnecessaryMessageQueue(const MQMessageQueue& mq);
+
+ virtual int64 computePullFromWhere(const MQMessageQueue& mq);
+
+ virtual bool updateRequestTableInRebalance(const string& topic,
+ vector<MQMessageQueue>& mqsSelf);
+};
+
+//<!************************************************************************
+} //<!end namespace;
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/SubscriptionData.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/SubscriptionData.cpp b/rocketmq-cpp/src/consumer/SubscriptionData.cpp
new file mode 100755
index 0000000..9b20642
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/SubscriptionData.cpp
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "SubscriptionData.h"
+#include <algorithm>
+#include <sstream>
+#include <vector>
+#include "UtilAll.h"
+#include "Logging.h"
+namespace rocketmq {
+//<!************************************************************************
+SubscriptionData::SubscriptionData() {
+ m_subVersion = UtilAll::currentTimeMillis();
+}
+
+SubscriptionData::SubscriptionData(const string& topic, const string& subString)
+ : m_topic(topic), m_subString(subString) {
+ m_subVersion = UtilAll::currentTimeMillis();
+}
+
+SubscriptionData::SubscriptionData(const SubscriptionData& other) {
+ m_subString = other.m_subString;
+ m_subVersion = other.m_subVersion;
+ m_tagSet = other.m_tagSet;
+ m_topic = other.m_topic;
+ m_codeSet = other.m_codeSet;
+}
+
+const string& SubscriptionData::getTopic() const { return m_topic; }
+
+const string& SubscriptionData::getSubString() const { return m_subString; }
+
+void SubscriptionData::setSubString(const string& sub) { m_subString = sub; }
+
+int64 SubscriptionData::getSubVersion() const { return m_subVersion; }
+
+void SubscriptionData::putTagsSet(const string& tag) {
+ m_tagSet.push_back(tag);
+}
+
+bool SubscriptionData::containTag(const string& tag) {
+ return std::find(m_tagSet.begin(), m_tagSet.end(), tag) != m_tagSet.end();
+}
+
+vector<string>& SubscriptionData::getTagsSet() { return m_tagSet; }
+
+bool SubscriptionData::operator==(const SubscriptionData& other) const {
+ if (!m_subString.compare(other.m_subString)) {
+ return false;
+ }
+ if (m_subVersion != other.m_subVersion) {
+ return false;
+ }
+ if (m_tagSet.size() != other.m_tagSet.size()) {
+ return false;
+ }
+ if (!m_topic.compare(other.m_topic)) {
+ return false;
+ }
+ return true;
+}
+
+bool SubscriptionData::operator<(const SubscriptionData& other) const {
+ int ret = m_topic.compare(other.m_topic);
+ if (ret < 0) {
+ return true;
+ } else if (ret == 0) {
+ ret = m_subString.compare(other.m_subString);
+ if (ret < 0) {
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ return false;
+ }
+}
+
+void SubscriptionData::putCodeSet(const string& tag) {
+ int value = atoi(tag.c_str());
+ m_codeSet.push_back(value);
+}
+
+Json::Value SubscriptionData::toJson() const {
+ Json::Value outJson;
+ outJson["subString"] = m_subString;
+ outJson["subVersion"] = UtilAll::to_string(m_subVersion);
+ outJson["topic"] = m_topic;
+
+ {
+ vector<string>::const_iterator it = m_tagSet.begin();
+ for (; it != m_tagSet.end(); it++) {
+ outJson["tagsSet"].append(*it);
+ }
+ }
+
+ {
+ vector<int>::const_iterator it = m_codeSet.begin();
+ for (; it != m_codeSet.end(); it++) {
+ outJson["codeSet"].append(*it);
+ }
+ }
+ return outJson;
+}
+
+//<!***************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/consumer/SubscriptionData.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/consumer/SubscriptionData.h b/rocketmq-cpp/src/consumer/SubscriptionData.h
new file mode 100755
index 0000000..89be74f
--- /dev/null
+++ b/rocketmq-cpp/src/consumer/SubscriptionData.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __SUBSCRIPTIONDATA_H__
+#define __SUBSCRIPTIONDATA_H__
+
+#include <string>
+#include "UtilAll.h"
+#include "json/json.h"
+
+namespace rocketmq {
+//<!************************************************************************
+class SubscriptionData {
+ public:
+ SubscriptionData();
+ virtual ~SubscriptionData() {
+ m_tagSet.clear();
+ m_codeSet.clear();
+ }
+ SubscriptionData(const string& topic, const string& subString);
+ SubscriptionData(const SubscriptionData& other);
+
+ const string& getTopic() const;
+ const string& getSubString() const;
+ void setSubString(const string& sub);
+ int64 getSubVersion() const;
+
+ void putTagsSet(const string& tag);
+ bool containTag(const string& tag);
+ vector<string>& getTagsSet();
+
+ void putCodeSet(const string& tag);
+
+ bool operator==(const SubscriptionData& other) const;
+ bool operator<(const SubscriptionData& other) const;
+
+ Json::Value toJson() const;
+
+ private:
+ string m_topic;
+ string m_subString;
+ int64 m_subVersion;
+ vector<string> m_tagSet;
+ vector<int> m_codeSet;
+};
+//<!***************************************************************************
+} //<!end namespace;
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/dllmain.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/dllmain.cpp b/rocketmq-cpp/src/dllmain.cpp
new file mode 100755
index 0000000..72f61fd
--- /dev/null
+++ b/rocketmq-cpp/src/dllmain.cpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include "windows.h"
+
+BOOL APIENTRY DllMain(HMODULE hModule, DWORD ul_reason_for_call,
+ LPVOID lpReserved) {
+ switch (ul_reason_for_call) {
+ case DLL_PROCESS_ATTACH:
+ break;
+ case DLL_THREAD_ATTACH:
+ break;
+ case DLL_THREAD_DETACH:
+ break;
+ case DLL_PROCESS_DETACH:
+ break;
+ }
+ return TRUE;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/log/Logging.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/log/Logging.cpp b/rocketmq-cpp/src/log/Logging.cpp
new file mode 100644
index 0000000..9ac812f
--- /dev/null
+++ b/rocketmq-cpp/src/log/Logging.cpp
@@ -0,0 +1,96 @@
+#include "Logging.h"
+#include <boost/date_time/gregorian/gregorian.hpp>
+#include "UtilAll.h"
+#define BOOST_DATE_TIME_SOURCE
+
+namespace rocketmq {
+
+logAdapter::~logAdapter() { logging::core::get()->remove_all_sinks(); }
+
+logAdapter& logAdapter::getLogInstance() {
+ static logAdapter alogInstance;
+ return alogInstance;
+}
+
+logAdapter::logAdapter() : m_logLevel(eLOG_LEVEL_INFO) {
+ string homeDir(UtilAll::getHomeDirectory());
+ homeDir.append("/logs/metaq-client4cpp/");
+ m_logFile += homeDir;
+ std::string fileName =
+ UtilAll::to_string(getpid()) + "_" + "rocketmq-cpp.log.%N";
+ m_logFile += fileName;
+
+ // boost::log::expressions::attr<
+ // boost::log::attributes::current_thread_id::value_type>("ThreadID");
+ boost::log::register_simple_formatter_factory<
+ boost::log::trivial::severity_level, char>("Severity");
+ m_logSink = logging::add_file_log(
+ keywords::file_name = m_logFile,
+ keywords::rotation_size = 10 * 1024 * 1024,
+ keywords::time_based_rotation =
+ sinks::file::rotation_at_time_point(0, 0, 0),
+ keywords::format = "[%TimeStamp%](%Severity%):%Message%",
+ keywords::min_free_space = 300 * 1024 * 1024, keywords::target = homeDir,
+ keywords::max_size = 20 * 1024 * 1024, // max keep 3 log file defaultly
+ keywords::auto_flush = true);
+ logging::core::get()->set_filter(logging::trivial::severity >=
+ logging::trivial::info);
+
+ logging::add_common_attributes();
+}
+
+void logAdapter::setLogLevel(elogLevel logLevel) {
+ m_logLevel = logLevel;
+ switch (logLevel) {
+ case eLOG_LEVEL_DISABLE:
+ logging::core::get()->set_filter(logging::trivial::severity >
+ logging::trivial::fatal);
+
+ break;
+ case eLOG_LEVEL_FATAL:
+ logging::core::get()->set_filter(logging::trivial::severity >=
+ logging::trivial::fatal);
+ break;
+ case eLOG_LEVEL_ERROR:
+ logging::core::get()->set_filter(logging::trivial::severity >=
+ logging::trivial::error);
+
+ break;
+ case eLOG_LEVEL_WARN:
+ logging::core::get()->set_filter(logging::trivial::severity >=
+ logging::trivial::warning);
+
+ break;
+ case eLOG_LEVEL_INFO:
+ logging::core::get()->set_filter(logging::trivial::severity >=
+ logging::trivial::info);
+
+ break;
+ case eLOG_LEVEL_DEBUG:
+ logging::core::get()->set_filter(logging::trivial::severity >=
+ logging::trivial::debug);
+
+ break;
+ case eLOG_LEVEL_TRACE:
+ logging::core::get()->set_filter(logging::trivial::severity >=
+ logging::trivial::trace);
+
+ break;
+ default:
+ logging::core::get()->set_filter(logging::trivial::severity >=
+ logging::trivial::info);
+
+ break;
+ }
+}
+
+elogLevel logAdapter::getLogLevel() { return m_logLevel; }
+
+void logAdapter::setLogFileNumAndSize(int logNum, int sizeOfPerFile) {
+ string homeDir(UtilAll::getHomeDirectory());
+ homeDir.append("/logs/metaq-client4cpp/");
+ m_logSink->locked_backend()->set_file_collector(sinks::file::make_collector(
+ keywords::target = homeDir,
+ keywords::max_size = logNum * sizeOfPerFile * 1024 * 1024));
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/log/Logging.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/log/Logging.h b/rocketmq-cpp/src/log/Logging.h
new file mode 100644
index 0000000..98e9659
--- /dev/null
+++ b/rocketmq-cpp/src/log/Logging.h
@@ -0,0 +1,75 @@
+#ifndef _ALOG_ADAPTER_H_
+#define _ALOG_ADAPTER_H_
+
+#include <boost/date_time/posix_time/posix_time_types.hpp>
+#include <boost/log/core.hpp>
+#include <boost/log/expressions.hpp>
+#include <boost/log/sinks/text_file_backend.hpp>
+#include <boost/log/sources/record_ostream.hpp>
+#include <boost/log/sources/severity_logger.hpp>
+#include <boost/log/trivial.hpp>
+#include <boost/log/utility/manipulators/add_value.hpp>
+#include <boost/log/utility/setup/common_attributes.hpp>
+#include <boost/log/utility/setup/file.hpp>
+#include <boost/scoped_array.hpp>
+#include <boost/shared_ptr.hpp>
+#include "MQClient.h"
+
+namespace logging = boost::log;
+namespace src = boost::log::sources;
+namespace sinks = boost::log::sinks;
+namespace expr = boost::log::expressions;
+namespace keywords = boost::log::keywords;
+using namespace boost::log::trivial;
+namespace rocketmq {
+
+class logAdapter {
+ public:
+ ~logAdapter();
+ static logAdapter& getLogInstance();
+ void setLogLevel(elogLevel logLevel);
+ elogLevel getLogLevel();
+ void setLogFileNumAndSize(int logNum, int sizeOfPerFile);
+ src::severity_logger<boost::log::trivial::severity_level>&
+ getSeverityLogger() {
+ return m_severityLogger;
+ }
+
+ private:
+ logAdapter();
+ elogLevel m_logLevel;
+ std::string m_logFile;
+ src::severity_logger<boost::log::trivial::severity_level> m_severityLogger;
+ typedef sinks::synchronous_sink<sinks::text_file_backend> logSink_t;
+ boost::shared_ptr<logSink_t> m_logSink;
+};
+
+#define ALOG_ADAPTER logAdapter::getLogInstance()
+
+#define AGENT_LOGGER ALOG_ADAPTER.getSeverityLogger()
+
+class LogUtil {
+ public:
+ static void VLogError(boost::log::trivial::severity_level level,
+ const char* format, ...) {
+ va_list arg_ptr;
+ va_start(arg_ptr, format);
+ boost::scoped_array<char> formattedString(new char[1024]);
+ vsnprintf(formattedString.get(), 1024, format, arg_ptr);
+ BOOST_LOG_SEV(AGENT_LOGGER, level) << formattedString.get();
+ va_end(arg_ptr);
+ }
+};
+
+#define LOG_FATAL(format, args...) \
+ LogUtil::VLogError(boost::log::trivial::fatal, format, ##args)
+#define LOG_ERROR(format, args...) \
+ LogUtil::VLogError(boost::log::trivial::error, format, ##args)
+#define LOG_WARN(format, args...) \
+ LogUtil::VLogError(boost::log::trivial::warning, format, ##args)
+#define LOG_INFO(format, args...) \
+ LogUtil::VLogError(boost::log::trivial::info, format, ##args)
+#define LOG_DEBUG(format, args...) \
+ LogUtil::VLogError(boost::log::trivial::debug, format, ##args)
+}
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/message/MQDecoder.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/message/MQDecoder.cpp b/rocketmq-cpp/src/message/MQDecoder.cpp
new file mode 100755
index 0000000..4dde1f5
--- /dev/null
+++ b/rocketmq-cpp/src/message/MQDecoder.cpp
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MQDecoder.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sstream>
+#include "Logging.h"
+#include "MemoryOutputStream.h"
+#include "MessageSysFlag.h"
+#include "UtilAll.h"
+namespace rocketmq {
+//<!***************************************************************************
+const int MQDecoder::MSG_ID_LENGTH = 8 + 8;
+
+const char MQDecoder::NAME_VALUE_SEPARATOR = 1;
+const char MQDecoder::PROPERTY_SEPARATOR = 2;
+
+int MQDecoder::MessageMagicCodePostion = 4;
+int MQDecoder::MessageFlagPostion = 16;
+int MQDecoder::MessagePhysicOffsetPostion = 28;
+int MQDecoder::MessageStoreTimestampPostion = 56;
+//<!***************************************************************************
+string MQDecoder::createMessageId(sockaddr addr, int64 offset) {
+ int host, port;
+ socketAddress2IPPort(addr, host, port);
+
+ MemoryOutputStream outputmen(MSG_ID_LENGTH);
+ outputmen.writeIntBigEndian(host);
+ outputmen.writeIntBigEndian(port);
+ outputmen.writeInt64BigEndian(offset);
+
+ const char* bytes = static_cast<const char*>(outputmen.getData());
+ int len = outputmen.getDataSize();
+
+ return UtilAll::bytes2string(bytes, len);
+}
+
+MQMessageId MQDecoder::decodeMessageId(const string& msgId) {
+
+ string ipstr = msgId.substr(0, 8);
+ string portstr = msgId.substr(8, 8);
+ string offsetstr = msgId.substr(16);
+
+ char* end;
+ int ipint = strtoul(ipstr.c_str(), &end, 16);
+ int portint = strtoul(portstr.c_str(), &end, 16);
+
+ int64 offset = UtilAll::hexstr2ull(offsetstr.c_str());
+
+ offset = n2hll(offset);
+
+ portint = ntohl(portint);
+ short port = portint;
+
+ struct sockaddr_in sa;
+ sa.sin_family = AF_INET;
+ sa.sin_port = htons(port);
+ sa.sin_addr.s_addr = ipint;
+
+ sockaddr addr;
+ memcpy(&addr, &sa, sizeof(sockaddr));
+
+ MQMessageId id(addr, offset);
+
+ return id;
+}
+
+MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer) {
+ return decode(byteBuffer, true);
+}
+
+MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer, bool readBody) {
+ MQMessageExt* msgExt = new MQMessageExt();
+
+ // 1 TOTALSIZE
+ int storeSize = byteBuffer.readIntBigEndian();
+ msgExt->setStoreSize(storeSize);
+
+ // 2 MAGICCODE sizeof(int)
+ byteBuffer.skipNextBytes(sizeof(int));
+
+ // 3 BODYCRC
+ int bodyCRC = byteBuffer.readIntBigEndian();
+ msgExt->setBodyCRC(bodyCRC);
+
+ // 4 QUEUEID
+ int queueId = byteBuffer.readIntBigEndian();
+ msgExt->setQueueId(queueId);
+
+ // 5 FLAG
+ int flag = byteBuffer.readIntBigEndian();
+ msgExt->setFlag(flag);
+
+ // 6 QUEUEOFFSET
+ int64 queueOffset = byteBuffer.readInt64BigEndian();
+ msgExt->setQueueOffset(queueOffset);
+
+ // 7 PHYSICALOFFSET
+ int64 physicOffset = byteBuffer.readInt64BigEndian();
+ msgExt->setCommitLogOffset(physicOffset);
+
+ // 8 SYSFLAG
+ int sysFlag = byteBuffer.readIntBigEndian();
+ msgExt->setSysFlag(sysFlag);
+
+ // 9 BORNTIMESTAMP
+ int64 bornTimeStamp = byteBuffer.readInt64BigEndian();
+ msgExt->setBornTimestamp(bornTimeStamp);
+
+ // 10 BORNHOST
+ int bornHost = byteBuffer.readIntBigEndian();
+ int port = byteBuffer.readIntBigEndian();
+ sockaddr bornAddr = IPPort2socketAddress(bornHost, port);
+ msgExt->setBornHost(bornAddr);
+
+ // 11 STORETIMESTAMP
+ int64 storeTimestamp = byteBuffer.readInt64BigEndian();
+ msgExt->setStoreTimestamp(storeTimestamp);
+
+ // // 12 STOREHOST
+ int storeHost = byteBuffer.readIntBigEndian();
+ port = byteBuffer.readIntBigEndian();
+ sockaddr storeAddr = IPPort2socketAddress(storeHost, port);
+ msgExt->setStoreHost(storeAddr);
+
+ // 13 RECONSUMETIMES
+ int reconsumeTimes = byteBuffer.readIntBigEndian();
+ msgExt->setReconsumeTimes(reconsumeTimes);
+
+ // 14 Prepared Transaction Offset
+ int64 preparedTransactionOffset = byteBuffer.readInt64BigEndian();
+ msgExt->setPreparedTransactionOffset(preparedTransactionOffset);
+
+ // 15 BODY
+ int bodyLen = byteBuffer.readIntBigEndian();
+ if (bodyLen > 0) {
+ if (readBody) {
+ MemoryBlock block;
+ byteBuffer.readIntoMemoryBlock(block, bodyLen);
+
+ const char* const pBody = static_cast<const char*>(block.getData());
+ int len = block.getSize();
+ string msgbody(pBody, len);
+
+ // decompress body
+ if ((sysFlag & MessageSysFlag::CompressedFlag) ==
+ MessageSysFlag::CompressedFlag) {
+ string outbody;
+ if (UtilAll::inflate(msgbody, outbody)) {
+ msgExt->setBody(outbody);
+ }
+ } else {
+ msgExt->setBody(msgbody);
+ }
+ } else {
+ byteBuffer.skipNextBytes(bodyLen);
+ }
+ }
+
+ // 16 TOPIC
+ int topicLen = (int)byteBuffer.readByte();
+ MemoryBlock block;
+ byteBuffer.readIntoMemoryBlock(block, topicLen);
+ const char* const pTopic = static_cast<const char*>(block.getData());
+ topicLen = block.getSize();
+ msgExt->setTopic(pTopic, topicLen);
+
+ // 17 properties
+ short propertiesLen = byteBuffer.readShortBigEndian();
+ if (propertiesLen > 0) {
+ MemoryBlock block;
+ byteBuffer.readIntoMemoryBlock(block, propertiesLen);
+ const char* const pProperty = static_cast<const char*>(block.getData());
+ int len = block.getSize();
+ string propertiesString(pProperty, len);
+
+ map<string, string> propertiesMap;
+ string2messageProperties(propertiesString, propertiesMap);
+ msgExt->setProperties(propertiesMap);
+ propertiesMap.clear();
+ }
+
+ // 18 msg ID
+ string msgId = createMessageId(msgExt->getStoreHost(),
+ (int64)msgExt->getCommitLogOffset());
+ msgExt->setMsgId(msgId);
+
+ // LOG_INFO("get msgExt from remote server, its contents
+ // are:%s",msgExt->toString().c_str());
+ return msgExt;
+}
+
+void MQDecoder::decodes(const MemoryBlock* mem, vector<MQMessageExt>& mqvec) {
+ mqvec.clear();
+ decodes(mem, mqvec, true);
+}
+
+void MQDecoder::decodes(const MemoryBlock* mem, vector<MQMessageExt>& mqvec,
+ bool readBody) {
+ MemoryInputStream rawInput(*mem, true);
+
+ while (rawInput.getNumBytesRemaining() > 0) {
+ unique_ptr<MQMessageExt> msg(decode(rawInput, readBody));
+ mqvec.push_back(*msg);
+ }
+}
+
+string MQDecoder::messageProperties2String(
+ const map<string, string>& properties) {
+ string os;
+ map<string, string>::const_iterator it = properties.begin();
+
+ for (; it != properties.end(); ++it) {
+ // os << it->first << NAME_VALUE_SEPARATOR << it->second <<
+ // PROPERTY_SEPARATOR;
+ os.append(it->first);
+ os += NAME_VALUE_SEPARATOR;
+ os.append(it->second);
+ os += PROPERTY_SEPARATOR;
+ }
+
+ return os;
+}
+
+void MQDecoder::string2messageProperties(const string& propertiesString,
+ map<string, string>& properties) {
+ vector<string> out;
+ UtilAll::Split(out, propertiesString, PROPERTY_SEPARATOR);
+
+ for (size_t i = 0; i < out.size(); i++) {
+ vector<string> outValue;
+ UtilAll::Split(outValue, out[i], NAME_VALUE_SEPARATOR);
+
+ if (outValue.size() == 2) {
+ properties[outValue[0]] = outValue[1];
+ }
+ }
+}
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/message/MQDecoder.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/message/MQDecoder.h b/rocketmq-cpp/src/message/MQDecoder.h
new file mode 100755
index 0000000..393e4c7
--- /dev/null
+++ b/rocketmq-cpp/src/message/MQDecoder.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __MESSAGEDECODER_H__
+#define __MESSAGEDECODER_H__
+
+#include "MQClientException.h"
+#include "MQMessageExt.h"
+#include "MQMessageId.h"
+#include "MemoryInputStream.h"
+#include "SocketUtil.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+class MQDecoder {
+ public:
+ static string createMessageId(sockaddr addr, int64 offset);
+ static MQMessageId decodeMessageId(const string& msgId);
+
+ static void decodes(const MemoryBlock* mem, vector<MQMessageExt>& mqvec);
+
+ static void decodes(const MemoryBlock* mem, vector<MQMessageExt>& mqvec,
+ bool readBody);
+
+ static string messageProperties2String(const map<string, string>& properties);
+ static void string2messageProperties(const string& propertiesString,
+ map<string, string>& properties);
+
+ private:
+ static MQMessageExt* decode(MemoryInputStream& byteBuffer);
+ static MQMessageExt* decode(MemoryInputStream& byteBuffer, bool readBody);
+
+ public:
+ static const char NAME_VALUE_SEPARATOR;
+ static const char PROPERTY_SEPARATOR;
+ static const int MSG_ID_LENGTH;
+ static int MessageMagicCodePostion;
+ static int MessageFlagPostion;
+ static int MessagePhysicOffsetPostion;
+ static int MessageStoreTimestampPostion;
+};
+} //<!end namespace;
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/message/MQMessage.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/message/MQMessage.cpp b/rocketmq-cpp/src/message/MQMessage.cpp
new file mode 100755
index 0000000..db5487f
--- /dev/null
+++ b/rocketmq-cpp/src/message/MQMessage.cpp
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MQMessage.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+
+const string MQMessage::PROPERTY_KEYS = "KEYS";
+const string MQMessage::PROPERTY_TAGS = "TAGS";
+const string MQMessage::PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
+const string MQMessage::PROPERTY_DELAY_TIME_LEVEL = "DELAY";
+const string MQMessage::PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
+const string MQMessage::PROPERTY_REAL_TOPIC = "REAL_TOPIC";
+const string MQMessage::PROPERTY_REAL_QUEUE_ID = "REAL_QID";
+const string MQMessage::PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
+const string MQMessage::PROPERTY_PRODUCER_GROUP = "PGROUP";
+const string MQMessage::PROPERTY_MIN_OFFSET = "MIN_OFFSET";
+const string MQMessage::PROPERTY_MAX_OFFSET = "MAX_OFFSET";
+const string MQMessage::KEY_SEPARATOR = " ";
+//<!************************************************************************
+MQMessage::MQMessage() { Init("", "", "", 0, "", true); }
+
+MQMessage::MQMessage(const string& topic, const string& body) {
+ Init(topic, "", "", 0, body, true);
+}
+
+MQMessage::MQMessage(const string& topic, const string& tags,
+ const string& body) {
+ Init(topic, tags, "", 0, body, true);
+}
+
+MQMessage::MQMessage(const string& topic, const string& tags,
+ const string& keys, const string& body) {
+ Init(topic, tags, keys, 0, body, true);
+}
+
+MQMessage::MQMessage(const string& topic, const string& tags,
+ const string& keys, const int flag, const string& body,
+ bool waitStoreMsgOK) {
+ Init(topic, tags, keys, flag, body, waitStoreMsgOK);
+}
+
+MQMessage::~MQMessage() { m_properties.clear(); }
+
+MQMessage::MQMessage(const MQMessage& other) {
+ m_body = other.m_body;
+ m_topic = other.m_topic;
+ m_flag = other.m_flag;
+ m_properties = other.m_properties;
+}
+
+MQMessage& MQMessage::operator=(const MQMessage& other) {
+ if (this != &other) {
+ m_body = other.m_body;
+ m_topic = other.m_topic;
+ m_flag = other.m_flag;
+ m_properties = other.m_properties;
+ }
+ return *this;
+}
+
+void MQMessage::setProperty(const string& name, const string& value) {
+ m_properties[name] = value;
+}
+
+string MQMessage::getProperty(const string& name) const {
+ map<string, string>::const_iterator it = m_properties.find(name);
+
+ return (it == m_properties.end()) ? "" : (*it).second;
+}
+
+string MQMessage::getTopic() const { return m_topic; }
+
+void MQMessage::setTopic(const string& topic) { m_topic = topic; }
+
+void MQMessage::setTopic(const char* body, int len) {
+ m_topic.clear();
+ m_topic.append(body, len);
+}
+
+string MQMessage::getTags() const { return getProperty(PROPERTY_TAGS); }
+
+void MQMessage::setTags(const string& tags) {
+ setProperty(PROPERTY_TAGS, tags);
+}
+
+string MQMessage::getKeys() const { return getProperty(PROPERTY_KEYS); }
+
+void MQMessage::setKeys(const string& keys) {
+ setProperty(PROPERTY_KEYS, keys);
+}
+
+void MQMessage::setKeys(const vector<string>& keys) {
+ if (keys.empty()) {
+ return;
+ }
+
+ vector<string>::const_iterator it = keys.begin();
+ string str;
+ str += *it;
+ it++;
+
+ for (; it != keys.end(); it++) {
+ str += KEY_SEPARATOR;
+ str += *it;
+ }
+
+ setKeys(str);
+}
+
+int MQMessage::getDelayTimeLevel() const {
+ string tmp = getProperty(PROPERTY_DELAY_TIME_LEVEL);
+ if (!tmp.empty()) {
+ return atoi(tmp.c_str());
+ }
+ return 0;
+}
+
+void MQMessage::setDelayTimeLevel(int level) {
+ char tmp[16];
+ sprintf(tmp, "%d", level);
+
+ setProperty(PROPERTY_DELAY_TIME_LEVEL, tmp);
+}
+
+bool MQMessage::isWaitStoreMsgOK() {
+ string tmp = getProperty(PROPERTY_WAIT_STORE_MSG_OK);
+ if (tmp.empty()) {
+ return true;
+ } else {
+ return (tmp == "true") ? true : false;
+ }
+}
+
+void MQMessage::setWaitStoreMsgOK(bool waitStoreMsgOK) {
+ if (waitStoreMsgOK) {
+ setProperty(PROPERTY_WAIT_STORE_MSG_OK, "true");
+ } else {
+ setProperty(PROPERTY_WAIT_STORE_MSG_OK, "false");
+ }
+}
+
+int MQMessage::getFlag() const { return m_flag; }
+
+void MQMessage::setFlag(int flag) { m_flag = flag; }
+
+string MQMessage::getBody() const { return m_body; }
+
+void MQMessage::setBody(const char* body, int len) {
+ m_body.clear();
+ m_body.append(body, len);
+}
+
+void MQMessage::setBody(const string &body) {
+ m_body.clear();
+ m_body.append(body);
+}
+
+map<string, string> MQMessage::getProperties() const { return m_properties; }
+
+void MQMessage::setProperties(map<string, string>& properties) {
+ m_properties = properties;
+}
+
+void MQMessage::Init(const string& topic, const string& tags,
+ const string& keys, const int flag, const string& body,
+ bool waitStoreMsgOK) {
+ m_topic = topic;
+ m_flag = flag;
+ m_body = body;
+
+ if (tags.length() > 0) {
+ setTags(tags);
+ }
+
+ if (keys.length() > 0) {
+ setKeys(keys);
+ }
+
+ setWaitStoreMsgOK(waitStoreMsgOK);
+}
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/message/MQMessageExt.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/message/MQMessageExt.cpp b/rocketmq-cpp/src/message/MQMessageExt.cpp
new file mode 100755
index 0000000..bfba42d
--- /dev/null
+++ b/rocketmq-cpp/src/message/MQMessageExt.cpp
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MQMessageExt.h"
+#include "MessageSysFlag.h"
+#include "SocketUtil.h"
+#include "TopicFilterType.h"
+
+namespace rocketmq {
+//<!************************************************************************
+MQMessageExt::MQMessageExt()
+ : m_queueOffset(0),
+ m_commitLogOffset(0),
+ m_bornTimestamp(0),
+ m_storeTimestamp(0),
+ m_preparedTransactionOffset(0),
+ m_queueId(0),
+ m_storeSize(0),
+ m_sysFlag(0),
+ m_bodyCRC(0),
+ m_reconsumeTimes(3),
+ m_msgId("") {}
+
+MQMessageExt::MQMessageExt(int queueId, int64 bornTimestamp, sockaddr bornHost,
+ int64 storeTimestamp, sockaddr storeHost,
+ string msgId)
+ : m_queueOffset(0),
+ m_commitLogOffset(0),
+ m_bornTimestamp(bornTimestamp),
+ m_storeTimestamp(storeTimestamp),
+ m_preparedTransactionOffset(0),
+ m_queueId(queueId),
+ m_storeSize(0),
+ m_sysFlag(0),
+ m_bodyCRC(0),
+ m_reconsumeTimes(3),
+ m_bornHost(bornHost),
+ m_storeHost(storeHost),
+ m_msgId(msgId) {}
+
+MQMessageExt::~MQMessageExt() {}
+
+int MQMessageExt::getQueueId() const { return m_queueId; }
+
+void MQMessageExt::setQueueId(int queueId) { m_queueId = queueId; }
+
+int64 MQMessageExt::getBornTimestamp() const { return m_bornTimestamp; }
+
+void MQMessageExt::setBornTimestamp(int64 bornTimestamp) {
+ m_bornTimestamp = bornTimestamp;
+}
+
+sockaddr MQMessageExt::getBornHost() const { return m_bornHost; }
+
+string MQMessageExt::getBornHostString() const {
+ return socketAddress2String(m_bornHost);
+}
+
+string MQMessageExt::getBornHostNameString() const {
+ return getHostName(m_bornHost);
+}
+
+void MQMessageExt::setBornHost(const sockaddr& bornHost) {
+ m_bornHost = bornHost;
+}
+
+int64 MQMessageExt::getStoreTimestamp() const { return m_storeTimestamp; }
+
+void MQMessageExt::setStoreTimestamp(int64 storeTimestamp) {
+ m_storeTimestamp = storeTimestamp;
+}
+
+sockaddr MQMessageExt::getStoreHost() const { return m_storeHost; }
+
+string MQMessageExt::getStoreHostString() const {
+ return socketAddress2String(m_storeHost);
+}
+
+void MQMessageExt::setStoreHost(const sockaddr& storeHost) {
+ m_storeHost = storeHost;
+}
+
+const string& MQMessageExt::getMsgId() const { return m_msgId; }
+
+void MQMessageExt::setMsgId(const string& msgId) { m_msgId = msgId; }
+
+int MQMessageExt::getSysFlag() const { return m_sysFlag; }
+
+void MQMessageExt::setSysFlag(int sysFlag) { m_sysFlag = sysFlag; }
+
+int MQMessageExt::getBodyCRC() const { return m_bodyCRC; }
+
+void MQMessageExt::setBodyCRC(int bodyCRC) { m_bodyCRC = bodyCRC; }
+
+int64 MQMessageExt::getQueueOffset() const { return m_queueOffset; }
+
+void MQMessageExt::setQueueOffset(int64 queueOffset) {
+ m_queueOffset = queueOffset;
+}
+
+int64 MQMessageExt::getCommitLogOffset() const { return m_commitLogOffset; }
+
+void MQMessageExt::setCommitLogOffset(int64 physicOffset) {
+ m_commitLogOffset = physicOffset;
+}
+
+int MQMessageExt::getStoreSize() const { return m_storeSize; }
+
+void MQMessageExt::setStoreSize(int storeSize) { m_storeSize = storeSize; }
+
+int MQMessageExt::parseTopicFilterType(int sysFlag) {
+ if ((sysFlag & MessageSysFlag::MultiTagsFlag) ==
+ MessageSysFlag::MultiTagsFlag) {
+ return MULTI_TAG;
+ }
+ return SINGLE_TAG;
+}
+
+int MQMessageExt::getReconsumeTimes() const { return m_reconsumeTimes; }
+
+void MQMessageExt::setReconsumeTimes(int reconsumeTimes) {
+ m_reconsumeTimes = reconsumeTimes;
+}
+
+int64 MQMessageExt::getPreparedTransactionOffset() const {
+ return m_preparedTransactionOffset;
+}
+
+void MQMessageExt::setPreparedTransactionOffset(
+ int64 preparedTransactionOffset) {
+ m_preparedTransactionOffset = preparedTransactionOffset;
+}
+
+//<!***************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/message/MQMessageId.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/message/MQMessageId.h b/rocketmq-cpp/src/message/MQMessageId.h
new file mode 100755
index 0000000..366ac20
--- /dev/null
+++ b/rocketmq-cpp/src/message/MQMessageId.h
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __MESSAGEID_H__
+#define __MESSAGEID_H__
+
+#include "SocketUtil.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+class MQMessageId {
+ public:
+ MQMessageId(sockaddr address, int64 offset)
+ : m_address(address), m_offset(offset) {}
+
+ sockaddr getAddress() const { return m_address; }
+
+ void setAddress(sockaddr address) { m_address = address; }
+
+ int64 getOffset() const { return m_offset; }
+
+ void setOffset(int64 offset) { m_offset = offset; }
+
+ private:
+ sockaddr m_address;
+ int64 m_offset;
+};
+
+} //<!end namespace;
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/message/MQMessageQueue.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/message/MQMessageQueue.cpp b/rocketmq-cpp/src/message/MQMessageQueue.cpp
new file mode 100755
index 0000000..60481e5
--- /dev/null
+++ b/rocketmq-cpp/src/message/MQMessageQueue.cpp
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MQMessageQueue.h"
+
+namespace rocketmq {
+//<!************************************************************************
+MQMessageQueue::MQMessageQueue() {
+ m_queueId = -1; // invalide mq
+ m_topic.clear();
+ m_brokerName.clear();
+}
+
+MQMessageQueue::MQMessageQueue(const std::string& topic, const std::string& brokerName,
+ int queueId)
+ : m_topic(topic), m_brokerName(brokerName), m_queueId(queueId) {}
+
+MQMessageQueue::MQMessageQueue(const MQMessageQueue& other)
+ : m_topic(other.m_topic),
+ m_brokerName(other.m_brokerName),
+ m_queueId(other.m_queueId) {}
+
+MQMessageQueue& MQMessageQueue::operator=(const MQMessageQueue& other) {
+ if (this != &other) {
+ m_brokerName = other.m_brokerName;
+ m_topic = other.m_topic;
+ m_queueId = other.m_queueId;
+ }
+ return *this;
+}
+
+std::string MQMessageQueue::getTopic() const { return m_topic; }
+
+void MQMessageQueue::setTopic(const std::string& topic) { m_topic = topic; }
+
+std::string MQMessageQueue::getBrokerName() const { return m_brokerName; }
+
+void MQMessageQueue::setBrokerName(const std::string& brokerName) {
+ m_brokerName = brokerName;
+}
+
+int MQMessageQueue::getQueueId() const { return m_queueId; }
+
+void MQMessageQueue::setQueueId(int queueId) { m_queueId = queueId; }
+
+bool MQMessageQueue::operator==(const MQMessageQueue& mq) const {
+ if (this == &mq) {
+ return true;
+ }
+
+ if (m_brokerName != mq.m_brokerName) {
+ return false;
+ }
+
+ if (m_queueId != mq.m_queueId) {
+ return false;
+ }
+
+ if (m_topic != mq.m_topic) {
+ return false;
+ }
+
+ return true;
+}
+
+int MQMessageQueue::compareTo(const MQMessageQueue& mq) const {
+ int result = m_topic.compare(mq.m_topic);
+ if (result != 0) {
+ return result;
+ }
+
+ result = m_brokerName.compare(mq.m_brokerName);
+ if (result != 0) {
+ return result;
+ }
+
+ return m_queueId - mq.m_queueId;
+}
+
+bool MQMessageQueue::operator<(const MQMessageQueue& mq) const {
+ return compareTo(mq) < 0;
+}
+
+//<!***************************************************************************
+} //<!end namespace;