You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/03/18 06:25:25 UTC
[rocketmq-client-cpp] branch master updated: [ISSUE #89] it will
crash when starting orderly push consumer. (#108)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new a42078d [ISSUE #89] it will crash when starting orderly push consumer. (#108)
a42078d is described below
commit a42078df3d538f88e32539f537c5d5a3c17aaf4c
Author: donggang123 <jo...@163.com>
AuthorDate: Mon Mar 18 14:25:21 2019 +0800
[ISSUE #89] it will crash when starting orderly push consumer. (#108)
[ISSUE #89] it will crash when starting orderly push consumer. (#108)
---
src/consumer/Rebalance.cpp | 19 ++++++++++++++++++-
1 file changed, 18 insertions(+), 1 deletion(-)
diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index 3460457..b4e6360 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -275,6 +275,10 @@ void Rebalance::unlockAll(bool oneway) {
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_pClientFactory->findBrokerAddressInSubscribe(itb->first, MASTER_ID,
true));
+ if (!pFindBrokerResult) {
+ LOG_ERROR("unlockAll findBrokerAddressInSubscribe ret null for broker:%s", itb->first.data());
+ continue;
+ }
unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(
new UnlockBatchRequestBody());
vector<MQMessageQueue> mqs(*(itb->second));
@@ -307,6 +311,10 @@ void Rebalance::unlock(MQMessageQueue mq) {
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
MASTER_ID, true));
+ if (!pFindBrokerResult) {
+ LOG_ERROR("unlock findBrokerAddressInSubscribe ret null for broker:%s", mq.getBrokerName().data());
+ return;
+ }
unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(
new UnlockBatchRequestBody());
vector<MQMessageQueue> mqs;
@@ -352,9 +360,14 @@ void Rebalance::lockAll() {
LOG_INFO("LockAll " SIZET_FMT " broker mqs", brokerMqs.size());
for (map<string, vector<MQMessageQueue>*>::iterator itb = brokerMqs.begin();
itb != brokerMqs.end(); ++itb) {
+ string brokerName = (*(itb->second))[0].getBrokerName();
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_pClientFactory->findBrokerAddressInSubscribe(
- (*(itb->second))[0].getBrokerName(), MASTER_ID, true));
+ brokerName, MASTER_ID, true));
+ if (!pFindBrokerResult) {
+ LOG_ERROR("lockAll findBrokerAddressInSubscribe ret null for broker:%s", brokerName.data());
+ continue;
+ }
unique_ptr<LockBatchRequestBody> lockBatchRequest(
new LockBatchRequestBody());
lockBatchRequest->setClientId(m_pConsumer->getMQClientId());
@@ -391,6 +404,10 @@ bool Rebalance::lock(MQMessageQueue mq) {
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
MASTER_ID, true));
+ if (!pFindBrokerResult) {
+ LOG_ERROR("lock findBrokerAddressInSubscribe ret null for broker:%s", mq.getBrokerName().data());
+ return false;
+ }
unique_ptr<LockBatchRequestBody> lockBatchRequest(new LockBatchRequestBody());
lockBatchRequest->setClientId(m_pConsumer->getMQClientId());
lockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());