You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/09 11:28:45 UTC

[incubator-inlong] branch master updated: [INLONG-1775]Define the PB messages used in client partition-assigned (#1779)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fb56ebd  [INLONG-1775]Define the PB messages used in client partition-assigned (#1779)
fb56ebd is described below

commit fb56ebdaa3a52d2a57da76773f13a56a41fcf7e4
Author: gosonzhang <46...@qq.com>
AuthorDate: Tue Nov 9 19:28:40 2021 +0800

    [INLONG-1775]Define the PB messages used in client partition-assigned (#1779)
---
 .../tubemq/client/consumer/ConsumerResult.java     | 46 ++++++-------
 .../inlong/tubemq/corebase/rv/ProcessResult.java   |  4 --
 .../apache/inlong/tubemq/corerpc/RpcConstants.java |  4 ++
 .../inlong/tubemq/corerpc/codec/PbEnDecoder.java   | 27 ++++++++
 .../tubemq/corerpc/service/MasterService.java      | 12 ++++
 .../tubemq-core/src/main/proto/MasterService.proto | 77 ++++++++++++++++++++++
 .../inlong/tubemq/server/master/TMaster.java       | 27 ++++++++
 .../impl/bdbimpl/BdbBrokerConfigMapperImpl.java    |  2 +-
 .../impl/bdbimpl/BdbClusterConfigMapperImpl.java   |  2 +-
 .../bdbimpl/BdbGroupConsumeCtrlMapperImpl.java     |  2 +-
 .../impl/bdbimpl/BdbGroupResCtrlMapperImpl.java    |  2 +-
 .../impl/bdbimpl/BdbTopicCtrlMapperImpl.java       |  2 +-
 .../impl/bdbimpl/BdbTopicDeployMapperImpl.java     |  2 +-
 13 files changed, 176 insertions(+), 33 deletions(-)

diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/ConsumerResult.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/ConsumerResult.java
index f224573..94cb860 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/ConsumerResult.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/ConsumerResult.java
@@ -21,32 +21,31 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.inlong.tubemq.client.common.PeerInfo;
 import org.apache.inlong.tubemq.corebase.Message;
-import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.cluster.Partition;
+import org.apache.inlong.tubemq.corebase.rv.RetValue;
 
-public class ConsumerResult {
-    private boolean success = false;
-    private int errCode = TBaseConstants.META_VALUE_UNDEFINED;
-    private String errMsg = "";
+public class ConsumerResult extends RetValue {
     private String topicName = "";
     private PeerInfo peerInfo = new PeerInfo();
     private String confirmContext = "";
     private List<Message> messageList = new ArrayList<>();
 
+    public ConsumerResult() {
+        super();
+    }
+
     public ConsumerResult(int errCode, String errMsg) {
-        this.success = false;
-        this.errCode = errCode;
-        this.errMsg = errMsg;
+        super(errCode, errMsg);
     }
 
     public ConsumerResult(FetchContext taskContext) {
-        this.success = taskContext.isSuccess();
-        this.errCode = taskContext.getErrCode();
-        this.errMsg = taskContext.getErrMsg();
+        super(taskContext.isSuccess(),
+                taskContext.getErrCode(),
+                taskContext.getErrMsg());
         this.topicName = taskContext.getPartition().getTopic();
         peerInfo.setMsgSourceInfo(taskContext.getPartition(),
                 taskContext.getCurrOffset(), taskContext.getMaxOffset());
-        if (this.success) {
+        if (this.isSuccess()) {
             this.messageList = taskContext.getMessageList();
             this.confirmContext = taskContext.getConfirmContext();
         }
@@ -55,19 +54,24 @@ public class ConsumerResult {
     public ConsumerResult(boolean isSuccess, int errCode, String errMsg,
                           String topicName, Partition partition,
                           long currOffset, long maxOffset) {
-        this.success = isSuccess;
-        this.errCode = errCode;
-        this.errMsg = errMsg;
+        super(isSuccess, errCode, errMsg);
         this.topicName = topicName;
         this.peerInfo.setMsgSourceInfo(partition, currOffset, maxOffset);
     }
 
-    public boolean isSuccess() {
-        return success;
+    public void setSuccResult(String topicName, Partition partition,
+                              long currOffset, long maxOffset) {
+        super.setSuccResult();
+        this.topicName = topicName;
+        this.peerInfo.setMsgSourceInfo(partition, currOffset, maxOffset);
     }
 
-    public int getErrCode() {
-        return errCode;
+    public void setProcessResult(boolean isSuccess, int errCode, String errMsg,
+                                 String topicName, Partition partition,
+                                 long currOffset, long maxOffset) {
+        super.setFullInfo(isSuccess, errCode, errMsg);
+        this.topicName = topicName;
+        this.peerInfo.setMsgSourceInfo(partition, currOffset, maxOffset);
     }
 
     public String getTopicName() {
@@ -82,10 +86,6 @@ public class ConsumerResult {
         return peerInfo.getPartitionKey();
     }
 
-    public String getErrMsg() {
-        return errMsg;
-    }
-
     public final String getConfirmContext() {
         return confirmContext;
     }
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/rv/ProcessResult.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/rv/ProcessResult.java
index 35c7f04..57e44c5 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/rv/ProcessResult.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/rv/ProcessResult.java
@@ -63,10 +63,6 @@ public class ProcessResult extends RetValue {
         return retData1;
     }
 
-    public void setRetData(Object retData) {
-        this.retData1 = retData;
-    }
-
     public void clear() {
         super.clear();
         this.retData1 = null;
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java
index 6e24011..80e451d 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java
@@ -102,6 +102,10 @@ public final class RpcConstants {
     public static final int RPC_MSG_BROKER_CONSUMER_CLOSE = 19;
     //public static final int RPC_MSG_BROKER_METHOD_END = 19;
 
+    public static final int RPC_MSG_MASTER_CONSUMER_REGISTER_V2 = 20;
+    public static final int RPC_MSG_MASTER_CONSUMER_HEARTBEAT_V2 = 21;
+    public static final int RPC_MSG_MASTER_CONSUMER_GET_PART_META = 22;
+
     public static final int MSG_OPTYPE_REGISTER = 31;
     public static final int MSG_OPTYPE_UNREGISTER = 32;
 
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/codec/PbEnDecoder.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/codec/PbEnDecoder.java
index 46a1576..036c9d5 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/codec/PbEnDecoder.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/codec/PbEnDecoder.java
@@ -51,6 +51,12 @@ public class PbEnDecoder {
         rpcMethodMap.put("getMessagesC2B", RpcConstants.RPC_MSG_BROKER_CONSUMER_GETMESSAGE);
         rpcMethodMap.put("consumerCommitC2B", RpcConstants.RPC_MSG_BROKER_CONSUMER_COMMIT);
         rpcMethodMap.put("sendMessageP2B", RpcConstants.RPC_MSG_BROKER_PRODUCER_SENDMESSAGE);
+        rpcMethodMap.put("consumerRegisterC2MV2",
+                RpcConstants.RPC_MSG_MASTER_CONSUMER_REGISTER_V2);
+        rpcMethodMap.put("consumerHeartbeatC2MV2",
+                RpcConstants.RPC_MSG_MASTER_CONSUMER_HEARTBEAT_V2);
+        rpcMethodMap.put("consumerGetPartMetaInfoC2M",
+                RpcConstants.RPC_MSG_MASTER_CONSUMER_GET_PART_META);
 
         rpcServiceMap.put("org.apache.inlong.tubemq.corerpc.service.MasterService",
                 RpcConstants.RPC_SERVICE_TYPE_MASTER_SERVICE);
@@ -106,6 +112,15 @@ public class PbEnDecoder {
                 case RpcConstants.RPC_MSG_MASTER_BROKER_HEARTBEAT: {
                     return ClientMaster.HeartRequestB2M.parseFrom(bytes);
                 }
+                case RpcConstants.RPC_MSG_MASTER_CONSUMER_REGISTER_V2: {
+                    return ClientMaster.RegisterRequestC2MV2.parseFrom(bytes);
+                }
+                case RpcConstants.RPC_MSG_MASTER_CONSUMER_HEARTBEAT_V2: {
+                    return ClientMaster.HeartRequestC2MV2.parseFrom(bytes);
+                }
+                case RpcConstants.RPC_MSG_MASTER_CONSUMER_GET_PART_META: {
+                    return ClientMaster.GetPartMetaRequestC2M.parseFrom(bytes);
+                }
                 case RpcConstants.RPC_MSG_MASTER_BROKER_CLOSECLIENT: {
                     return ClientMaster.CloseRequestB2M.parseFrom(bytes);
                 }
@@ -163,6 +178,15 @@ public class PbEnDecoder {
                 case RpcConstants.RPC_MSG_MASTER_BROKER_CLOSECLIENT: {
                     return ClientMaster.CloseResponseM2B.parseFrom(bytes);
                 }
+                case RpcConstants.RPC_MSG_MASTER_CONSUMER_REGISTER_V2: {
+                    return ClientMaster.RegisterResponseM2CV2.parseFrom(bytes);
+                }
+                case RpcConstants.RPC_MSG_MASTER_CONSUMER_HEARTBEAT_V2: {
+                    return ClientMaster.HeartResponseM2CV2.parseFrom(bytes);
+                }
+                case RpcConstants.RPC_MSG_MASTER_CONSUMER_GET_PART_META: {
+                    return ClientMaster.GetPartMetaResponseM2C.parseFrom(bytes);
+                }
                 case RpcConstants.RPC_MSG_BROKER_PRODUCER_SENDMESSAGE: {
                     return ClientBroker.SendMessageResponseB2P.parseFrom(bytes);
                 }
@@ -234,6 +258,9 @@ public class PbEnDecoder {
                     case RpcConstants.RPC_MSG_MASTER_CONSUMER_REGISTER:
                     case RpcConstants.RPC_MSG_MASTER_CONSUMER_HEARTBEAT:
                     case RpcConstants.RPC_MSG_MASTER_CONSUMER_CLOSECLIENT:
+                    case RpcConstants.RPC_MSG_MASTER_CONSUMER_REGISTER_V2:
+                    case RpcConstants.RPC_MSG_MASTER_CONSUMER_HEARTBEAT_V2:
+                    case RpcConstants.RPC_MSG_MASTER_CONSUMER_GET_PART_META:
                     case RpcConstants.RPC_MSG_MASTER_BROKER_REGISTER:
                     case RpcConstants.RPC_MSG_MASTER_BROKER_HEARTBEAT:
                     case RpcConstants.RPC_MSG_MASTER_BROKER_CLOSECLIENT: {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/service/MasterService.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/service/MasterService.java
index dded84b..5d4de62 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/service/MasterService.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/service/MasterService.java
@@ -48,4 +48,16 @@ public interface MasterService {
     ClientMaster.CloseResponseM2B brokerCloseClientB2M(ClientMaster.CloseRequestB2M request,
                                                        String rmtAddress, boolean overtls) throws Throwable;
 
+    ClientMaster.RegisterResponseM2CV2 consumerRegisterC2MV2(
+            ClientMaster.RegisterRequestC2MV2 request,
+            String rmtAddress, boolean overtls) throws Throwable;
+
+    ClientMaster.HeartResponseM2CV2 consumerHeartbeatC2MV2(
+            ClientMaster.HeartRequestC2MV2 request,
+            String rmtAddress, boolean overtls) throws Throwable;
+
+    ClientMaster.GetPartMetaResponseM2C consumerGetPartMetaInfoC2M(
+            ClientMaster.GetPartMetaRequestC2M request,
+            String rmtAddress, boolean overtls) throws Throwable;
+
 }
diff --git a/inlong-tubemq/tubemq-core/src/main/proto/MasterService.proto b/inlong-tubemq/tubemq-core/src/main/proto/MasterService.proto
index 6597fca..b8baafc 100644
--- a/inlong-tubemq/tubemq-core/src/main/proto/MasterService.proto
+++ b/inlong-tubemq/tubemq-core/src/main/proto/MasterService.proto
@@ -430,3 +430,80 @@ message CloseResponse_V2_M2B {
     required int32 errCode = 1;
     optional string errMsg = 2;
 }
+
+message ClientSubRepInfo {
+    required int64 brokerConfigId = 1;
+    required int64 topicMetaInfoId = 2;
+    optional int64 lstAssignedTime = 3;
+    optional bool reportSubInfo = 4;
+    repeated string partSubInfo = 5;
+}
+
+message OpsTaskInfo {
+    optional int64 groupFlowCheckId = 1;
+    optional int64 defFlowCheckId = 2;
+    optional int32 qryPriorityId = 3;
+    optional int64 csmFrmMaxOffsetCtrlId = 4;
+    optional bool requireAuth = 5;
+    optional string defFlowControlInfo = 6;
+    optional string groupFlowControlInfo = 7;
+}
+
+message RegisterRequestC2MV2 {
+    required string clientId = 1;
+    required string groupName = 2;
+    required string hostName = 3;
+    required int32 sourceCount = 4;
+    required int32 nodeId = 5;
+    repeated string topicList = 6;
+    repeated string topicCondition = 7;
+    optional ClientSubRepInfo subRepInfo = 8;
+    optional OpsTaskInfo opsTaskInfo = 9;
+    optional MasterCertificateInfo authInfo = 10;
+    optional string jdkVersion = 11;
+}
+
+message RegisterResponseM2CV2 {
+    required int32 errCode = 1;
+    required string errMsg = 2;
+    optional int64 brokerConfigId = 3;
+    repeated string brokerConfigList = 4;
+    optional OpsTaskInfo opsTaskInfo = 5;
+    optional MasterAuthorizedInfo authorizedInfo = 6;
+}
+
+message HeartRequestC2MV2 {
+    required string clientId = 1;
+    required string groupName = 2;
+    optional ClientSubRepInfo subRepInfo = 3;
+    optional OpsTaskInfo opsTaskInfo = 4;
+    optional MasterCertificateInfo authInfo = 5;
+}
+
+message HeartResponseM2CV2 {
+    required int32 errCode = 1;
+    required string errMsg = 2;
+    optional int64 brokerConfigId = 3;
+    repeated string brokerConfigList = 4;
+    optional int64  topicMetaInfoId = 5;
+    repeated string topicMetaInfoList = 6;
+    optional OpsTaskInfo opsTaskInfo = 7;
+    optional MasterAuthorizedInfo authorizedInfo = 8;
+}
+
+message GetPartMetaRequestC2M {
+    required string clientId = 1;
+    required string groupName = 2;
+    required int64  brokerConfigId = 3;
+    required int64  topicMetaInfoId = 4;
+    optional MasterCertificateInfo authInfo = 5;
+}
+
+message GetPartMetaResponseM2C {
+    required int32 errCode = 1;
+    required string errMsg = 2;
+    optional int64 brokerConfigId = 3;
+    optional int64  topicMetaInfoId = 4;
+    repeated string topicMetaInfoList = 5;
+    repeated string brokerConfigList = 6;
+}
\ No newline at end of file
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index 08bef9d..c40a6f1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -56,19 +56,25 @@ import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.CloseRe
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.CloseResponseM2P;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.EnableBrokerFunInfo;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.EventProto;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.GetPartMetaRequestC2M;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.GetPartMetaResponseM2C;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.HeartRequestB2M;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.HeartRequestC2M;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.HeartRequestC2MV2;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.HeartRequestP2M;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.HeartResponseM2B;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.HeartResponseM2C;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.HeartResponseM2CV2;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.HeartResponseM2P;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.MasterAuthorizedInfo;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.MasterBrokerAuthorizedInfo;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterRequestB2M;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterRequestC2M;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterRequestC2MV2;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterRequestP2M;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterResponseM2B;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterResponseM2C;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterResponseM2CV2;
 import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterResponseM2P;
 import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
 import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
@@ -1185,6 +1191,27 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         return builder.build();
     }
 
+    @Override
+    public RegisterResponseM2CV2 consumerRegisterC2MV2(RegisterRequestC2MV2 request,
+                                                       String rmtAddress,
+                                                       boolean overtls) throws Throwable {
+        return null;
+    }
+
+    @Override
+    public HeartResponseM2CV2 consumerHeartbeatC2MV2(HeartRequestC2MV2 request,
+                                                     String rmtAddress,
+                                                     boolean overtls) throws Throwable {
+        return null;
+    }
+
+    @Override
+    public GetPartMetaResponseM2C consumerGetPartMetaInfoC2M(GetPartMetaRequestC2M request,
+                                                             String rmtAddress,
+                                                             boolean overtls) throws Throwable {
+        return null;
+    }
+
     /**
      * Generate consumer id
      *
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
index 571aaa3..9eb5d49 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
@@ -152,7 +152,7 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
         }
         if (putBrokerConfig2Bdb(memEntity, result)) {
             addOrUpdCacheRecord(memEntity);
-            result.setRetData(curEntity);
+            result.setSuccResult(curEntity);
         }
         return result.isSuccess();
     }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java
index 84b72c8..ff4266b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java
@@ -136,7 +136,7 @@ public class BdbClusterConfigMapperImpl implements ClusterConfigMapper {
         }
         if (putClusterConfig2Bdb(memEntity, result)) {
             metaDataCache.put(memEntity.getRecordKey(), memEntity);
-            result.setRetData(curEntity);
+            result.setSuccResult(curEntity);
         }
         return result.isSuccess();
     }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
index 8c2410c..cef8f3f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
@@ -147,7 +147,7 @@ public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
         }
         if (putGroupConsumeCtrlConfig2Bdb(memEntity, result)) {
             addOrUpdCacheRecord(memEntity);
-            result.setRetData(curEntity);
+            result.setSuccResult(curEntity);
         }
         return result.isSuccess();
     }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java
index e59b67e..913b046 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java
@@ -136,7 +136,7 @@ public class BdbGroupResCtrlMapperImpl implements GroupResCtrlMapper {
         }
         if (putGroupConfigConfig2Bdb(memEntity, result)) {
             groupBaseCtrlCache.put(memEntity.getGroupName(), memEntity);
-            result.setRetData(curEntity);
+            result.setSuccResult(curEntity);
         }
         return result.isSuccess();
     }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
index 47c8394..a0601b1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
@@ -140,7 +140,7 @@ public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
         }
         if (putTopicCtrlConfig2Bdb(memEntity, result)) {
             topicCtrlCache.put(memEntity.getTopicName(), memEntity);
-            result.setRetData(curEntity);
+            result.setSuccResult(curEntity);
         }
         return result.isSuccess();
     }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
index c65d7d9..94da730 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
@@ -147,7 +147,7 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
         }
         if (putTopicConfig2Bdb(memEntity, result)) {
             addOrUpdCacheRecord(memEntity);
-            result.setRetData(curEntity);
+            result.setSuccResult(curEntity);
         }
         return result.isSuccess();
     }