You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/09 11:28:45 UTC
[incubator-inlong] branch master updated: [INLONG-1775]Define the PB messages used in client partition-assigned (#1779)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fb56ebd [INLONG-1775]Define the PB messages used in client partition-assigned (#1779)
fb56ebd is described below
commit fb56ebdaa3a52d2a57da76773f13a56a41fcf7e4
Author: gosonzhang <46...@qq.com>
AuthorDate: Tue Nov 9 19:28:40 2021 +0800
[INLONG-1775]Define the PB messages used in client partition-assigned (#1779)
---
.../tubemq/client/consumer/ConsumerResult.java | 46 ++++++-------
.../inlong/tubemq/corebase/rv/ProcessResult.java | 4 --
.../apache/inlong/tubemq/corerpc/RpcConstants.java | 4 ++
.../inlong/tubemq/corerpc/codec/PbEnDecoder.java | 27 ++++++++
.../tubemq/corerpc/service/MasterService.java | 12 ++++
.../tubemq-core/src/main/proto/MasterService.proto | 77 ++++++++++++++++++++++
.../inlong/tubemq/server/master/TMaster.java | 27 ++++++++
.../impl/bdbimpl/BdbBrokerConfigMapperImpl.java | 2 +-
.../impl/bdbimpl/BdbClusterConfigMapperImpl.java | 2 +-
.../bdbimpl/BdbGroupConsumeCtrlMapperImpl.java | 2 +-
.../impl/bdbimpl/BdbGroupResCtrlMapperImpl.java | 2 +-
.../impl/bdbimpl/BdbTopicCtrlMapperImpl.java | 2 +-
.../impl/bdbimpl/BdbTopicDeployMapperImpl.java | 2 +-
13 files changed, 176 insertions(+), 33 deletions(-)
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/ConsumerResult.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/ConsumerResult.java
index f224573..94cb860 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/ConsumerResult.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/ConsumerResult.java
@@ -21,32 +21,31 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.inlong.tubemq.client.common.PeerInfo;
import org.apache.inlong.tubemq.corebase.Message;
-import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.cluster.Partition;
+import org.apache.inlong.tubemq.corebase.rv.RetValue;
-public class ConsumerResult {
- private boolean success = false;
- private int errCode = TBaseConstants.META_VALUE_UNDEFINED;
- private String errMsg = "";
+public class ConsumerResult extends RetValue {
private String topicName = "";
private PeerInfo peerInfo = new PeerInfo();
private String confirmContext = "";
private List<Message> messageList = new ArrayList<>();
+ public ConsumerResult() {
+ super();
+ }
+
public ConsumerResult(int errCode, String errMsg) {
- this.success = false;
- this.errCode = errCode;
- this.errMsg = errMsg;
+ super(errCode, errMsg);
}
public ConsumerResult(FetchContext taskContext) {
- this.success = taskContext.isSuccess();
- this.errCode = taskContext.getErrCode();
- this.errMsg = taskContext.getErrMsg();
+ super(taskContext.isSuccess(),
+ taskContext.getErrCode(),
+ taskContext.getErrMsg());
this.topicName = taskContext.getPartition().getTopic();
peerInfo.setMsgSourceInfo(taskContext.getPartition(),
taskContext.getCurrOffset(), taskContext.getMaxOffset());
- if (this.success) {
+ if (this.isSuccess()) {
this.messageList = taskContext.getMessageList();
this.confirmContext = taskContext.getConfirmContext();
}
@@ -55,19 +54,24 @@ public class ConsumerResult {
public ConsumerResult(boolean isSuccess, int errCode, String errMsg,
String topicName, Partition partition,
long currOffset, long maxOffset) {
- this.success = isSuccess;
- this.errCode = errCode;
- this.errMsg = errMsg;
+ super(isSuccess, errCode, errMsg);
this.topicName = topicName;
this.peerInfo.setMsgSourceInfo(partition, currOffset, maxOffset);
}
- public boolean isSuccess() {
- return success;
+ public void setSuccResult(String topicName, Partition partition,
+ long currOffset, long maxOffset) {
+ super.setSuccResult();
+ this.topicName = topicName;
+ this.peerInfo.setMsgSourceInfo(partition, currOffset, maxOffset);
}
- public int getErrCode() {
- return errCode;
+ public void setProcessResult(boolean isSuccess, int errCode, String errMsg,
+ String topicName, Partition partition,
+ long currOffset, long maxOffset) {
+ super.setFullInfo(isSuccess, errCode, errMsg);
+ this.topicName = topicName;
+ this.peerInfo.setMsgSourceInfo(partition, currOffset, maxOffset);
}
public String getTopicName() {
@@ -82,10 +86,6 @@ public class ConsumerResult {
return peerInfo.getPartitionKey();
}
- public String getErrMsg() {
- return errMsg;
- }
-
public final String getConfirmContext() {
return confirmContext;
}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/rv/ProcessResult.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/rv/ProcessResult.java
index 35c7f04..57e44c5 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/rv/ProcessResult.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/rv/ProcessResult.java
@@ -63,10 +63,6 @@ public class ProcessResult extends RetValue {
return retData1;
}
- public void setRetData(Object retData) {
- this.retData1 = retData;
- }
-
public void clear() {
super.clear();
this.retData1 = null;
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java
index 6e24011..80e451d 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java
@@ -102,6 +102,10 @@ public final class RpcConstants {
public static final int RPC_MSG_BROKER_CONSUMER_CLOSE = 19;
//public static final int RPC_MSG_BROKER_METHOD_END = 19;
+ public static final int RPC_MSG_MASTER_CONSUMER_REGISTER_V2 = 20;
+ public static final int RPC_MSG_MASTER_CONSUMER_HEARTBEAT_V2 = 21;
+ public static final int RPC_MSG_MASTER_CONSUMER_GET_PART_META = 22;
+
public static final int MSG_OPTYPE_REGISTER = 31;
public static final int MSG_OPTYPE_UNREGISTER = 32;
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/codec/PbEnDecoder.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/codec/PbEnDecoder.java
index 46a1576..036c9d5 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/codec/PbEnDecoder.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/codec/PbEnDecoder.java
@@ -51,6 +51,12 @@ public class PbEnDecoder {
rpcMethodMap.put("getMessagesC2B", RpcConstants.RPC_MSG_BROKER_CONSUMER_GETMESSAGE);
rpcMethodMap.put("consumerCommitC2B", RpcConstants.RPC_MSG_BROKER_CONSUMER_COMMIT);
rpcMethodMap.put("sendMessageP2B", RpcConstants.RPC_MSG_BROKER_PRODUCER_SENDMESSAGE);
+ rpcMethodMap.put("consumerRegisterC2MV2",
+ RpcConstants.RPC_MSG_MASTER_CONSUMER_REGISTER_V2);
+ rpcMethodMap.put("consumerHeartbeatC2MV2",
+ RpcConstants.RPC_MSG_MASTER_CONSUMER_HEARTBEAT_V2);
+ rpcMethodMap.put("consumerGetPartMetaInfoC2M",
+ RpcConstants.RPC_MSG_MASTER_CONSUMER_GET_PART_META);
rpcServiceMap.put("org.apache.inlong.tubemq.corerpc.service.MasterService",
RpcConstants.RPC_SERVICE_TYPE_MASTER_SERVICE);
@@ -106,6 +112,15 @@ public class PbEnDecoder {
case RpcConstants.RPC_MSG_MASTER_BROKER_HEARTBEAT: {
return ClientMaster.HeartRequestB2M.parseFrom(bytes);
}
+ case RpcConstants.RPC_MSG_MASTER_CONSUMER_REGISTER_V2: {
+ return ClientMaster.RegisterRequestC2MV2.parseFrom(bytes);
+ }
+ case RpcConstants.RPC_MSG_MASTER_CONSUMER_HEARTBEAT_V2: {
+ return ClientMaster.HeartRequestC2MV2.parseFrom(bytes);
+ }
+ case RpcConstants.RPC_MSG_MASTER_CONSUMER_GET_PART_META: {
+ return ClientMaster.GetPartMetaRequestC2M.parseFrom(bytes);
+ }
case RpcConstants.RPC_MSG_MASTER_BROKER_CLOSECLIENT: {
return ClientMaster.CloseRequestB2M.parseFrom(bytes);
}
@@ -163,6 +178,15 @@ public class PbEnDecoder {
case RpcConstants.RPC_MSG_MASTER_BROKER_CLOSECLIENT: {
return ClientMaster.CloseResponseM2B.parseFrom(bytes);
}
+ case RpcConstants.RPC_MSG_MASTER_CONSUMER_REGISTER_V2: {
+ return ClientMaster.RegisterResponseM2CV2.parseFrom(bytes);
+ }
+ case RpcConstants.RPC_MSG_MASTER_CONSUMER_HEARTBEAT_V2: {
+ return ClientMaster.HeartResponseM2CV2.parseFrom(bytes);
+ }
+ case RpcConstants.RPC_MSG_MASTER_CONSUMER_GET_PART_META: {
+ return ClientMaster.GetPartMetaResponseM2C.parseFrom(bytes);
+ }
case RpcConstants.RPC_MSG_BROKER_PRODUCER_SENDMESSAGE: {
return ClientBroker.SendMessageResponseB2P.parseFrom(bytes);
}
@@ -234,6 +258,9 @@ public class PbEnDecoder {
case RpcConstants.RPC_MSG_MASTER_CONSUMER_REGISTER:
case RpcConstants.RPC_MSG_MASTER_CONSUMER_HEARTBEAT:
case RpcConstants.RPC_MSG_MASTER_CONSUMER_CLOSECLIENT:
+ case RpcConstants.RPC_MSG_MASTER_CONSUMER_REGISTER_V2:
+ case RpcConstants.RPC_MSG_MASTER_CONSUMER_HEARTBEAT_V2:
+ case RpcConstants.RPC_MSG_MASTER_CONSUMER_GET_PART_META:
case RpcConstants.RPC_MSG_MASTER_BROKER_REGISTER:
case RpcConstants.RPC_MSG_MASTER_BROKER_HEARTBEAT:
case RpcConstants.RPC_MSG_MASTER_BROKER_CLOSECLIENT: {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/service/MasterService.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/service/MasterService.java
index dded84b..5d4de62 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/service/MasterService.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/service/MasterService.java
@@ -48,4 +48,16 @@ public interface MasterService {
ClientMaster.CloseResponseM2B brokerCloseClientB2M(ClientMaster.CloseRequestB2M request,
String rmtAddress, boolean overtls) throws Throwable;
+ ClientMaster.RegisterResponseM2CV2 consumerRegisterC2MV2(
+ ClientMaster.RegisterRequestC2MV2 request,
+ String rmtAddress, boolean overtls) throws Throwable;
+
+ ClientMaster.HeartResponseM2CV2 consumerHeartbeatC2MV2(
+ ClientMaster.HeartRequestC2MV2 request,
+ String rmtAddress, boolean overtls) throws Throwable;
+
+ ClientMaster.GetPartMetaResponseM2C consumerGetPartMetaInfoC2M(
+ ClientMaster.GetPartMetaRequestC2M request,
+ String rmtAddress, boolean overtls) throws Throwable;
+
}
diff --git a/inlong-tubemq/tubemq-core/src/main/proto/MasterService.proto b/inlong-tubemq/tubemq-core/src/main/proto/MasterService.proto
index 6597fca..b8baafc 100644
--- a/inlong-tubemq/tubemq-core/src/main/proto/MasterService.proto
+++ b/inlong-tubemq/tubemq-core/src/main/proto/MasterService.proto
@@ -430,3 +430,80 @@ message CloseResponse_V2_M2B {
required int32 errCode = 1;
optional string errMsg = 2;
}
+
+message ClientSubRepInfo {
+ required int64 brokerConfigId = 1;
+ required int64 topicMetaInfoId = 2;
+ optional int64 lstAssignedTime = 3;
+ optional bool reportSubInfo = 4;
+ repeated string partSubInfo = 5;
+}
+
+message OpsTaskInfo {
+ optional int64 groupFlowCheckId = 1;
+ optional int64 defFlowCheckId = 2;
+ optional int32 qryPriorityId = 3;
+ optional int64 csmFrmMaxOffsetCtrlId = 4;
+ optional bool requireAuth = 5;
+ optional string defFlowControlInfo = 6;
+ optional string groupFlowControlInfo = 7;
+}
+
+message RegisterRequestC2MV2 {
+ required string clientId = 1;
+ required string groupName = 2;
+ required string hostName = 3;
+ required int32 sourceCount = 4;
+ required int32 nodeId = 5;
+ repeated string topicList = 6;
+ repeated string topicCondition = 7;
+ optional ClientSubRepInfo subRepInfo = 8;
+ optional OpsTaskInfo opsTaskInfo = 9;
+ optional MasterCertificateInfo authInfo = 10;
+ optional string jdkVersion = 11;
+}
+
+message RegisterResponseM2CV2 {
+ required int32 errCode = 1;
+ required string errMsg = 2;
+ optional int64 brokerConfigId = 3;
+ repeated string brokerConfigList = 4;
+ optional OpsTaskInfo opsTaskInfo = 5;
+ optional MasterAuthorizedInfo authorizedInfo = 6;
+}
+
+message HeartRequestC2MV2 {
+ required string clientId = 1;
+ required string groupName = 2;
+ optional ClientSubRepInfo subRepInfo = 3;
+ optional OpsTaskInfo opsTaskInfo = 4;
+ optional MasterCertificateInfo authInfo = 5;
+}
+
+message HeartResponseM2CV2 {
+ required int32 errCode = 1;
+ required string errMsg = 2;
+ optional int64 brokerConfigId = 3;
+ repeated string brokerConfigList = 4;
+ optional int64 topicMetaInfoId = 5;
+ repeated string topicMetaInfoList = 6;
+ optional OpsTaskInfo opsTaskInfo = 7;
+ optional MasterAuthorizedInfo authorizedInfo = 8;
+}
+
+message GetPartMetaRequestC2M {
+ required string clientId = 1;
+ required string groupName = 2;
+ required int64 brokerConfigId = 3;
+ required int64 topicMetaInfoId = 4;
+ optional MasterCertificateInfo authInfo = 5;
+}
+
+message GetPartMetaResponseM2C {
+ required int32 errCode = 1;
+ required string errMsg = 2;
+ optional int64 brokerConfigId = 3;
+ optional int64 topicMetaInfoId = 4;
+ repeated string topicMetaInfoList = 5;
+ repeated string brokerConfigList = 6;
+}
\ No newline at end of file
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index 08bef9d..c40a6f1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -56,19 +56,25 @@ import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.CloseRe
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.CloseResponseM2P;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.EnableBrokerFunInfo;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.EventProto;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.GetPartMetaRequestC2M;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.GetPartMetaResponseM2C;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.HeartRequestB2M;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.HeartRequestC2M;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.HeartRequestC2MV2;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.HeartRequestP2M;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.HeartResponseM2B;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.HeartResponseM2C;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.HeartResponseM2CV2;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.HeartResponseM2P;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.MasterAuthorizedInfo;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.MasterBrokerAuthorizedInfo;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterRequestB2M;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterRequestC2M;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterRequestC2MV2;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterRequestP2M;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterResponseM2B;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterResponseM2C;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterResponseM2CV2;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterResponseM2P;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
@@ -1185,6 +1191,27 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
return builder.build();
}
+ @Override
+ public RegisterResponseM2CV2 consumerRegisterC2MV2(RegisterRequestC2MV2 request,
+ String rmtAddress,
+ boolean overtls) throws Throwable {
+ return null;
+ }
+
+ @Override
+ public HeartResponseM2CV2 consumerHeartbeatC2MV2(HeartRequestC2MV2 request,
+ String rmtAddress,
+ boolean overtls) throws Throwable {
+ return null;
+ }
+
+ @Override
+ public GetPartMetaResponseM2C consumerGetPartMetaInfoC2M(GetPartMetaRequestC2M request,
+ String rmtAddress,
+ boolean overtls) throws Throwable {
+ return null;
+ }
+
/**
* Generate consumer id
*
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
index 571aaa3..9eb5d49 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
@@ -152,7 +152,7 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
}
if (putBrokerConfig2Bdb(memEntity, result)) {
addOrUpdCacheRecord(memEntity);
- result.setRetData(curEntity);
+ result.setSuccResult(curEntity);
}
return result.isSuccess();
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java
index 84b72c8..ff4266b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbClusterConfigMapperImpl.java
@@ -136,7 +136,7 @@ public class BdbClusterConfigMapperImpl implements ClusterConfigMapper {
}
if (putClusterConfig2Bdb(memEntity, result)) {
metaDataCache.put(memEntity.getRecordKey(), memEntity);
- result.setRetData(curEntity);
+ result.setSuccResult(curEntity);
}
return result.isSuccess();
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
index 8c2410c..cef8f3f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
@@ -147,7 +147,7 @@ public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
}
if (putGroupConsumeCtrlConfig2Bdb(memEntity, result)) {
addOrUpdCacheRecord(memEntity);
- result.setRetData(curEntity);
+ result.setSuccResult(curEntity);
}
return result.isSuccess();
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java
index e59b67e..913b046 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java
@@ -136,7 +136,7 @@ public class BdbGroupResCtrlMapperImpl implements GroupResCtrlMapper {
}
if (putGroupConfigConfig2Bdb(memEntity, result)) {
groupBaseCtrlCache.put(memEntity.getGroupName(), memEntity);
- result.setRetData(curEntity);
+ result.setSuccResult(curEntity);
}
return result.isSuccess();
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
index 47c8394..a0601b1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
@@ -140,7 +140,7 @@ public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
}
if (putTopicCtrlConfig2Bdb(memEntity, result)) {
topicCtrlCache.put(memEntity.getTopicName(), memEntity);
- result.setRetData(curEntity);
+ result.setSuccResult(curEntity);
}
return result.isSuccess();
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
index c65d7d9..94da730 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
@@ -147,7 +147,7 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
}
if (putTopicConfig2Bdb(memEntity, result)) {
addOrUpdCacheRecord(memEntity);
- result.setRetData(curEntity);
+ result.setSuccResult(curEntity);
}
return result.isSuccess();
}