You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/28 06:17:48 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-306]Raise Nullpointer Exception when create tubemq instance (#227)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new b951173  [TUBEMQ-306]Raise Nullpointer Exception when create tubemq instance (#227)
b951173 is described below

commit b951173d2408a39136cbda70346675db0b17d3b6
Author: gosonzhang <46...@qq.com>
AuthorDate: Tue Jul 28 06:17:41 2020 +0000

    [TUBEMQ-306]Raise Nullpointer Exception when create tubemq instance (#227)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../client/consumer/BaseMessageConsumer.java       | 133 ++++++++++++---------
 1 file changed, 74 insertions(+), 59 deletions(-)

diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
index 8267feb..362d389 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
@@ -563,35 +563,32 @@ public class BaseMessageConsumer implements MessageConsumer {
                 ClientMaster.RegisterResponseM2C response =
                         masterService.consumerRegisterC2M(createMasterRegisterRequest(),
                                 AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
-                if (response.getSuccess()) {
+                if (response != null && response.getSuccess()) {
                     processRegisterAllocAndRspFlowRules(response);
                     processRegAuthorizedToken(response);
                     break;
                 }
-                if (response.getErrCode() == TErrCodeConstants.CONSUME_GROUP_FORBIDDEN) {
-                    logger.error(strBuffer
-                            .append("[Register Failed] ConsumeGroup forbidden, register to master failed! ")
-                            .append(response.getErrMsg()).toString());
-                    strBuffer.delete(0, strBuffer.length());
-                    throw new TubeClientException(strBuffer
-                            .append("Register to master failed! ConsumeGroup forbidden, ")
-                            .append(response.getErrMsg()).toString());
-                } else if (response.getErrCode() == TErrCodeConstants.CONSUME_CONTENT_FORBIDDEN) {
-                    logger.error(strBuffer
-                            .append("[Register Failed] Restricted consume content, register to master failed! ")
-                            .append(response.getErrMsg()).toString());
-                    strBuffer.delete(0, strBuffer.length());
-                    throw new TubeClientException(strBuffer
-                            .append("Register to master failed! Restricted consume content, ")
-                            .append(response.getErrMsg()).toString());
+                if (response == null) {
+                    logger.warn(strBuffer.append("[Register Failed] ")
+                            .append("response return null!").toString());
                 } else {
-                    logger.error(strBuffer.append("[Register Failed] ")
-                            .append(response.getErrMsg()).toString());
-                    strBuffer.delete(0, strBuffer.length());
+                    if (response.getErrCode() == TErrCodeConstants.CONSUME_GROUP_FORBIDDEN) {
+                        throw new TubeClientException(strBuffer
+                                .append("Register to master failed! ConsumeGroup forbidden, ")
+                                .append(response.getErrMsg()).toString());
+                    } else if (response.getErrCode()
+                            == TErrCodeConstants.CONSUME_CONTENT_FORBIDDEN) {
+                        throw new TubeClientException(strBuffer
+                                .append("Register to master failed! Restricted consume content, ")
+                                .append(response.getErrMsg()).toString());
+                    } else {
+                        logger.warn(strBuffer.append("[Register Failed] ")
+                                .append(response.getErrMsg()).toString());
+                    }
                 }
+                strBuffer.delete(0, strBuffer.length());
             } catch (Throwable e) {
-                logger.error("completeSubscribe throwable is ", e);
-                logger.error("Register to master failed.", e);
+                logger.warn("Register to master failed.", e);
                 ThreadUtils.sleep(this.consumerConfig.getRegFailWaitPeriodMs());
             }
             if (++registerRetryTimes >= consumerConfig.getMaxRegisterRetryTimes()) {
@@ -785,40 +782,47 @@ public class BaseMessageConsumer implements MessageConsumer {
                             continue;
                         }
                         ClientBroker.RegisterResponseB2C responseB2C =
-                                getBrokerService(partition.getBroker())
-                                        .consumerRegisterC2B(createBrokerRegisterRequest(partition),
-                                                AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
-                        long currOffset =
+                            getBrokerService(partition.getBroker())
+                                .consumerRegisterC2B(createBrokerRegisterRequest(partition),
+                                    AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
+                        if (responseB2C != null && responseB2C.getSuccess()) {
+                            long currOffset =
                                 responseB2C.hasCurrOffset() ? responseB2C.getCurrOffset() : -1L;
-                        if (responseB2C.getSuccess()) {
                             rmtDataCache.addPartition(partition, currOffset);
                             unRegPartitions.remove(partition);
                             logger.info(strBuffer.append("Registered partition: consumer is ")
-                                    .append(consumerId).append(", partition is:")
-                                    .append(partition.toString()).toString());
+                                .append(consumerId).append(", partition is:")
+                                .append(partition.toString()).toString());
                             strBuffer.delete(0, strBuffer.length());
                         } else {
-                            if (responseB2C.getErrCode() == TErrCodeConstants.PARTITION_OCCUPIED
+                            if (responseB2C == null) {
+                                logger.warn(strBuffer.append("register2broker error! ")
+                                    .append(retryTimesRegister2Broker).append(" register ")
+                                    .append(partition.toString()).append(" return null!")
+                                    .toString());
+                            } else {
+                                if (responseB2C.getErrCode() == TErrCodeConstants.PARTITION_OCCUPIED
                                     || responseB2C.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
-                                unRegPartitions.remove(partition);
-                                if (responseB2C.getErrCode() == TErrCodeConstants.PARTITION_OCCUPIED) {
-                                    if (logger.isDebugEnabled()) {
-                                        logger.debug(strBuffer
+                                    unRegPartitions.remove(partition);
+                                    if (responseB2C.getErrCode() == TErrCodeConstants.PARTITION_OCCUPIED) {
+                                        if (logger.isDebugEnabled()) {
+                                            logger.debug(strBuffer
                                                 .append("[Partition occupied], curr consumerId: ")
                                                 .append(consumerId).append(", returned message : ")
                                                 .append(responseB2C.getErrMsg()).toString());
-                                    }
-                                } else {
-                                    logger.warn(strBuffer
+                                        }
+                                    } else {
+                                        logger.warn(strBuffer
                                             .append("[Certificate failure], curr consumerId: ")
                                             .append(consumerId).append(", returned message : ")
                                             .append(responseB2C.getErrMsg()).toString());
+                                    }
+                                } else {
+                                    logger.warn(strBuffer.append("register2broker error! ")
+                                        .append(retryTimesRegister2Broker).append(" register ")
+                                        .append(partition.toString()).append(" return ")
+                                        .append(responseB2C.getErrMsg()).toString());
                                 }
-                            } else {
-                                logger.warn(strBuffer.append("register2broker error! ")
-                                    .append(retryTimesRegister2Broker).append(" register ")
-                                    .append(partition.toString()).append(" return ")
-                                    .append(responseB2C.getErrMsg()).toString());
                             }
                             strBuffer.delete(0, strBuffer.length());
                         }
@@ -1448,29 +1452,36 @@ public class BaseMessageConsumer implements MessageConsumer {
                                 createMasterHeartbeatRequest(event, subInfoList, reportSubscribeInfo),
                                 AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
                 // Process unsuccessful response
+                if (response == null) {
+                    logger.error(strBuffer.append("[Heartbeat Failed] ")
+                            .append("return result is null!").toString());
+                    heartbeatRetryTimes++;
+                    return;
+                }
                 if (!response.getSuccess()) {
                     // If master replies that cannot find current consumer node, re-register
                     if (response.getErrCode() == TErrCodeConstants.HB_NO_NODE) {
                         try {
                             ClientMaster.RegisterResponseM2C regResponse =
-                                    masterService.consumerRegisterC2M(createMasterRegisterRequest(),
-                                            AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
+                                masterService.consumerRegisterC2M(createMasterRegisterRequest(),
+                                    AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
                             // Print the log when registration fails
                             if (regResponse == null || !regResponse.getSuccess()) {
                                 if (regResponse == null) {
-                                    logger.error(strBuffer.append("[Re-Register Failed] ").append(consumerId)
-                                            .append(" register to master return null!").toString());
+                                    logger.error(strBuffer.append("[Re-Register Failed] ")
+                                        .append(consumerId)
+                                        .append(" register to master return null!").toString());
                                 } else {
                                     // If the consumer group is forbidden, output the log
-                                    if (response.getErrCode() == TErrCodeConstants.CONSUME_GROUP_FORBIDDEN) {
+                                    if (response.getErrCode()
+                                            == TErrCodeConstants.CONSUME_GROUP_FORBIDDEN) {
                                         logger.error(strBuffer.append("[Re-Register Failed] ")
-                                                .append(consumerId)
-                                                .append(" ConsumeGroup forbidden, ")
-                                                .append(response.getErrMsg()).toString());
+                                            .append(consumerId).append(" ConsumeGroup forbidden, ")
+                                            .append(response.getErrMsg()).toString());
                                     } else {
                                         logger.error(strBuffer.append("[Re-Register Failed] ")
-                                                .append(consumerId).append(" ")
-                                                .append(response.getErrMsg()).toString());
+                                            .append(consumerId).append(" ")
+                                            .append(response.getErrMsg()).toString());
                                     }
                                 }
                                 strBuffer.delete(0, strBuffer.length());
@@ -1596,7 +1607,8 @@ public class BaseMessageConsumer implements MessageConsumer {
                                                 createBrokerHeartBeatRequest(brokerInfo.getBrokerId(), partStrSet),
                                                 AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
                                 // When response is success
-                                if (heartBeatResponseV2.getSuccess()) {
+                                if (heartBeatResponseV2 != null
+                                    && heartBeatResponseV2.getSuccess()) {
                                     // If the peer require authentication, set a flag.
                                     // The following request will attach the auth information.
                                     if (heartBeatResponseV2.hasRequireAuth()) {
@@ -1653,16 +1665,19 @@ public class BaseMessageConsumer implements MessageConsumer {
                                         }
                                     }
                                 }
-                                if (heartBeatResponseV2.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
-                                    for (Partition partition : partitions) {
-                                        removePartition(partition);
-                                    }
-                                    logger.warn(strBuffer
+                                if (heartBeatResponseV2 != null) {
+                                    if (heartBeatResponseV2.getErrCode()
+                                        == TErrCodeConstants.CERTIFICATE_FAILURE) {
+                                        for (Partition partition : partitions) {
+                                            removePartition(partition);
+                                        }
+                                        logger.warn(strBuffer
                                             .append("[heart2broker error] certificate failure, ")
                                             .append(brokerInfo.getBrokerStrInfo())
                                             .append("'s partitions area released, ")
                                             .append(heartBeatResponseV2.getErrMsg()).toString());
-                                    strBuffer.delete(0, strBuffer.length());
+                                        strBuffer.delete(0, strBuffer.length());
+                                    }
                                 }
                             }
                         } catch (Throwable ee) {