You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/03/18 06:25:25 UTC

[rocketmq-client-cpp] branch master updated: [ISSUE #89] it will crash when starting orderly push consumer. (#108)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new a42078d  [ISSUE #89] it will crash when starting orderly push consumer. (#108)
a42078d is described below

commit a42078df3d538f88e32539f537c5d5a3c17aaf4c
Author: donggang123 <jo...@163.com>
AuthorDate: Mon Mar 18 14:25:21 2019 +0800

    [ISSUE #89] it will crash when starting orderly push consumer. (#108)
    
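    Check the result of findBrokerAddressInSubscribe for null in
    Rebalance::unlockAll, unlock, lockAll and lock. When no broker address
    is found, log an error and skip that broker (or return) instead of
    continuing with a null result, which crashed the orderly push consumer
    on startup.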
---
 src/consumer/Rebalance.cpp | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index 3460457..b4e6360 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -275,6 +275,10 @@ void Rebalance::unlockAll(bool oneway) {
     unique_ptr<FindBrokerResult> pFindBrokerResult(
         m_pClientFactory->findBrokerAddressInSubscribe(itb->first, MASTER_ID,
                                                        true));
+    if (!pFindBrokerResult) {
+      LOG_ERROR("unlockAll findBrokerAddressInSubscribe ret null for broker:%s", itb->first.data());
+      continue;
+    }
     unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(
         new UnlockBatchRequestBody());
     vector<MQMessageQueue> mqs(*(itb->second));
@@ -307,6 +311,10 @@ void Rebalance::unlock(MQMessageQueue mq) {
   unique_ptr<FindBrokerResult> pFindBrokerResult(
       m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
                                                      MASTER_ID, true));
+  if (!pFindBrokerResult) {
+    LOG_ERROR("unlock findBrokerAddressInSubscribe ret null for broker:%s", mq.getBrokerName().data());
+    return;
+  }
   unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(
       new UnlockBatchRequestBody());
   vector<MQMessageQueue> mqs;
@@ -352,9 +360,14 @@ void Rebalance::lockAll() {
   LOG_INFO("LockAll " SIZET_FMT " broker mqs", brokerMqs.size());
   for (map<string, vector<MQMessageQueue>*>::iterator itb = brokerMqs.begin();
        itb != brokerMqs.end(); ++itb) {
+    string brokerName = (*(itb->second))[0].getBrokerName();
     unique_ptr<FindBrokerResult> pFindBrokerResult(
         m_pClientFactory->findBrokerAddressInSubscribe(
-            (*(itb->second))[0].getBrokerName(), MASTER_ID, true));
+                brokerName, MASTER_ID, true));
+    if (!pFindBrokerResult) {
+      LOG_ERROR("lockAll findBrokerAddressInSubscribe ret null for broker:%s", brokerName.data());
+      continue;
+    }
     unique_ptr<LockBatchRequestBody> lockBatchRequest(
         new LockBatchRequestBody());
     lockBatchRequest->setClientId(m_pConsumer->getMQClientId());
@@ -391,6 +404,10 @@ bool Rebalance::lock(MQMessageQueue mq) {
   unique_ptr<FindBrokerResult> pFindBrokerResult(
       m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
                                                      MASTER_ID, true));
+  if (!pFindBrokerResult) {
+    LOG_ERROR("lock findBrokerAddressInSubscribe ret null for broker:%s", mq.getBrokerName().data());
+    return false;
+  }
   unique_ptr<LockBatchRequestBody> lockBatchRequest(new LockBatchRequestBody());
   lockBatchRequest->setClientId(m_pConsumer->getMQClientId());
   lockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());