You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/28 06:17:48 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-306]Raise
Nullpointer Exception when create tubemq instance (#227)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new b951173 [TUBEMQ-306]Raise Nullpointer Exception when create tubemq instance (#227)
b951173 is described below
commit b951173d2408a39136cbda70346675db0b17d3b6
Author: gosonzhang <46...@qq.com>
AuthorDate: Tue Jul 28 06:17:41 2020 +0000
[TUBEMQ-306]Raise Nullpointer Exception when create tubemq instance (#227)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../client/consumer/BaseMessageConsumer.java | 133 ++++++++++++---------
1 file changed, 74 insertions(+), 59 deletions(-)
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
index 8267feb..362d389 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
@@ -563,35 +563,32 @@ public class BaseMessageConsumer implements MessageConsumer {
ClientMaster.RegisterResponseM2C response =
masterService.consumerRegisterC2M(createMasterRegisterRequest(),
AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
- if (response.getSuccess()) {
+ if (response != null && response.getSuccess()) {
processRegisterAllocAndRspFlowRules(response);
processRegAuthorizedToken(response);
break;
}
- if (response.getErrCode() == TErrCodeConstants.CONSUME_GROUP_FORBIDDEN) {
- logger.error(strBuffer
- .append("[Register Failed] ConsumeGroup forbidden, register to master failed! ")
- .append(response.getErrMsg()).toString());
- strBuffer.delete(0, strBuffer.length());
- throw new TubeClientException(strBuffer
- .append("Register to master failed! ConsumeGroup forbidden, ")
- .append(response.getErrMsg()).toString());
- } else if (response.getErrCode() == TErrCodeConstants.CONSUME_CONTENT_FORBIDDEN) {
- logger.error(strBuffer
- .append("[Register Failed] Restricted consume content, register to master failed! ")
- .append(response.getErrMsg()).toString());
- strBuffer.delete(0, strBuffer.length());
- throw new TubeClientException(strBuffer
- .append("Register to master failed! Restricted consume content, ")
- .append(response.getErrMsg()).toString());
+ if (response == null) {
+ logger.warn(strBuffer.append("[Register Failed] ")
+ .append("response return null!").toString());
} else {
- logger.error(strBuffer.append("[Register Failed] ")
- .append(response.getErrMsg()).toString());
- strBuffer.delete(0, strBuffer.length());
+ if (response.getErrCode() == TErrCodeConstants.CONSUME_GROUP_FORBIDDEN) {
+ throw new TubeClientException(strBuffer
+ .append("Register to master failed! ConsumeGroup forbidden, ")
+ .append(response.getErrMsg()).toString());
+ } else if (response.getErrCode()
+ == TErrCodeConstants.CONSUME_CONTENT_FORBIDDEN) {
+ throw new TubeClientException(strBuffer
+ .append("Register to master failed! Restricted consume content, ")
+ .append(response.getErrMsg()).toString());
+ } else {
+ logger.warn(strBuffer.append("[Register Failed] ")
+ .append(response.getErrMsg()).toString());
+ }
}
+ strBuffer.delete(0, strBuffer.length());
} catch (Throwable e) {
- logger.error("completeSubscribe throwable is ", e);
- logger.error("Register to master failed.", e);
+ logger.warn("Register to master failed.", e);
ThreadUtils.sleep(this.consumerConfig.getRegFailWaitPeriodMs());
}
if (++registerRetryTimes >= consumerConfig.getMaxRegisterRetryTimes()) {
@@ -785,40 +782,47 @@ public class BaseMessageConsumer implements MessageConsumer {
continue;
}
ClientBroker.RegisterResponseB2C responseB2C =
- getBrokerService(partition.getBroker())
- .consumerRegisterC2B(createBrokerRegisterRequest(partition),
- AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
- long currOffset =
+ getBrokerService(partition.getBroker())
+ .consumerRegisterC2B(createBrokerRegisterRequest(partition),
+ AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
+ if (responseB2C != null && responseB2C.getSuccess()) {
+ long currOffset =
responseB2C.hasCurrOffset() ? responseB2C.getCurrOffset() : -1L;
- if (responseB2C.getSuccess()) {
rmtDataCache.addPartition(partition, currOffset);
unRegPartitions.remove(partition);
logger.info(strBuffer.append("Registered partition: consumer is ")
- .append(consumerId).append(", partition is:")
- .append(partition.toString()).toString());
+ .append(consumerId).append(", partition is:")
+ .append(partition.toString()).toString());
strBuffer.delete(0, strBuffer.length());
} else {
- if (responseB2C.getErrCode() == TErrCodeConstants.PARTITION_OCCUPIED
+ if (responseB2C == null) {
+ logger.warn(strBuffer.append("register2broker error! ")
+ .append(retryTimesRegister2Broker).append(" register ")
+ .append(partition.toString()).append(" return null!")
+ .toString());
+ } else {
+ if (responseB2C.getErrCode() == TErrCodeConstants.PARTITION_OCCUPIED
|| responseB2C.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
- unRegPartitions.remove(partition);
- if (responseB2C.getErrCode() == TErrCodeConstants.PARTITION_OCCUPIED) {
- if (logger.isDebugEnabled()) {
- logger.debug(strBuffer
+ unRegPartitions.remove(partition);
+ if (responseB2C.getErrCode() == TErrCodeConstants.PARTITION_OCCUPIED) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(strBuffer
.append("[Partition occupied], curr consumerId: ")
.append(consumerId).append(", returned message : ")
.append(responseB2C.getErrMsg()).toString());
- }
- } else {
- logger.warn(strBuffer
+ }
+ } else {
+ logger.warn(strBuffer
.append("[Certificate failure], curr consumerId: ")
.append(consumerId).append(", returned message : ")
.append(responseB2C.getErrMsg()).toString());
+ }
+ } else {
+ logger.warn(strBuffer.append("register2broker error! ")
+ .append(retryTimesRegister2Broker).append(" register ")
+ .append(partition.toString()).append(" return ")
+ .append(responseB2C.getErrMsg()).toString());
}
- } else {
- logger.warn(strBuffer.append("register2broker error! ")
- .append(retryTimesRegister2Broker).append(" register ")
- .append(partition.toString()).append(" return ")
- .append(responseB2C.getErrMsg()).toString());
}
strBuffer.delete(0, strBuffer.length());
}
@@ -1448,29 +1452,36 @@ public class BaseMessageConsumer implements MessageConsumer {
createMasterHeartbeatRequest(event, subInfoList, reportSubscribeInfo),
AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
// Process unsuccessful response
+ if (response == null) {
+ logger.error(strBuffer.append("[Heartbeat Failed] ")
+ .append("return result is null!").toString());
+ heartbeatRetryTimes++;
+ return;
+ }
if (!response.getSuccess()) {
// If master replies that cannot find current consumer node, re-register
if (response.getErrCode() == TErrCodeConstants.HB_NO_NODE) {
try {
ClientMaster.RegisterResponseM2C regResponse =
- masterService.consumerRegisterC2M(createMasterRegisterRequest(),
- AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
+ masterService.consumerRegisterC2M(createMasterRegisterRequest(),
+ AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
// Print the log when registration fails
if (regResponse == null || !regResponse.getSuccess()) {
if (regResponse == null) {
- logger.error(strBuffer.append("[Re-Register Failed] ").append(consumerId)
- .append(" register to master return null!").toString());
+ logger.error(strBuffer.append("[Re-Register Failed] ")
+ .append(consumerId)
+ .append(" register to master return null!").toString());
} else {
// If the consumer group is forbidden, output the log
- if (response.getErrCode() == TErrCodeConstants.CONSUME_GROUP_FORBIDDEN) {
+ if (response.getErrCode()
+ == TErrCodeConstants.CONSUME_GROUP_FORBIDDEN) {
logger.error(strBuffer.append("[Re-Register Failed] ")
- .append(consumerId)
- .append(" ConsumeGroup forbidden, ")
- .append(response.getErrMsg()).toString());
+ .append(consumerId).append(" ConsumeGroup forbidden, ")
+ .append(response.getErrMsg()).toString());
} else {
logger.error(strBuffer.append("[Re-Register Failed] ")
- .append(consumerId).append(" ")
- .append(response.getErrMsg()).toString());
+ .append(consumerId).append(" ")
+ .append(response.getErrMsg()).toString());
}
}
strBuffer.delete(0, strBuffer.length());
@@ -1596,7 +1607,8 @@ public class BaseMessageConsumer implements MessageConsumer {
createBrokerHeartBeatRequest(brokerInfo.getBrokerId(), partStrSet),
AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
// When response is success
- if (heartBeatResponseV2.getSuccess()) {
+ if (heartBeatResponseV2 != null
+ && heartBeatResponseV2.getSuccess()) {
// If the peer require authentication, set a flag.
// The following request will attach the auth information.
if (heartBeatResponseV2.hasRequireAuth()) {
@@ -1653,16 +1665,19 @@ public class BaseMessageConsumer implements MessageConsumer {
}
}
}
- if (heartBeatResponseV2.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
- for (Partition partition : partitions) {
- removePartition(partition);
- }
- logger.warn(strBuffer
+ if (heartBeatResponseV2 != null) {
+ if (heartBeatResponseV2.getErrCode()
+ == TErrCodeConstants.CERTIFICATE_FAILURE) {
+ for (Partition partition : partitions) {
+ removePartition(partition);
+ }
+ logger.warn(strBuffer
.append("[heart2broker error] certificate failure, ")
.append(brokerInfo.getBrokerStrInfo())
.append("'s partitions area released, ")
.append(heartBeatResponseV2.getErrMsg()).toString());
- strBuffer.delete(0, strBuffer.length());
+ strBuffer.delete(0, strBuffer.length());
+ }
}
}
} catch (Throwable ee) {