You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2017/09/04 06:44:44 UTC
[10/17] incubator-rocketmq-externals git commit: Polish cpp module
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/PullRequest.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullRequest.cpp b/rocketmq-client4cpp/src/consumer/PullRequest.cpp
deleted file mode 100755
index b8650c6..0000000
--- a/rocketmq-client4cpp/src/consumer/PullRequest.cpp
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "PullRequest.h"
-#include "UtilAll.h"
-
-namespace rmq
-{
-
-PullRequest::~PullRequest()
-{
-
-}
-
-std::string PullRequest::getConsumerGroup()
-{
- return m_consumerGroup;
-}
-
-void PullRequest::setConsumerGroup(const std::string& consumerGroup)
-{
- m_consumerGroup = consumerGroup;
-}
-
-MessageQueue& PullRequest::getMessageQueue()
-{
- return m_messageQueue;
-}
-
-void PullRequest::setMessageQueue(const MessageQueue& messageQueue)
-{
- m_messageQueue = messageQueue;
-}
-
-long long PullRequest::getNextOffset()
-{
- return m_nextOffset;
-}
-
-void PullRequest::setNextOffset(long long nextOffset)
-{
- m_nextOffset = nextOffset;
-}
-
-int PullRequest::hashCode()
-{
- /*
- final int prime = 31;
- int result = 1;
- result = prime * result + ((consumerGroup == null) ? 0 : consumerGroup.hashCode());
- result = prime * result + ((messageQueue == null) ? 0 : messageQueue.hashCode());
- return result;
- */
- std::stringstream ss;
- ss << m_consumerGroup
- << m_messageQueue.hashCode();
- return UtilAll::hashCode(ss.str());
-}
-
-std::string PullRequest::toString() const
-{
- std::stringstream ss;
- ss << "{consumerGroup=" << m_consumerGroup
- << ",messageQueue=" << m_messageQueue.toString()
- << ",nextOffset=" << m_nextOffset << "}";
- return ss.str();
-}
-
-
-bool PullRequest::operator==(const PullRequest& other)
-{
- if (m_consumerGroup != other.m_consumerGroup)
- {
- return false;
- }
-
- if (!(m_messageQueue == other.m_messageQueue))
- {
- return false;
- }
-
- return true;
-}
-
-ProcessQueue* PullRequest::getProcessQueue()
-{
- return m_pProcessQueue;
-}
-
-void PullRequest::setProcessQueue(ProcessQueue* pProcessQueue)
-{
- m_pProcessQueue = pProcessQueue;
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/PullRequest.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullRequest.h b/rocketmq-client4cpp/src/consumer/PullRequest.h
deleted file mode 100755
index 3fb8367..0000000
--- a/rocketmq-client4cpp/src/consumer/PullRequest.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __PULLREQUEST_H__
-#define __PULLREQUEST_H__
-
-#include <string>
-#include <sstream>
-
-#include "MessageQueue.h"
-#include "ProcessQueue.h"
-
-namespace rmq
-{
- class PullRequest
- {
- public:
- virtual ~PullRequest();
-
- std::string getConsumerGroup();
- void setConsumerGroup(const std::string& consumerGroup);
-
- MessageQueue& getMessageQueue();
- void setMessageQueue(const MessageQueue& messageQueue);
-
- long long getNextOffset();
- void setNextOffset(long long nextOffset);
-
- int hashCode();
- std::string toString() const;
-
- bool operator==(const PullRequest& other);
-
- ProcessQueue* getProcessQueue();
- void setProcessQueue(ProcessQueue* pProcessQueue);
-
- private:
- std::string m_consumerGroup;
- MessageQueue m_messageQueue;
-
- ProcessQueue* m_pProcessQueue;
- long long m_nextOffset;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/PullResultExt.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullResultExt.h b/rocketmq-client4cpp/src/consumer/PullResultExt.h
deleted file mode 100755
index 24235b2..0000000
--- a/rocketmq-client4cpp/src/consumer/PullResultExt.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __PULLRESULTEXT_H__
-#define __PULLRESULTEXT_H__
-
-#include "PullResult.h"
-
-namespace rmq
-{
-
- struct PullResultExt : public PullResult
- {
- PullResultExt(PullStatus pullStatus,
- long long nextBeginOffset,
- long long minOffset,
- long long maxOffset,
- std::list<MessageExt*>& msgFoundList,
- long suggestWhichBrokerId,
- const char* messageBinary,
- int messageBinaryLen)
- : PullResult(pullStatus,
- nextBeginOffset,
- minOffset,
- maxOffset,
- msgFoundList),
- suggestWhichBrokerId(suggestWhichBrokerId),
- messageBinary(messageBinary),
- messageBinaryLen(messageBinaryLen)
- {
-
- }
-
- long suggestWhichBrokerId;
- const char* messageBinary;
- int messageBinaryLen;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp b/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp
deleted file mode 100755
index efdc1cc..0000000
--- a/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp
+++ /dev/null
@@ -1,613 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "RebalanceImpl.h"
-#include "AllocateMessageQueueStrategy.h"
-#include "MQClientFactory.h"
-#include "MixAll.h"
-#include "LockBatchBody.h"
-#include "MQClientAPIImpl.h"
-#include "KPRUtil.h"
-#include "ScopedLock.h"
-
-namespace rmq
-{
-
-RebalanceImpl::RebalanceImpl(const std::string& consumerGroup,
- MessageModel messageModel,
- AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy,
- MQClientFactory* pMQClientFactory)
- : m_consumerGroup(consumerGroup),
- m_messageModel(messageModel),
- m_pAllocateMessageQueueStrategy(pAllocateMessageQueueStrategy),
- m_pMQClientFactory(pMQClientFactory)
-{
-
-}
-
-RebalanceImpl::~RebalanceImpl()
-{
-}
-
-void RebalanceImpl::unlock(MessageQueue& mq, bool oneway)
-{
- FindBrokerResult findBrokerResult =
- m_pMQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll::MASTER_ID, true);
- if (!findBrokerResult.brokerAddr.empty())
- {
- UnlockBatchRequestBody* requestBody = new UnlockBatchRequestBody();
- requestBody->setConsumerGroup(m_consumerGroup);
- requestBody->setClientId(m_pMQClientFactory->getClientId());
- requestBody->getMqSet().insert(mq);
-
- try
- {
- m_pMQClientFactory->getMQClientAPIImpl()->unlockBatchMQ(findBrokerResult.brokerAddr,
- requestBody, 1000, oneway);
- }
- catch (...)
- {
- RMQ_ERROR("unlockBatchMQ exception, MQ: {%s}" , mq.toString().c_str());
- }
- }
-}
-
-void RebalanceImpl::unlockAll(bool oneway)
-{
- std::map<std::string, std::set<MessageQueue> > brokerMqs = buildProcessQueueTableByBrokerName();
- std::map<std::string, std::set<MessageQueue> >::iterator it = brokerMqs.begin();
-
- for (; it != brokerMqs.end(); it++)
- {
- std::string brokerName = it->first;
- std::set<MessageQueue> mqs = it->second;
-
- if (mqs.empty())
- {
- continue;
- }
-
- FindBrokerResult findBrokerResult =
- m_pMQClientFactory->findBrokerAddressInSubscribe(brokerName, MixAll::MASTER_ID, true);
-
- if (!findBrokerResult.brokerAddr.empty())
- {
- UnlockBatchRequestBody* requestBody = new UnlockBatchRequestBody();
- requestBody->setConsumerGroup(m_consumerGroup);
- requestBody->setClientId(m_pMQClientFactory->getClientId());
- requestBody->setMqSet(mqs);
-
- try
- {
- m_pMQClientFactory->getMQClientAPIImpl()->unlockBatchMQ(findBrokerResult.brokerAddr,
- requestBody, 1000, oneway);
-
- kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
- std::set<MessageQueue>::iterator itm = mqs.begin();
- for (; itm != mqs.end(); itm++)
- {
- std::map<MessageQueue, ProcessQueue*>::iterator itp = m_processQueueTable.find(*itm);
- if (itp != m_processQueueTable.end())
- {
- itp->second->setLocked(false);
- RMQ_INFO("the message queue unlock OK, Group: {%s}, MQ: {%s}",
- m_consumerGroup.c_str(), (*itm).toString().c_str());
- }
- }
- }
- catch (...)
- {
- RMQ_ERROR("unlockBatchMQ exception, mqs.size: {%u} ", (unsigned)mqs.size());
- }
- }
- }
-}
-
-bool RebalanceImpl::lock(MessageQueue& mq)
-{
- FindBrokerResult findBrokerResult =
- m_pMQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll::MASTER_ID, true);
- if (!findBrokerResult.brokerAddr.empty())
- {
- LockBatchRequestBody* requestBody = new LockBatchRequestBody();
- requestBody->setConsumerGroup(m_consumerGroup);
- requestBody->setClientId(m_pMQClientFactory->getClientId());
- requestBody->getMqSet().insert(mq);
-
- try
- {
- std::set<MessageQueue> lockedMq =
- m_pMQClientFactory->getMQClientAPIImpl()->lockBatchMQ(
- findBrokerResult.brokerAddr, requestBody, 1000);
-
- std::set<MessageQueue>::iterator it = lockedMq.begin();
- for (; it != lockedMq.end(); it++)
- {
- kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
- MessageQueue mmqq = *it;
- std::map<MessageQueue, ProcessQueue*>::iterator itt = m_processQueueTable.find(mmqq);
- if (itt != m_processQueueTable.end())
- {
- itt->second->setLocked(true);
- itt->second->setLastLockTimestamp(KPRUtil::GetCurrentTimeMillis());
- }
- }
-
- it = lockedMq.find(mq);
- bool lockOK = (it != lockedMq.end());
-
- RMQ_INFO("the message queue lock {%s}, {%s}, {%s}",//
- (lockOK ? "OK" : "Failed"), //
- m_consumerGroup.c_str(), //
- mq.toString().c_str());
- return lockOK;
- }
- catch (...)
- {
- RMQ_ERROR("lockBatchMQ exception, MQ: {%s}", mq.toString().c_str());
- }
- }
-
- return false;
-}
-
-void RebalanceImpl::lockAll()
-{
- std::map<std::string, std::set<MessageQueue> > brokerMqs = buildProcessQueueTableByBrokerName();
-
- std::map<std::string, std::set<MessageQueue> >::iterator it = brokerMqs.begin();
- for (; it != brokerMqs.end(); it++)
- {
- std::string brokerName = it->first;
- std::set<MessageQueue> mqs = it->second;
-
- if (mqs.empty())
- {
- continue;
- }
-
- FindBrokerResult findBrokerResult =
- m_pMQClientFactory->findBrokerAddressInSubscribe(brokerName, MixAll::MASTER_ID, true);
- if (!findBrokerResult.brokerAddr.empty())
- {
- LockBatchRequestBody* requestBody = new LockBatchRequestBody();
- requestBody->setConsumerGroup(m_consumerGroup);
- requestBody->setClientId(m_pMQClientFactory->getClientId());
- requestBody->setMqSet(mqs);
-
- try
- {
- std::set<MessageQueue> lockOKMQSet =
- m_pMQClientFactory->getMQClientAPIImpl()->lockBatchMQ(
- findBrokerResult.brokerAddr, requestBody, 1000);
-
- std::set<MessageQueue>::iterator its = lockOKMQSet.begin();
- for (; its != lockOKMQSet.end(); its++)
- {
- kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
- MessageQueue mq = *its;
- std::map<MessageQueue, ProcessQueue*>::iterator itt = m_processQueueTable.find(mq);
- if (itt != m_processQueueTable.end())
- {
- ProcessQueue* processQueue = itt->second;
- if (!processQueue->isLocked())
- {
- RMQ_INFO("the message queue locked OK, Group: {%s}, MQ: %s",
- m_consumerGroup.c_str(),
- mq.toString().c_str());
- }
-
- processQueue->setLocked(true);
- processQueue->setLastLockTimestamp(KPRUtil::GetCurrentTimeMillis());
- }
- }
-
- its = mqs.begin();
- for (; its != mqs.end(); its++)
- {
- MessageQueue mq = *its;
- std::set<MessageQueue>::iterator itf = lockOKMQSet.find(mq);
- if (itf == lockOKMQSet.end())
- {
- kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
- std::map<MessageQueue, ProcessQueue*>::iterator itt = m_processQueueTable.find(mq);
- if (itt != m_processQueueTable.end())
- {
- itt->second->setLocked(false);
- RMQ_WARN("the message queue locked Failed, Group: {%s}, MQ: %s",
- m_consumerGroup.c_str(),
- mq.toString().c_str());
- }
- }
- }
- }
- catch (std::exception& e)
- {
- RMQ_ERROR("lockBatchMQ exception: %s", e.what());
- }
- }
- }
-}
-
-void RebalanceImpl::doRebalance()
-{
- std::map<std::string, SubscriptionData> subTable = getSubscriptionInner();
- std::map<std::string, SubscriptionData>::iterator it = subTable.begin();
- for (; it != subTable.end(); it++)
- {
- std::string topic = it->first;
- try
- {
- rebalanceByTopic(topic);
- }
- catch (std::exception& e)
- {
- if (topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) != 0)
- {
- RMQ_WARN("rebalanceByTopic Exception: %s", e.what());
- }
- }
- }
-
- truncateMessageQueueNotMyTopic();
-}
-
-std::map<std::string, SubscriptionData>& RebalanceImpl::getSubscriptionInner()
-{
- return m_subscriptionInner;
-}
-
-std::map<MessageQueue, ProcessQueue*>& RebalanceImpl::getProcessQueueTable()
-{
- return m_processQueueTable;
-}
-
-
-kpr::RWMutex& RebalanceImpl::getProcessQueueTableLock()
-{
- return m_processQueueTableLock;
-}
-
-
-std::map<std::string, std::set<MessageQueue> >& RebalanceImpl::getTopicSubscribeInfoTable()
-{
- return m_topicSubscribeInfoTable;
-}
-
-std::string& RebalanceImpl::getConsumerGroup()
-{
- return m_consumerGroup;
-}
-
-void RebalanceImpl::setConsumerGroup(const std::string& consumerGroup)
-{
- m_consumerGroup = consumerGroup;
-}
-
-MessageModel RebalanceImpl::getMessageModel()
-{
- return m_messageModel;
-}
-
-void RebalanceImpl::setMessageModel(MessageModel messageModel)
-{
- m_messageModel = messageModel;
-}
-
-AllocateMessageQueueStrategy* RebalanceImpl::getAllocateMessageQueueStrategy()
-{
- return m_pAllocateMessageQueueStrategy;
-}
-
-void RebalanceImpl::setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy)
-{
- m_pAllocateMessageQueueStrategy = pAllocateMessageQueueStrategy;
-}
-
-MQClientFactory* RebalanceImpl::getmQClientFactory()
-{
- return m_pMQClientFactory;
-}
-
-void RebalanceImpl::setmQClientFactory(MQClientFactory* pMQClientFactory)
-{
- m_pMQClientFactory = pMQClientFactory;
-}
-
-std::map<std::string, std::set<MessageQueue> > RebalanceImpl::buildProcessQueueTableByBrokerName()
-{
- std::map<std::string, std::set<MessageQueue> > result ;
- kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
- std::map<MessageQueue, ProcessQueue*>::iterator it = m_processQueueTable.begin();
- for (; it != m_processQueueTable.end();)
- {
- MessageQueue mq = it->first;
- std::map<std::string, std::set<MessageQueue> >::iterator itm = result.find(mq.getBrokerName());
- if (itm == result.end())
- {
- std::set<MessageQueue> mqs ;
- mqs.insert(mq);
- result[mq.getBrokerName()] = mqs;
- }
- else
- {
- itm->second.insert(mq);
- }
- }
-
- return result;
-}
-
-void RebalanceImpl::rebalanceByTopic(const std::string& topic)
-{
- RMQ_DEBUG("rebalanceByTopic begin, topic={%s}", topic.c_str());
- switch (m_messageModel)
- {
- case BROADCASTING:
- {
- //kpr::ScopedLock<kpr::Mutex> lock(m_topicSubscribeInfoTableLock);
- std::map<std::string, std::set<MessageQueue> >::iterator it = m_topicSubscribeInfoTable.find(topic);
- if (it != m_topicSubscribeInfoTable.end())
- {
- std::set<MessageQueue> mqSet = it->second;
- bool changed = updateProcessQueueTableInRebalance(topic, mqSet);
- if (changed)
- {
- messageQueueChanged(topic, mqSet, mqSet);
- RMQ_INFO("messageQueueChanged {%s} {%s} {%s} {%s}",
- m_consumerGroup.c_str(),
- topic.c_str(),
- UtilAll::toString(mqSet).c_str(),
- UtilAll::toString(mqSet).c_str());
- }
- }
- else
- {
- RMQ_WARN("doRebalance, {%s}, but the topic[%s] not exist.", m_consumerGroup.c_str(), topic.c_str());
- }
- break;
- }
- case CLUSTERING:
- {
- //kpr::ScopedLock<kpr::Mutex> lock(m_topicSubscribeInfoTableLock);
- std::map<std::string, std::set<MessageQueue> >::iterator it = m_topicSubscribeInfoTable.find(topic);
- if (it == m_topicSubscribeInfoTable.end())
- {
- if (topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) != 0)
- {
- RMQ_WARN("doRebalance, %s, but the topic[%s] not exist.", m_consumerGroup.c_str(), topic.c_str());
- }
- }
-
- std::list<std::string> cidAll = m_pMQClientFactory->findConsumerIdList(topic, m_consumerGroup);
- if (cidAll.empty())
- {
- RMQ_WARN("doRebalance, %s:%s, get consumer id list failed.", m_consumerGroup.c_str(), topic.c_str());
- }
-
- if (it != m_topicSubscribeInfoTable.end() && !cidAll.empty())
- {
- std::vector<MessageQueue> mqAll;
- std::set<MessageQueue> mqSet = it->second;
- std::set<MessageQueue>::iterator its = mqSet.begin();
-
- for (; its != mqSet.end(); its++)
- {
- mqAll.push_back(*its);
- }
-
- cidAll.sort();
-
- AllocateMessageQueueStrategy* strategy = m_pAllocateMessageQueueStrategy;
-
- std::vector<MessageQueue>* allocateResult = NULL;
- try
- {
- allocateResult = strategy->allocate(m_consumerGroup,
- m_pMQClientFactory->getClientId(), mqAll, cidAll);
- }
- catch (std::exception& e)
- {
- RMQ_ERROR("AllocateMessageQueueStrategy.allocate Exception, allocateMessageQueueStrategyName={%s}, mqAll={%s}, cidAll={%s}, %s",
- strategy->getName().c_str(), UtilAll::toString(mqAll).c_str(), UtilAll::toString(cidAll).c_str(), e.what());
- return;
- }
-
- std::set<MessageQueue> allocateResultSet;
- if (allocateResult != NULL)
- {
- for (size_t i = 0; i < allocateResult->size(); i++)
- {
- allocateResultSet.insert(allocateResult->at(i));
- }
-
- delete allocateResult;
- }
-
- bool changed = updateProcessQueueTableInRebalance(topic, allocateResultSet);
- if (changed)
- {
- RMQ_INFO("rebalanced result changed. allocateMessageQueueStrategyName={%s}, group={%s}, topic={%s}, ConsumerId={%s}, "
- "rebalanceSize={%u}, rebalanceMqSet={%s}, mqAllSize={%u}, cidAllSize={%u}, mqAll={%s}, cidAll={%s}",
- strategy->getName().c_str(), m_consumerGroup.c_str(), topic.c_str(), m_pMQClientFactory->getClientId().c_str(),
- (unsigned)allocateResultSet.size(), UtilAll::toString(allocateResultSet).c_str(),
- (unsigned)mqAll.size(), (unsigned)cidAll.size(), UtilAll::toString(mqAll).c_str(), UtilAll::toString(cidAll).c_str()
- );
-
- messageQueueChanged(topic, mqSet, allocateResultSet);
- }
- }
- }
- break;
- default:
- break;
- }
- RMQ_DEBUG("rebalanceByTopic end");
-}
-
-
-void RebalanceImpl::removeProcessQueue(const MessageQueue& mq)
-{
- kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
- std::map<MessageQueue, ProcessQueue*>::iterator it = m_processQueueTable.find(mq);
- if (it != m_processQueueTable.end())
- {
- MessageQueue mq = it->first;
- ProcessQueue* pq = it->second;
- bool isDroped = pq->isDropped();
-
- this->removeUnnecessaryMessageQueue(mq, *pq);
- RMQ_INFO("Fix Offset, {%s}, remove unnecessary mq, {%s} Droped: {%d}",
- m_consumerGroup.c_str(), mq.toString().c_str(), isDroped);
- }
-}
-
-
-bool RebalanceImpl::updateProcessQueueTableInRebalance(const std::string& topic, std::set<MessageQueue>& mqSet)
-{
- RMQ_DEBUG("updateProcessQueueTableInRebalance begin, topic={%s}", topic.c_str());
- bool changed = false;
-
- {
- kpr::ScopedWLock<kpr::RWMutex> lock(m_processQueueTableLock);
- std::map<MessageQueue, ProcessQueue*>::iterator it = m_processQueueTable.begin();
- for (; it != m_processQueueTable.end();)
- {
- std::map<MessageQueue, ProcessQueue*>::iterator itCur = it++;
- MessageQueue mq = itCur->first;
- ProcessQueue* pq = itCur->second;
- if (mq.getTopic() == topic)
- {
- std::set<MessageQueue>::iterator itMq = mqSet.find(mq);
- if (itMq == mqSet.end())
- {
- pq->setDropped(true);
- if (this->removeUnnecessaryMessageQueue(mq, *pq))
- {
- changed = true;
- m_processQueueTable.erase(itCur);
-
- RMQ_WARN("doRebalance, {%s}, remove unnecessary mq, {%s}",
- m_consumerGroup.c_str(), mq.toString().c_str());
- }
- }
- else if (pq->isPullExpired())
- {
- switch(this->consumeType())
- {
- case CONSUME_ACTIVELY:
- break;
- case CONSUME_PASSIVELY:
- pq->setDropped(true);
- if (this->removeUnnecessaryMessageQueue(mq, *pq))
- {
- changed = true;
- m_processQueueTable.erase(itCur);
-
- RMQ_ERROR("[BUG]doRebalance, {%s}, remove unnecessary mq, {%s}, because pull is pause, so try to fixed it",
- m_consumerGroup.c_str(), mq.toString().c_str());
- }
- break;
- default:
- break;
- }
- }
- }
- }
- }
-
- std::list<PullRequest*> pullRequestList;
- std::set<MessageQueue>::iterator its = mqSet.begin();
- for (; its != mqSet.end(); its++)
- {
- MessageQueue mq = *its;
- bool find = false;
- {
- kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
- std::map<MessageQueue, ProcessQueue*>::iterator itm = m_processQueueTable.find(mq);
- if (itm != m_processQueueTable.end())
- {
- find = true;
- }
- }
-
- if (!find)
- {
- //todo: memleak
- PullRequest* pullRequest = new PullRequest();
- pullRequest->setConsumerGroup(m_consumerGroup);
- pullRequest->setMessageQueue(mq);
- pullRequest->setProcessQueue(new ProcessQueue());//todo: memleak
-
- long long nextOffset = computePullFromWhere(mq);
- if (nextOffset >= 0)
- {
- pullRequest->setNextOffset(nextOffset);
- pullRequestList.push_back(pullRequest);
- changed = true;
-
- {
- kpr::ScopedWLock<kpr::RWMutex> lock(m_processQueueTableLock);
- m_processQueueTable[mq] = pullRequest->getProcessQueue();
- RMQ_INFO("doRebalance, {%s}, add a new mq, {%s}, pullRequst: %s",
- m_consumerGroup.c_str(), mq.toString().c_str(), pullRequest->toString().c_str());
- }
- }
- else
- {
- RMQ_WARN("doRebalance, {%s}, add new mq failed, {%s}",
- m_consumerGroup.c_str(), mq.toString().c_str());
- }
- }
- }
-
- //todo memleak
- dispatchPullRequest(pullRequestList);
- RMQ_DEBUG("updateProcessQueueTableInRebalance end");
-
- return changed;
-}
-
-void RebalanceImpl::truncateMessageQueueNotMyTopic()
-{
- std::map<std::string, SubscriptionData> subTable = getSubscriptionInner();
-
- kpr::ScopedWLock<kpr::RWMutex> lock(m_processQueueTableLock);
- std::map<MessageQueue, ProcessQueue*>::iterator it = m_processQueueTable.begin();
- for (; it != m_processQueueTable.end();)
- {
- MessageQueue mq = it->first;
- std::map<std::string, SubscriptionData>::iterator itt = subTable.find(mq.getTopic());
-
- if (itt == subTable.end())
- {
- ProcessQueue* pq = it->second;
- if (pq != NULL)
- {
- pq->setDropped(true);
- RMQ_WARN("doRebalance, {%s}, truncateMessageQueueNotMyTopic remove unnecessary mq, {%s}",
- m_consumerGroup.c_str(), mq.toString().c_str());
- }
- m_processQueueTable.erase(it++);
- }
- else
- {
- it++;
- }
- }
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RebalanceImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalanceImpl.h b/rocketmq-client4cpp/src/consumer/RebalanceImpl.h
deleted file mode 100755
index 577a031..0000000
--- a/rocketmq-client4cpp/src/consumer/RebalanceImpl.h
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __REBALANCEIMPL_H__
-#define __REBALANCEIMPL_H__
-
-#include <map>
-#include <string>
-#include <set>
-#include <list>
-
-#include "ConsumeType.h"
-#include "MessageQueue.h"
-#include "ProcessQueue.h"
-#include "PullRequest.h"
-#include "SubscriptionData.h"
-
-namespace rmq
-{
- class AllocateMessageQueueStrategy;
- class MQClientFactory;
-
- class RebalanceImpl
- {
- public:
- RebalanceImpl(const std::string& consumerGroup,
- MessageModel messageModel,
- AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy,
- MQClientFactory* pMQClientFactory);
- virtual ~RebalanceImpl();
-
- virtual void messageQueueChanged(const std::string& topic,
- std::set<MessageQueue>& mqAll,
- std::set<MessageQueue>& mqDivided) = 0;
- virtual bool removeUnnecessaryMessageQueue(MessageQueue& mq, ProcessQueue& pq) = 0;
- virtual void dispatchPullRequest(std::list<PullRequest*>& pullRequestList) = 0;
- virtual long long computePullFromWhere(MessageQueue& mq) = 0;
- virtual ConsumeType consumeType() = 0;
-
- bool lock(MessageQueue& mq);
- void lockAll();
-
- void unlock(MessageQueue& mq, bool oneway);
- void unlockAll(bool oneway);
-
- void doRebalance();
-
- std::map<MessageQueue, ProcessQueue*>& getProcessQueueTable();
- kpr::RWMutex& getProcessQueueTableLock();
- std::map<std::string, SubscriptionData>& getSubscriptionInner();
- std::map<std::string, std::set<MessageQueue> >& getTopicSubscribeInfoTable();
-
- std::string& getConsumerGroup();
- void setConsumerGroup(const std::string& consumerGroup);
-
- MessageModel getMessageModel();
- void setMessageModel(MessageModel messageModel);
-
- AllocateMessageQueueStrategy* getAllocateMessageQueueStrategy();
- void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy);
-
- MQClientFactory* getmQClientFactory();
- void setmQClientFactory(MQClientFactory* pMQClientFactory);
-
- void removeProcessQueue(const MessageQueue& mq);
-
- private:
- std::map<std::string, std::set<MessageQueue> > buildProcessQueueTableByBrokerName();
- void rebalanceByTopic(const std::string& topic);
- bool updateProcessQueueTableInRebalance(const std::string& topic, std::set<MessageQueue>& mqSet);
- void truncateMessageQueueNotMyTopic();
-
- protected:
- std::map<MessageQueue, ProcessQueue*> m_processQueueTable;
- kpr::RWMutex m_processQueueTableLock;
-
- std::map<std::string, std::set<MessageQueue> > m_topicSubscribeInfoTable;
- kpr::Mutex m_topicSubscribeInfoTableLock;
-
- std::map<std::string, SubscriptionData> m_subscriptionInner;
- kpr::Mutex m_subscriptionInnerLock;
-
- std::string m_consumerGroup;
- MessageModel m_messageModel;
- AllocateMessageQueueStrategy* m_pAllocateMessageQueueStrategy;
- MQClientFactory* m_pMQClientFactory;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp b/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp
deleted file mode 100755
index 1aa287b..0000000
--- a/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "RebalancePullImpl.h"
-#include "DefaultMQPullConsumerImpl.h"
-#include "AllocateMessageQueueStrategy.h"
-#include "MQClientFactory.h"
-#include "MessageQueueListener.h"
-#include "OffsetStore.h"
-#include "DefaultMQPullConsumer.h"
-
-namespace rmq
-{
-
-RebalancePullImpl::RebalancePullImpl(DefaultMQPullConsumerImpl* pDefaultMQPullConsumerImpl)
- : RebalanceImpl("", BROADCASTING, NULL, NULL),
- m_pDefaultMQPullConsumerImpl(pDefaultMQPullConsumerImpl)
-{
-}
-
-RebalancePullImpl::RebalancePullImpl(const std::string& consumerGroup,
- MessageModel messageModel,
- AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy,
- MQClientFactory* pMQClientFactory,
- DefaultMQPullConsumerImpl* pDefaultMQPullConsumerImpl)
- : RebalanceImpl(consumerGroup, messageModel, pAllocateMessageQueueStrategy, pMQClientFactory),
- m_pDefaultMQPullConsumerImpl(pDefaultMQPullConsumerImpl)
-{
-}
-
-long long RebalancePullImpl::computePullFromWhere(MessageQueue& mq)
-{
- return 0;
-}
-
-void RebalancePullImpl::dispatchPullRequest(std::list<PullRequest*>& pullRequestList)
-{
-}
-
-void RebalancePullImpl::messageQueueChanged(const std::string& topic,
- std::set<MessageQueue>& mqAll,
- std::set<MessageQueue>& mqDivided)
-{
- MessageQueueListener* messageQueueListener =
- m_pDefaultMQPullConsumerImpl->getDefaultMQPullConsumer()->getMessageQueueListener();
- if (messageQueueListener != NULL)
- {
- try
- {
- messageQueueListener->messageQueueChanged(topic, mqAll, mqDivided);
- }
- catch (...)
- {
- RMQ_ERROR("messageQueueChanged exception, %s", topic.c_str());
- }
- }
-}
-
-bool RebalancePullImpl::removeUnnecessaryMessageQueue(MessageQueue& mq, ProcessQueue& pq)
-{
- m_pDefaultMQPullConsumerImpl->getOffsetStore()->persist(mq);
- m_pDefaultMQPullConsumerImpl->getOffsetStore()->removeOffset(mq);
- return true;
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h b/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h
deleted file mode 100755
index 46dbcd1..0000000
--- a/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __REBALANCEPULLIMPL_H__
-#define __REBALANCEPULLIMPL_H__
-
-#include "RebalanceImpl.h"
-
-namespace rmq
-{
-class DefaultMQPullConsumerImpl;
-
-class RebalancePullImpl : public RebalanceImpl
-{
-public:
- RebalancePullImpl(DefaultMQPullConsumerImpl *pDefaultMQPullConsumerImpl);
-
- RebalancePullImpl(const std::string &consumerGroup,
- MessageModel messageModel,
- AllocateMessageQueueStrategy *pAllocateMessageQueueStrategy,
- MQClientFactory *pMQClientFactory,
- DefaultMQPullConsumerImpl *pDefaultMQPullConsumerImpl);
-
- long long computePullFromWhere(MessageQueue &mq);
-
- void dispatchPullRequest(std::list<PullRequest *> &pullRequestList);
-
- void messageQueueChanged(const std::string &topic,
- std::set<MessageQueue> &mqAll,
- std::set<MessageQueue> &mqDivided);
-
- bool removeUnnecessaryMessageQueue(MessageQueue &mq, ProcessQueue &pq);
-
- ConsumeType consumeType()
- {
- return CONSUME_ACTIVELY;
- };
-
-private:
- DefaultMQPullConsumerImpl *m_pDefaultMQPullConsumerImpl;
-};
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp b/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp
deleted file mode 100755
index fde770d..0000000
--- a/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "RebalancePushImpl.h"
-
-#include <string.h>
-#include <limits.h>
-
-#include "DefaultMQPushConsumerImpl.h"
-#include "AllocateMessageQueueStrategy.h"
-#include "MQClientFactory.h"
-#include "MessageQueueListener.h"
-#include "OffsetStore.h"
-#include "DefaultMQPushConsumer.h"
-#include "MQAdminImpl.h"
-
-
-namespace rmq
-{
-
-RebalancePushImpl::RebalancePushImpl(DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl)
- : RebalanceImpl("", BROADCASTING, NULL, NULL),
- m_pDefaultMQPushConsumerImpl(pDefaultMQPushConsumerImpl)
-{
-}
-
-RebalancePushImpl::RebalancePushImpl(const std::string& consumerGroup,
- MessageModel messageModel,
- AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy,
- MQClientFactory* pMQClientFactory,
- DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl)
- : RebalanceImpl(consumerGroup, messageModel, pAllocateMessageQueueStrategy, pMQClientFactory),
- m_pDefaultMQPushConsumerImpl(pDefaultMQPushConsumerImpl)
-{
-}
-
-void RebalancePushImpl::dispatchPullRequest(std::list<PullRequest*>& pullRequestList)
-{
- std::list<PullRequest*>::iterator it = pullRequestList.begin();
- for (; it != pullRequestList.end(); it++)
- {
- m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(*it);
- RMQ_INFO("doRebalance, {%s}, add a new pull request {%s}",
- m_consumerGroup.c_str(), (*it)->toString().c_str());
- }
-}
-
-long long RebalancePushImpl::computePullFromWhere(MessageQueue& mq)
-{
- long long result = -1;
- ConsumeFromWhere consumeFromWhere =
- m_pDefaultMQPushConsumerImpl->getDefaultMQPushConsumer()->getConsumeFromWhere();
- OffsetStore* offsetStore = m_pDefaultMQPushConsumerImpl->getOffsetStore();
-
- switch (consumeFromWhere)
- {
- case CONSUME_FROM_FIRST_OFFSET:
- {
- long long lastOffset = offsetStore->readOffset(mq, READ_FROM_STORE);
- if (lastOffset >= 0)
- {
- result = lastOffset;
- }
- else if (-1 == lastOffset)
- {
- result = 0L;
- }
- else
- {
- result = -1;
- }
- break;
- }
- case CONSUME_FROM_LAST_OFFSET:
- {
- long long lastOffset = offsetStore->readOffset(mq, READ_FROM_STORE);
- if (lastOffset >= 0)
- {
- result = lastOffset;
- }
- else if (-1 == lastOffset)
- {
- if (strncmp(MixAll::RETRY_GROUP_TOPIC_PREFIX.c_str(), mq.getTopic().c_str(), MixAll::RETRY_GROUP_TOPIC_PREFIX.size()) == 0)
- {
- result = 0L;
- }
- else
- {
- //result = LLONG_MAX;
- try
- {
- result = m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq);
- }
- catch(...)
- {
- result = -1;
- }
- }
- }
- else
- {
- result = -1;
- }
- break;
- }
-
- case CONSUME_FROM_MAX_OFFSET:
- result = LLONG_MAX;
- break;
- case CONSUME_FROM_MIN_OFFSET:
- result = 0L;
- break;
- case CONSUME_FROM_TIMESTAMP:
- {
- long long lastOffset = offsetStore->readOffset(mq, READ_FROM_STORE);
- if (lastOffset >= 0)
- {
- result = lastOffset;
- }
- else if (-1 == lastOffset)
- {
- if (strncmp(MixAll::RETRY_GROUP_TOPIC_PREFIX.c_str(), mq.getTopic().c_str(), MixAll::RETRY_GROUP_TOPIC_PREFIX.size()) == 0)
- {
- //result = LLONG_MAX;
- try
- {
- result = m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq);
- }
- catch(...)
- {
- result = -1;
- }
- }
- else
- {
- try
- {
- long timestamp = UtilAll::str2tm(
- m_pDefaultMQPushConsumerImpl->getDefaultMQPushConsumer()->getConsumeTimestamp(),
- rmq::yyyyMMddHHmmss);
- result = m_pMQClientFactory->getMQAdminImpl()->searchOffset(mq, timestamp);
- }
- catch(...)
- {
- result = -1;
- }
- }
- }
- else
- {
- result = -1;
- }
- break;
- }
- break;
- default:
- break;
- }
-
- return result;
-}
-
-void RebalancePushImpl::messageQueueChanged(const std::string& topic,
- std::set<MessageQueue>& mqAll,
- std::set<MessageQueue>& mqDivided)
-{
-}
-
-
-bool RebalancePushImpl::removeUnnecessaryMessageQueue(MessageQueue& mq, ProcessQueue& pq)
-{
- m_pDefaultMQPushConsumerImpl->getOffsetStore()->persist(mq);
- m_pDefaultMQPushConsumerImpl->getOffsetStore()->removeOffset(mq);
- if (m_pDefaultMQPushConsumerImpl->isConsumeOrderly()
- && m_pDefaultMQPushConsumerImpl->messageModel() == CLUSTERING)
- {
- if (pq.getLockConsume().TryLock(1000))
- {
- try
- {
- this->unlock(mq, true);
- }
- catch (std::exception& e)
- {
- RMQ_ERROR("removeUnnecessaryMessageQueue Exception: %s", e.what());
- }
- pq.getLockConsume().Unlock();
- }
- else
- {
- RMQ_WARN("[WRONG]mq is consuming, so can not unlock it, MQ:%s, maybe hanged for a while, times:{%lld}",
- mq.toString().c_str(),
- pq.getTryUnlockTimes());
-
- pq.incTryUnlockTimes();
- }
-
- return false;
- }
-
- return true;
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h b/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h
deleted file mode 100755
index 0aa2b0e..0000000
--- a/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __REBALANCEPUSHIMPL_H__
-#define __REBALANCEPUSHIMPL_H__
-
-#include "RebalanceImpl.h"
-
-namespace rmq
-{
-class DefaultMQPushConsumerImpl;
-
-class RebalancePushImpl : public RebalanceImpl
-{
-public:
- RebalancePushImpl(DefaultMQPushConsumerImpl *pDefaultMQPushConsumerImpl);
-
- RebalancePushImpl(const std::string &consumerGroup,
- MessageModel messageModel,
- AllocateMessageQueueStrategy *pAllocateMessageQueueStrategy,
- MQClientFactory *pMQClientFactory,
- DefaultMQPushConsumerImpl *pDefaultMQPushConsumerImpl);
-
- void dispatchPullRequest(std::list<PullRequest *> &pullRequestList);
- long long computePullFromWhere(MessageQueue &mq);
- void messageQueueChanged(const std::string &topic,
- std::set<MessageQueue> &mqAll,
- std::set<MessageQueue> &mqDivided);
- bool removeUnnecessaryMessageQueue(MessageQueue &mq, ProcessQueue &pq);
-
-
- ConsumeType consumeType()
- {
- return CONSUME_PASSIVELY;
- };
-
-private:
- DefaultMQPushConsumerImpl *m_pDefaultMQPushConsumerImpl;
-};
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RebalanceService.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalanceService.cpp b/rocketmq-client4cpp/src/consumer/RebalanceService.cpp
deleted file mode 100644
index 013fefb..0000000
--- a/rocketmq-client4cpp/src/consumer/RebalanceService.cpp
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "RebalanceService.h"
-#include "MQClientFactory.h"
-
-namespace rmq
-{
-
-long RebalanceService::s_WaitInterval = 1000 * 10;
-
-RebalanceService::RebalanceService(MQClientFactory* pMQClientFactory)
- : ServiceThread("RebalanceService"),
- m_pMQClientFactory(pMQClientFactory)
-{
-}
-
-
-RebalanceService::~RebalanceService()
-{
-
-}
-
-void RebalanceService::Run()
-{
- RMQ_INFO("%s service started", getServiceName().c_str());
-
- while (!m_stoped)
- {
- waitForRunning(s_WaitInterval);
- m_pMQClientFactory->doRebalance();
- }
-
- RMQ_INFO("%s service end", getServiceName().c_str());
-}
-
-std::string RebalanceService::getServiceName()
-{
- return "RebalanceService";
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RebalanceService.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalanceService.h b/rocketmq-client4cpp/src/consumer/RebalanceService.h
deleted file mode 100755
index ef4d746..0000000
--- a/rocketmq-client4cpp/src/consumer/RebalanceService.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __REBALANCESERVICE_H__
-#define __REBALANCESERVICE_H__
-
-#include "ServiceThread.h"
-
-namespace rmq
-{
- class MQClientFactory;
-
- /**
- * Rebalance service
- *
- */
- class RebalanceService : public ServiceThread
- {
- public:
- RebalanceService(MQClientFactory* pMQClientFactory);
- ~RebalanceService();
-
- void Run();
- std::string getServiceName();
-
- private:
- MQClientFactory* m_pMQClientFactory;
- static long s_WaitInterval;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp b/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp
deleted file mode 100755
index 1c4fd23..0000000
--- a/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "RemoteBrokerOffsetStore.h"
-#include "MQClientFactory.h"
-#include "ScopedLock.h"
-#include "MQClientException.h"
-#include "CommandCustomHeader.h"
-#include "MQClientAPIImpl.h"
-
-namespace rmq
-{
-
-RemoteBrokerOffsetStore::RemoteBrokerOffsetStore(MQClientFactory* pMQClientFactory, const std::string& groupName)
-{
- m_pMQClientFactory = pMQClientFactory;
- m_groupName = groupName;
-}
-
-void RemoteBrokerOffsetStore::load()
-{
-
-}
-
-void RemoteBrokerOffsetStore::updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly)
-{
- kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex);
- typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq);
- if (it == m_offsetTable.end())
- {
- m_offsetTable[mq] = offset;
- it = m_offsetTable.find(mq);
- }
-
- kpr::AtomicLong& offsetOld = it->second;
- if (increaseOnly)
- {
- MixAll::compareAndIncreaseOnly(offsetOld, offset);
- }
- else
- {
- offsetOld.set(offset);
- }
-}
-
-long long RemoteBrokerOffsetStore::readOffset(const MessageQueue& mq, ReadOffsetType type)
-{
- RMQ_DEBUG("readOffset, MQ:%s, type:%d", mq.toString().c_str(), type);
- switch (type)
- {
- case MEMORY_FIRST_THEN_STORE:
- case READ_FROM_MEMORY:
- {
- kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
- typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq);
- if (it != m_offsetTable.end())
- {
- return it->second.get();
- }
- else if (READ_FROM_MEMORY == type)
- {
- RMQ_DEBUG("No offset in memory, MQ:%s", mq.toString().c_str());
- return -1;
- }
- }
- case READ_FROM_STORE:
- {
- try
- {
- long long brokerOffset = this->fetchConsumeOffsetFromBroker(mq);
- RMQ_DEBUG("fetchConsumeOffsetFromBroker, MQ:%s, brokerOffset:%lld",
- mq.toString().c_str(), brokerOffset);
- if (brokerOffset >= 0)
- {
- this->updateOffset(mq, brokerOffset, false);
- }
- return brokerOffset;
- }
- // No offset in broker
- catch (const MQBrokerException& e)
- {
- RMQ_WARN("No offset in broker, MQ:%s, exception:%s", mq.toString().c_str(), e.what());
- return -1;
- }
- catch (const std::exception& e)
- {
- RMQ_ERROR("fetchConsumeOffsetFromBroker exception, MQ:%s, msg:%s",
- mq.toString().c_str(), e.what());
- return -2;
- }
- catch (...)
- {
- RMQ_ERROR("fetchConsumeOffsetFromBroker unknow exception, MQ:%s",
- mq.toString().c_str());
- return -2;
- }
- }
- default:
- break;
- }
-
- return -1;
-}
-
-void RemoteBrokerOffsetStore::persistAll(std::set<MessageQueue>& mqs)
-{
- if (mqs.empty())
- {
- return;
- }
-
- std::set<MessageQueue> unusedMQ;
- long long times = m_storeTimesTotal.fetchAndAdd(1);
-
- kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
- for (typeof(m_offsetTable.begin()) it = m_offsetTable.begin();
- it != m_offsetTable.end(); it++)
- {
- MessageQueue mq = it->first;
- kpr::AtomicLong& offset = it->second;
- if (mqs.find(mq) != mqs.end())
- {
- try
- {
- this->updateConsumeOffsetToBroker(mq, offset.get());
- if ((times % 12) == 0)
- {
- RMQ_INFO("updateConsumeOffsetToBroker, Group: {%s} ClientId: {%s} mq:{%s} offset {%llu}",
- m_groupName.c_str(),
- m_pMQClientFactory->getClientId().c_str(),
- mq.toString().c_str(),
- offset.get());
- }
- }
- catch (...)
- {
- RMQ_ERROR("updateConsumeOffsetToBroker exception, mq=%s", mq.toString().c_str());
- }
- }
- else
- {
- unusedMQ.insert(mq);
- }
- }
-
- if (!unusedMQ.empty())
- {
- for (typeof(unusedMQ.begin()) it = unusedMQ.begin(); it != unusedMQ.end(); it++)
- {
- m_offsetTable.erase(*it);
- RMQ_INFO("remove unused mq, %s, %s", it->toString().c_str(), m_groupName.c_str());
- }
- }
-}
-
-void RemoteBrokerOffsetStore::persist(const MessageQueue& mq)
-{
- kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
- typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq);
- if (it != m_offsetTable.end())
- {
- try
- {
- this->updateConsumeOffsetToBroker(mq, it->second.get());
- RMQ_DEBUG("updateConsumeOffsetToBroker ok, mq=%s, offset=%lld", mq.toString().c_str(), it->second.get());
- }
- catch (...)
- {
- RMQ_ERROR("updateConsumeOffsetToBroker exception, mq=%s", mq.toString().c_str());
- }
- }
-}
-
-void RemoteBrokerOffsetStore::removeOffset(const MessageQueue& mq)
-{
- kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex);
- m_offsetTable.erase(mq);
- RMQ_INFO("remove unnecessary messageQueue offset. mq=%s, offsetTableSize=%u",
- mq.toString().c_str(), (unsigned)m_offsetTable.size());
-}
-
-
-std::map<MessageQueue, long long> RemoteBrokerOffsetStore::cloneOffsetTable(const std::string& topic)
-{
- kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
- std::map<MessageQueue, long long> cloneOffsetTable;
- RMQ_FOR_EACH(m_offsetTable, it)
- {
- MessageQueue mq = it->first;
- kpr::AtomicLong& offset = it->second;
- if (topic == mq.getTopic())
- {
- cloneOffsetTable[mq] = offset.get();
- }
- }
-
- return cloneOffsetTable;
-}
-
-
-void RemoteBrokerOffsetStore::updateConsumeOffsetToBroker(const MessageQueue& mq, long long offset)
-{
- FindBrokerResult findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName());
- if (findBrokerResult.brokerAddr.empty())
- {
- m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
- findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName());
- }
-
- if (!findBrokerResult.brokerAddr.empty())
- {
- UpdateConsumerOffsetRequestHeader* requestHeader = new UpdateConsumerOffsetRequestHeader();
- requestHeader->topic = mq.getTopic();
- requestHeader->consumerGroup = this->m_groupName;
- requestHeader->queueId = mq.getQueueId();
- requestHeader->commitOffset = offset;
-
- m_pMQClientFactory->getMQClientAPIImpl()->updateConsumerOffsetOneway(
- findBrokerResult.brokerAddr, requestHeader, 1000 * 5);
- }
- else
- {
- THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
- }
-}
-
-long long RemoteBrokerOffsetStore::fetchConsumeOffsetFromBroker(const MessageQueue& mq)
-{
- FindBrokerResult findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName());
- if (findBrokerResult.brokerAddr.empty())
- {
- // TODO Here may be heavily overhead for Name Server,need tuning
- m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
- findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName());
- }
-
- if (!findBrokerResult.brokerAddr.empty())
- {
- QueryConsumerOffsetRequestHeader* requestHeader = new QueryConsumerOffsetRequestHeader();
- requestHeader->topic = mq.getTopic();
- requestHeader->consumerGroup = this->m_groupName;
- requestHeader->queueId = mq.getQueueId();
-
- return m_pMQClientFactory->getMQClientAPIImpl()->queryConsumerOffset(
- findBrokerResult.brokerAddr, requestHeader, 1000 * 5);
- }
- else
- {
- THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
- }
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h b/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h
deleted file mode 100755
index b613084..0000000
--- a/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __REMOTEBROKEROFFSETSTORE_H__
-#define __REMOTEBROKEROFFSETSTORE_H__
-
-#include "OffsetStore.h"
-#include <map>
-#include <string>
-#include <set>
-#include "MessageQueue.h"
-#include "AtomicValue.h"
-#include "Mutex.h"
-
-namespace rmq
-{
- class MQClientFactory;
-
- /**
- * offset remote store
- *
- */
- class RemoteBrokerOffsetStore : public OffsetStore
- {
- public:
- RemoteBrokerOffsetStore(MQClientFactory* pMQClientFactory, const std::string& groupName) ;
-
- void load();
- void updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly);
- long long readOffset(const MessageQueue& mq, ReadOffsetType type);
- void persistAll(std::set<MessageQueue>& mqs);
- void persist(const MessageQueue& mq);
- void removeOffset(const MessageQueue& mq) ;
- std::map<MessageQueue, long long> cloneOffsetTable(const std::string& topic);
-
- private:
- void updateConsumeOffsetToBroker(const MessageQueue& mq, long long offset);
- long long fetchConsumeOffsetFromBroker(const MessageQueue& mq);
-
- private:
- MQClientFactory* m_pMQClientFactory;
- std::string m_groupName;
- kpr::AtomicInteger m_storeTimesTotal;
- std::map<MessageQueue, kpr::AtomicLong> m_offsetTable;
- kpr::RWMutex m_tableMutex;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp b/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp
deleted file mode 100755
index ed5cf12..0000000
--- a/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "SubscriptionData.h"
-
-#include <sstream>
-#include "KPRUtil.h"
-#include "UtilAll.h"
-
-namespace rmq
-{
-
-std::string SubscriptionData::SUB_ALL = "*";
-
-SubscriptionData::SubscriptionData()
-{
- m_subVersion = KPRUtil::GetCurrentTimeMillis();
-}
-
-SubscriptionData::SubscriptionData(const std::string& topic, const std::string& subString)
- : m_topic(topic),
- m_subString(subString)
-{
- m_subVersion = KPRUtil::GetCurrentTimeMillis();
-}
-
-std::string SubscriptionData::getTopic()const
-{
- return m_topic;
-}
-
-void SubscriptionData::setTopic(const std::string& topic)
-{
- m_topic = topic;
-}
-
-std::string SubscriptionData::getSubString()
-{
- return m_subString;
-}
-
-void SubscriptionData::setSubString(const std::string& subString)
-{
- m_subString = subString;
-}
-
-std::set<std::string>& SubscriptionData::getTagsSet()
-{
- return m_tagsSet;
-}
-
-void SubscriptionData::setTagsSet(const std::set<std::string>& tagsSet)
-{
- m_tagsSet = tagsSet;
-}
-
-long long SubscriptionData::getSubVersion()
-{
- return m_subVersion;
-}
-
-void SubscriptionData::setSubVersion(long long subVersion)
-{
- m_subVersion = subVersion;
-}
-
-std::set<int>& SubscriptionData::getCodeSet()
-{
- return m_codeSet;
-}
-
-void SubscriptionData::setCodeSet(const std::set<int>& codeSet)
-{
- m_codeSet = codeSet;
-}
-
-int SubscriptionData::hashCode()
-{
- /*
- final int prime = 31;
- int result = 1;
- result = prime * result + (classFilterMode ? 1231 : 1237);
- result = prime * result + ((codeSet == null) ? 0 : codeSet.hashCode());
- result = prime * result + ((subString == null) ? 0 : subString.hashCode());
- result = prime * result + ((tagsSet == null) ? 0 : tagsSet.hashCode());
- result = prime * result + ((topic == null) ? 0 : topic.hashCode());
- return result;
- */
- std::stringstream ss;
- ss << UtilAll::hashCode(m_codeSet)
- << m_subString
- << UtilAll::hashCode(m_tagsSet)
- << m_topic;
- return UtilAll::hashCode(ss.str());
-}
-
-
-
-
-bool SubscriptionData::operator==(const SubscriptionData& other)
-{
- if (m_codeSet != other.m_codeSet)
- {
- return false;
- }
-
- if (m_subString != other.m_subString)
- {
- return false;
- }
-
- if (m_subVersion != other.m_subVersion)
- {
- return false;
- }
-
- if (m_tagsSet != other.m_tagsSet)
- {
- return false;
- }
-
- if (m_topic != other.m_topic)
- {
- return false;
- }
-
- return true;
-}
-
-bool SubscriptionData::operator<(const SubscriptionData& other)const
-{
- if (m_topic < other.m_topic)
- {
- return true;
- }
- else if (m_topic == other.m_topic)
- {
- if (m_subString < other.m_subString)
- {
- return true;
- }
- else
- {
- return false;
- }
- }
- else
- {
- return false;
- }
-}
-
-void SubscriptionData::toJson(Json::Value& obj) const
-{
- obj["classFilterMode"] = false;
- obj["topic"] = m_topic;
- obj["subString"] = m_subString;
- obj["subVersion"] = (long long)m_subVersion;
-
- Json::Value tagSet(Json::arrayValue);
- RMQ_FOR_EACH(m_tagsSet, it)
- {
- tagSet.append(*it);
- }
- obj["tagsSet"] = tagSet;
-
- Json::Value codeSet(Json::arrayValue);
- RMQ_FOR_EACH(m_codeSet, it)
- {
- codeSet.append(*it);
- }
- obj["codeSet"] = codeSet;
-}
-
-std::string SubscriptionData::toString() const
-{
- std::stringstream ss;
- ss << "{classFilterMode=" << false
- << ",topic=" << m_topic
- << ",subString=" << m_subString
- << ",subVersion=" << m_subVersion
- << ",tagsSet=" << UtilAll::toString(m_tagsSet)
- << ",codeSet=" << UtilAll::toString(m_codeSet)
- << "}";
- return ss.str();
-}
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/SubscriptionData.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/SubscriptionData.h b/rocketmq-client4cpp/src/consumer/SubscriptionData.h
deleted file mode 100755
index 4796fb7..0000000
--- a/rocketmq-client4cpp/src/consumer/SubscriptionData.h
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __SUBSCRIPTIONDATA_H__
-#define __SUBSCRIPTIONDATA_H__
-
-#include <string>
-#include <set>
-
-#include "RocketMQClient.h"
-#include "RemotingSerializable.h"
-#include "RefHandle.h"
-#include "json/json.h"
-
-namespace rmq
-{
- class SubscriptionData : public kpr::RefCount
- {
- public:
- SubscriptionData();
- SubscriptionData(const std::string& topic, const std::string& subString);
-
- std::string getTopic()const;
- void setTopic(const std::string& topic);
-
- std::string getSubString();
- void setSubString(const std::string& subString);
-
- std::set<std::string>& getTagsSet();
- void setTagsSet(const std::set<std::string>& tagsSet);
-
- long long getSubVersion();
- void setSubVersion(long long subVersion);
-
- std::set<int>& getCodeSet();
- void setCodeSet(const std::set<int>& codeSet);
-
- int hashCode();
- void toJson(Json::Value& obj) const;
- std::string toString() const;
-
- bool operator==(const SubscriptionData& other);
- bool operator<(const SubscriptionData& other)const;
-
- public:
- static std::string SUB_ALL;
-
- private:
- std::string m_topic;
- std::string m_subString;
- std::set<std::string> m_tagsSet;
- std::set<int> m_codeSet;
- long long m_subVersion ;
- };
- typedef kpr::RefHandleT<SubscriptionData> SubscriptionDataPtr;
-
- inline std::ostream& operator<<(std::ostream& os, const SubscriptionData& obj)
- {
- os << obj.toString();
- return os;
- }
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/jsoncpp/AUTHORS
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/AUTHORS b/rocketmq-client4cpp/src/jsoncpp/AUTHORS
deleted file mode 100755
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/jsoncpp/LICENSE
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/LICENSE b/rocketmq-client4cpp/src/jsoncpp/LICENSE
deleted file mode 100755
index 403d096..0000000
--- a/rocketmq-client4cpp/src/jsoncpp/LICENSE
+++ /dev/null
@@ -1 +0,0 @@
-The json-cpp library and this documentation are in Public Domain.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/jsoncpp/README.txt
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/README.txt b/rocketmq-client4cpp/src/jsoncpp/README.txt
deleted file mode 100755
index 379d376..0000000
--- a/rocketmq-client4cpp/src/jsoncpp/README.txt
+++ /dev/null
@@ -1,117 +0,0 @@
-* Introduction:
- =============
-
-JSON (JavaScript Object Notation) is a lightweight data-interchange format.
-It can represent integer, real number, string, an ordered sequence of
-value, and a collection of name/value pairs.
-
-JsonCpp is a simple API to manipulate JSON value, handle serialization
-and unserialization to string.
-
-It can also preserve existing comment in unserialization/serialization steps,
-making it a convenient format to store user input files.
-
-Unserialization parsing is user friendly and provides precise error reports.
-
-
-* Building/Testing:
- =================
-
-JsonCpp uses Scons (http://www.scons.org) as a build system. Scons requires
-python to be installed (http://www.python.org).
-
-You download scons-local distribution from the following url:
-http://sourceforge.net/project/showfiles.php?group_id=30337&package_id=67375
-
-Unzip it in the directory where you found this README file. scons.py Should be
-at the same level as README.
-
-python scons.py platform=PLTFRM [TARGET]
-where PLTFRM may be one of:
- suncc Sun C++ (Solaris)
- vacpp Visual Age C++ (AIX)
- mingw
- msvc6 Microsoft Visual Studio 6 service pack 5-6
- msvc70 Microsoft Visual Studio 2002
- msvc71 Microsoft Visual Studio 2003
- msvc80 Microsoft Visual Studio 2005
- linux-gcc Gnu C++ (linux, also reported to work for Mac OS X)
-
-adding platform is fairly simple. You need to change the Sconstruct file
-to do so.
-
-and TARGET may be:
- check: build library and run unit tests.
-
-
-* Running the test manually:
- ==========================
-
-cd test
-# This will run the Reader/Writer tests
-python runjsontests.py "path to jsontest.exe"
-
-# This will run the Reader/Writer tests, using JSONChecker test suite
-# (http://www.json.org/JSON_checker/).
-# Notes: not all tests pass: JsonCpp is too lenient (for example,
-# it allows an integer to start with '0'). The goal is to improve
-# strict mode parsing to get all tests to pass.
-python runjsontests.py --with-json-checker "path to jsontest.exe"
-
-# This will run the unit tests (mostly Value)
-python rununittests.py "path to test_lib_json.exe"
-
-You can run the tests using valgrind:
-python rununittests.py --valgrind "path to test_lib_json.exe"
-
-
-* Building the documentation:
- ===========================
-
-Run the python script doxybuild.py from the top directory:
-
-python doxybuild.py --open --with-dot
-
-See doxybuild.py --help for options.
-
-
-* Adding a reader/writer test:
- ============================
-
-To add a test, you need to create two files in test/data:
-- a TESTNAME.json file, that contains the input document in JSON format.
-- a TESTNAME.expected file, that contains a flatened representation of
- the input document.
-
-TESTNAME.expected file format:
-- each line represents a JSON element of the element tree represented
- by the input document.
-- each line has two parts: the path to access the element separated from
- the element value by '='. Array and object values are always empty
- (e.g. represented by either [] or {}).
-- element path: '.' represented the root element, and is used to separate
- object members. [N] is used to specify the value of an array element
- at index N.
-See test_complex_01.json and test_complex_01.expected to better understand
-element path.
-
-
-* Understanding reader/writer test output:
- ========================================
-
-When a test is run, output files are generated aside the input test files.
-Below is a short description of the content of each file:
-
-- test_complex_01.json: input JSON document
-- test_complex_01.expected: flattened JSON element tree used to check if
- parsing was corrected.
-
-- test_complex_01.actual: flattened JSON element tree produced by
- jsontest.exe from reading test_complex_01.json
-- test_complex_01.rewrite: JSON document written by jsontest.exe using the
- Json::Value parsed from test_complex_01.json and serialized using
- Json::StyledWritter.
-- test_complex_01.actual-rewrite: flattened JSON element tree produced by
- jsontest.exe from reading test_complex_01.rewrite.
-test_complex_01.process-output: jsontest.exe output, typically useful to
- understand parsing error.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/jsoncpp/json/allocator.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/json/allocator.h b/rocketmq-client4cpp/src/jsoncpp/json/allocator.h
deleted file mode 100755
index 1235a3e..0000000
--- a/rocketmq-client4cpp/src/jsoncpp/json/allocator.h
+++ /dev/null
@@ -1,96 +0,0 @@
-// Copyright 2007-2010 Baptiste Lepilleur
-// Distributed under MIT license, or public domain if desired and
-// recognized in your jurisdiction.
-// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
-
-#ifndef CPPTL_JSON_ALLOCATOR_H_INCLUDED
-#define CPPTL_JSON_ALLOCATOR_H_INCLUDED
-
-#include <cstring>
-#include <memory>
-
-namespace rmq {
-namespace Json {
-template<typename T>
-class SecureAllocator {
- public:
- // Type definitions
- using value_type = T;
- using pointer = T*;
- using const_pointer = const T*;
- using reference = T&;
- using const_reference = const T&;
- using size_type = std::size_t;
- using difference_type = std::ptrdiff_t;
-
- /**
- * Allocate memory for N items using the standard allocator.
- */
- pointer allocate(size_type n) {
- // allocate using "global operator new"
- return static_cast<pointer>(::operator new(n * sizeof(T)));
- }
-
- /**
- * Release memory which was allocated for N items at pointer P.
- *
- * The memory block is filled with zeroes before being released.
- * The pointer argument is tagged as "volatile" to prevent the
- * compiler optimizing out this critical step.
- */
- void deallocate(volatile pointer p, size_type n) {
- std::memset(p, 0, n * sizeof(T));
- // free using "global operator delete"
- ::operator delete(p);
- }
-
- /**
- * Construct an item in-place at pointer P.
- */
- template<typename... Args>
- void construct(pointer p, Args&&... args) {
- // construct using "placement new" and "perfect forwarding"
- ::new (static_cast<void*>(p)) T(std::forward<Args>(args)...);
- }
-
- size_type max_size() const {
- return size_t(-1) / sizeof(T);
- }
-
- pointer address( reference x ) const {
- return std::addressof(x);
- }
-
- const_pointer address( const_reference x ) const {
- return std::addressof(x);
- }
-
- /**
- * Destroy an item in-place at pointer P.
- */
- void destroy(pointer p) {
- // destroy using "explicit destructor"
- p->~T();
- }
-
- // Boilerplate
- SecureAllocator() {}
- template<typename U> SecureAllocator(const SecureAllocator<U>&) {}
- template<typename U> struct rebind { using other = SecureAllocator<U>; };
-};
-
-
-template<typename T, typename U>
-bool operator==(const SecureAllocator<T>&, const SecureAllocator<U>&) {
- return true;
-}
-
-template<typename T, typename U>
-bool operator!=(const SecureAllocator<T>&, const SecureAllocator<U>&) {
- return false;
-}
-
-} //namespace Json
-} //namespace rmq
-
-#endif // CPPTL_JSON_ALLOCATOR_H_INCLUDED
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/jsoncpp/json/assertions.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/json/assertions.h b/rocketmq-client4cpp/src/jsoncpp/json/assertions.h
deleted file mode 100755
index dc67b27..0000000
--- a/rocketmq-client4cpp/src/jsoncpp/json/assertions.h
+++ /dev/null
@@ -1,54 +0,0 @@
-// Copyright 2007-2010 Baptiste Lepilleur
-// Distributed under MIT license, or public domain if desired and
-// recognized in your jurisdiction.
-// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
-
-#ifndef CPPTL_JSON_ASSERTIONS_H_INCLUDED
-#define CPPTL_JSON_ASSERTIONS_H_INCLUDED
-
-#include <stdlib.h>
-#include <sstream>
-
-#if !defined(JSON_IS_AMALGAMATION)
-#include "config.h"
-#endif // if !defined(JSON_IS_AMALGAMATION)
-
-/** It should not be possible for a maliciously designed file to
- * cause an abort() or seg-fault, so these macros are used only
- * for pre-condition violations and internal logic errors.
- */
-#if JSON_USE_EXCEPTION
-
-// @todo <= add detail about condition in exception
-# define JSON_ASSERT(condition) \
- {if (!(condition)) {rmq::Json::throwLogicError( "assert json failed" );}}
-
-# define JSON_FAIL_MESSAGE(message) \
- { \
- JSONCPP_OSTRINGSTREAM oss; oss << message; \
- rmq::Json::throwLogicError(oss.str()); \
- abort(); \
- }
-
-#else // JSON_USE_EXCEPTION
-
-# define JSON_ASSERT(condition) assert(condition)
-
-// The call to assert() will show the failure message in debug builds. In
-// release builds we abort, for a core-dump or debugger.
-# define JSON_FAIL_MESSAGE(message) \
- { \
- JSONCPP_OSTRINGSTREAM oss; oss << message; \
- assert(false && oss.str().c_str()); \
- abort(); \
- }
-
-
-#endif
-
-#define JSON_ASSERT_MESSAGE(condition, message) \
- if (!(condition)) { \
- JSON_FAIL_MESSAGE(message); \
- }
-
-#endif // CPPTL_JSON_ASSERTIONS_H_INCLUDED
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/jsoncpp/json/autolink.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/json/autolink.h b/rocketmq-client4cpp/src/jsoncpp/json/autolink.h
deleted file mode 100644
index 6fcc8af..0000000
--- a/rocketmq-client4cpp/src/jsoncpp/json/autolink.h
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright 2007-2010 Baptiste Lepilleur
-// Distributed under MIT license, or public domain if desired and
-// recognized in your jurisdiction.
-// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
-
-#ifndef JSON_AUTOLINK_H_INCLUDED
-#define JSON_AUTOLINK_H_INCLUDED
-
-#include "config.h"
-
-#ifdef JSON_IN_CPPTL
-#include <cpptl/cpptl_autolink.h>
-#endif
-
-#if !defined(JSON_NO_AUTOLINK) && !defined(JSON_DLL_BUILD) && \
- !defined(JSON_IN_CPPTL)
-#define CPPTL_AUTOLINK_NAME "json"
-#undef CPPTL_AUTOLINK_DLL
-#ifdef JSON_DLL
-#define CPPTL_AUTOLINK_DLL
-#endif
-#include "autolink.h"
-#endif
-
-#endif // JSON_AUTOLINK_H_INCLUDED