You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/08 04:05:20 UTC
[inlong] 01/05: [INLONG-6423][TubeMQ] Consumer registration failed due to BDB error (#6450)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 7b7fc132f83e881509adba16e5e6fc0f893ef683
Author: Goson Zhang <46...@qq.com>
AuthorDate: Tue Nov 8 11:00:43 2022 +0800
[INLONG-6423][TubeMQ] Consumer registration failed due to BDB error (#6450)
---
.../inlong/tubemq/server/broker/offset/OffsetRecordService.java | 2 +-
.../main/java/org/apache/inlong/tubemq/server/master/TMaster.java | 6 +++---
2 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
index 2f55d4f14..16901911f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
@@ -79,7 +79,7 @@ public class OffsetRecordService extends AbstractDaemonService {
// check topic writable status
TopicMetadata topicMetadata = storeManager.getMetadataManager()
.getTopicMetadata(TServerConstants.OFFSET_HISTORY_NAME);
- if (!topicMetadata.isAcceptPublish()) {
+ if (topicMetadata == null || !topicMetadata.isAcceptPublish()) {
return;
}
// get group offset information
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index e581a5105..6a5b76b90 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -575,6 +575,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
return builder.build();
}
final String groupName = (String) paramCheckResult.checkData;
+ checkNodeStatus(consumerId, strBuffer);
if (!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(),
request.getTopicListList(), result, strBuffer)) {
builder.setErrCode(result.getErrCode());
@@ -626,7 +627,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
return builder.build();
}
ConsumerInfo inConsumerInfo2 = (ConsumerInfo) paramCheckResult.checkData;
- checkNodeStatus(consumerId, strBuffer);
CertifiedResult authorizeResult =
serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName,
groupName, reqTopicSet, reqTopicConditions, rmtAddress);
@@ -1258,6 +1258,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
return builder.build();
}
final String groupName = (String) paramCheckResult.checkData;
+ // check master current status
+ checkNodeStatus(consumerId, sBuffer);
if (!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(),
request.getTopicListList(), result, sBuffer)) {
builder.setErrCode(result.getErrCode());
@@ -1283,8 +1285,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if (request.hasOpsTaskInfo()) {
opsTaskInfo.updOpsSyncInfo(request.getOpsTaskInfo());
}
- // check master current status
- checkNodeStatus(consumerId, sBuffer);
ClientSyncInfo clientSyncInfo = new ClientSyncInfo();
if (request.hasSubRepInfo()) {
clientSyncInfo.updSubRepInfo(brokerRunManager, request.getSubRepInfo());