You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/03/13 13:30:51 UTC
[incubator-inlong] branch master updated: [INLONG-3105][TubeMQ] Add MetaStoreMapper related implementation (#3106)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ab01685 [INLONG-3105][TubeMQ] Add MetaStoreMapper related implementation (#3106)
ab01685 is described below
commit ab01685bbaa72d7ebbb3ca7e8439cb9b2c7efd42
Author: gosonzhang <46...@qq.com>
AuthorDate: Sun Mar 13 21:30:38 2022 +0800
[INLONG-3105][TubeMQ] Add MetaStoreMapper related implementation (#3106)
---
.../server/common/utils/WebParameterUtils.java | 12 +-
.../server/master/metamanage/MetaDataManager.java | 12 +-
.../metamanage/metastore/MetaStoreService.java | 17 +-
.../metastore/dao/mapper/BrokerConfigMapper.java | 2 +-
.../metastore/dao/mapper/ClusterConfigMapper.java | 23 +-
.../metastore/dao/mapper/ConsumeCtrlMapper.java | 4 +-
.../metastore/dao/mapper/MetaStoreMapper.java | 570 +++++++++
.../metastore/dao/mapper/TopicDeployMapper.java | 27 +-
.../metastore/impl/AbsBrokerConfigMapperImpl.java | 52 +-
.../metastore/impl/AbsClusterConfigMapperImpl.java | 79 +-
.../metastore/impl/AbsConsumeCtrlMapperImpl.java | 78 +-
.../metastore/impl/AbsGroupResCtrlMapperImpl.java | 4 +-
.../metastore/impl/AbsMetaStoreMapperImpl.java | 1214 ++++++++++++++++++++
.../metastore/impl/AbsTopicCtrlMapperImpl.java | 4 +-
.../metastore/impl/AbsTopicDeployMapperImpl.java | 193 ++--
.../impl/bdbimpl/BdbMetaStoreServiceImpl.java | 115 +-
16 files changed, 2077 insertions(+), 329 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
index 0bb3c71..4bd8b2c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
@@ -378,11 +378,11 @@ public class WebParameterUtils {
* @param brokerWebPort broker web port
* @param strBuff string buffer
* @param result check result of parameter value
- * @return true for illegal, false for legal
+ * @return true for valid, false for invalid
*/
- public static boolean isConflictedPortsSet(int brokerPort, int brokerTlsPort,
- int brokerWebPort, StringBuilder strBuff,
- ProcessResult result) {
+ public static boolean isValidPortsSet(int brokerPort, int brokerTlsPort,
+ int brokerWebPort, StringBuilder strBuff,
+ ProcessResult result) {
if (brokerPort == brokerWebPort || brokerTlsPort == brokerWebPort) {
result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
strBuff.append(DataOpErrCode.DERR_CONFLICT_VALUE.getDescription())
@@ -392,9 +392,9 @@ public class WebParameterUtils {
.append(" cannot be the same as the value of")
.append(WebFieldDef.BROKERWEBPORT.name).toString());
strBuff.delete(0, strBuff.length());
- return !result.isSuccess();
+ return result.isSuccess();
}
- return false;
+ return true;
}
/**
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
index 6da5457..f35104e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
@@ -425,7 +425,7 @@ public class MetaDataManager implements Server {
if (isAddOp) {
if (metaStoreService.getBrokerConfByBrokerId(entity.getBrokerId()) == null
&& metaStoreService.getBrokerConfByBrokerIp(entity.getBrokerIp()) == null) {
- if (!WebParameterUtils.isConflictedPortsSet(entity.getBrokerPort(),
+ if (WebParameterUtils.isValidPortsSet(entity.getBrokerPort(),
entity.getBrokerTLSPort(), entity.getBrokerWebPort(), sBuffer, result)) {
if (metaStoreService.addBrokerConf(entity, sBuffer, result)) {
this.tMaster.getBrokerRunManager().updBrokerStaticInfo(entity);
@@ -453,7 +453,7 @@ public class MetaDataManager implements Server {
entity.getBrokerTLSPort(), entity.getBrokerWebPort(),
entity.getRegionId(), entity.getGroupId(),
entity.getManageStatus(), entity.getTopicProps())) {
- if (!WebParameterUtils.isConflictedPortsSet(newEntity.getBrokerPort(),
+ if (WebParameterUtils.isValidPortsSet(newEntity.getBrokerPort(),
newEntity.getBrokerTLSPort(), newEntity.getBrokerWebPort(),
sBuffer, result)) {
if (metaStoreService.updBrokerConf(newEntity, sBuffer, result)) {
@@ -1580,9 +1580,9 @@ public class MetaDataManager implements Server {
newConf.updModifyInfo(opEntity.getDataVerId(), brokerPort,
brokerTlsPort, brokerWebPort, maxMsgSizeMB, qryPriorityId,
flowCtrlEnable, flowRuleCnt, flowCtrlInfo, topicProps);
- if (!WebParameterUtils.isConflictedPortsSet(newConf.getBrokerPort(),
+ if (WebParameterUtils.isValidPortsSet(newConf.getBrokerPort(),
newConf.getBrokerTLSPort(), newConf.getBrokerWebPort(), sBuffer, result)) {
- metaStoreService.addClusterConfig(newConf, sBuffer, result);
+ metaStoreService.addUpdClusterConfig(newConf, sBuffer, result);
}
} else {
newConf = curConf.clone();
@@ -1590,10 +1590,10 @@ public class MetaDataManager implements Server {
if (newConf.updModifyInfo(opEntity.getDataVerId(), brokerPort,
brokerTlsPort, brokerWebPort, maxMsgSizeMB, qryPriorityId,
flowCtrlEnable, flowRuleCnt, flowCtrlInfo, topicProps)) {
- if (!WebParameterUtils.isConflictedPortsSet(newConf.getBrokerPort(),
+ if (WebParameterUtils.isValidPortsSet(newConf.getBrokerPort(),
newConf.getBrokerTLSPort(), newConf.getBrokerWebPort(),
sBuffer, result)) {
- metaStoreService.updClusterConfig(newConf, sBuffer, result);
+ metaStoreService.addUpdClusterConfig(newConf, sBuffer, result);
}
} else {
result.setSuccResult(null);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
index 3daeb15..fb7821b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/MetaStoreService.java
@@ -38,26 +38,15 @@ public interface MetaStoreService extends KeepAlive, Server {
// cluster default configure api
/**
- * Add or update cluster default setting
+ * Add cluster default setting, or update records if data exists
*
* @param entity the cluster default setting entity will be add
* @param strBuff the print info string buffer
* @param result the process result return
* @return true if success otherwise false
*/
- boolean addClusterConfig(ClusterSettingEntity entity,
- StringBuilder strBuff, ProcessResult result);
-
- /**
- * Update cluster default setting
- *
- * @param entity the cluster default setting entity will be add
- * @param strBuff the print info string buffer
- * @param result the process result return
- * @return true if success otherwise false
- */
- boolean updClusterConfig(ClusterSettingEntity entity,
- StringBuilder strBuff, ProcessResult result);
+ boolean addUpdClusterConfig(ClusterSettingEntity entity,
+ StringBuilder strBuff, ProcessResult result);
ClusterSettingEntity getClusterConfig();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/BrokerConfigMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/BrokerConfigMapper.java
index 2a9ff53..dba9932 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/BrokerConfigMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/BrokerConfigMapper.java
@@ -59,7 +59,7 @@ public interface BrokerConfigMapper extends AbstractMapper {
* @return the process result
*/
boolean updBrokerMngStatus(BaseEntity opEntity,
- Integer brokerId, ManageStatus newMngStatus,
+ int brokerId, ManageStatus newMngStatus,
StringBuilder strBuff, ProcessResult result);
/**
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ClusterConfigMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ClusterConfigMapper.java
index 559a523..e173565 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ClusterConfigMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ClusterConfigMapper.java
@@ -23,26 +23,24 @@ import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.Cl
public interface ClusterConfigMapper extends AbstractMapper {
/**
- * Add a new cluster setting info into store
+ * Add or replace cluster setting info into store
*
* @param entity need add record
* @param strBuff the string buffer
* @param result process result with old value
* @return the process result
*/
- boolean addClusterConfig(ClusterSettingEntity entity,
- StringBuilder strBuff, ProcessResult result);
+ boolean addUpdClusterConfig(ClusterSettingEntity entity,
+ StringBuilder strBuff, ProcessResult result);
/**
- * Update cluster setting info in store
+ * delete current cluster setting from store
*
- * @param entity need add record
+ * @param result the process result
* @param strBuff the string buffer
- * @param result process result with old value
* @return the process result
*/
- boolean updClusterConfig(ClusterSettingEntity entity,
- StringBuilder strBuff, ProcessResult result);
+ boolean delClusterConfig(StringBuilder strBuff, ProcessResult result);
/**
* get current cluster setting from store
@@ -50,13 +48,4 @@ public interface ClusterConfigMapper extends AbstractMapper {
* @return current cluster setting, null or object, only read
*/
ClusterSettingEntity getClusterConfig();
-
- /**
- * delete current cluster setting from store
- *
- * @param result the process result
- * @param strBuff the string buffer
- * @return the process result
- */
- boolean delClusterConfig(StringBuilder strBuff, ProcessResult result);
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ConsumeCtrlMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ConsumeCtrlMapper.java
index 6aa6606..971e088 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ConsumeCtrlMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/ConsumeCtrlMapper.java
@@ -36,9 +36,9 @@ public interface ConsumeCtrlMapper extends AbstractMapper {
boolean delGroupConsumeCtrlConf(String groupName, String topicName,
StringBuilder strBuff, ProcessResult result);
- boolean isTopicNameInUsed(String topicName);
+ boolean isTopicNameInUse(String topicName);
- boolean hasGroupConsumeCtrlConf(String groupName);
+ boolean isGroupNameInUse(String groupName);
GroupConsumeCtrlEntity getGroupConsumeCtrlConfByRecKey(String recordKey);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaStoreMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaStoreMapper.java
new file mode 100644
index 0000000..1470b7f
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaStoreMapper.java
@@ -0,0 +1,570 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus;
+import org.apache.inlong.tubemq.server.common.statusdef.TopicStatus;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
+
+public interface MetaStoreMapper {
+
+ /**
+ * Add or update cluster default setting
+ * @param opEntity operator information
+ * @param brokerPort broker port
+ * @param brokerTlsPort broker tls port
+ * @param brokerWebPort broker web port
+ * @param maxMsgSizeMB max cluster message size in MB
+ * @param qryPriorityId the default query priority id
+ * @param flowCtrlEnable enable or disable flow control function
+ * @param flowRuleCnt the default flow rule count
+ * @param flowCtrlInfo the default flow control information
+ * @param topicProps default topic property information
+ * @param strBuff the print information string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean addOrUpdClusterDefSetting(BaseEntity opEntity, int brokerPort,
+ int brokerTlsPort, int brokerWebPort,
+ int maxMsgSizeMB, int qryPriorityId,
+ Boolean flowCtrlEnable, int flowRuleCnt,
+ String flowCtrlInfo, TopicPropGroup topicProps,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Get cluster configure information
+ *
+ * @param isMustConf whether must be configured data.
+ * @return the cluster configure
+ */
+ ClusterSettingEntity getClusterDefSetting(boolean isMustConf);
+
+ // ////////////////////////////////////////////////////////////
+ /**
+ * Add or update broker configure information
+ *
+ * @param isAddOp whether add operation
+ * @param opEntity operator information
+ * @param brokerId broker id
+ * @param brokerIp broker ip
+ * @param brokerPort broker port
+ * @param brokerTlsPort broker tls port
+ * @param brokerWebPort broker web port
+ * @param regionId region id
+ * @param groupId group id
+ * @param mngStatus manage status
+ * @param topicProps default topic property information
+ * @param strBuff the print information string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean addOrUpdBrokerConfig(boolean isAddOp, BaseEntity opEntity,
+ int brokerId, String brokerIp, int brokerPort,
+ int brokerTlsPort, int brokerWebPort,
+ int regionId, int groupId, ManageStatus mngStatus,
+ TopicPropGroup topicProps, StringBuilder strBuff,
+ ProcessResult result);
+
+ /**
+ * Add or update broker configure information
+ *
+ * @param isAddOp whether add operation
+ * @param entity need add or update configure information
+ * @param strBuff the print information string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean addOrUpdBrokerConfig(boolean isAddOp, BrokerConfEntity entity,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Change broker configure status
+ *
+ * @param opEntity operator
+ * @param brokerId the id of the broker whose manage status will be changed
+ * @param newMngStatus manage status
+ * @param strBuff the print information string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean changeBrokerConfStatus(BaseEntity opEntity,
+ int brokerId, ManageStatus newMngStatus,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Delete broker configure information
+ *
+ * @param operator operator
+ * @param brokerId need deleted broker id
+ * @param rsvData reserve broker data
+ * @param strBuff the print information string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean delBrokerConfInfo(String operator, int brokerId, boolean rsvData,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Query broker configure information
+ *
+ * @param qryEntity the query condition, must not null
+ * @return the query result
+ */
+ Map<Integer, BrokerConfEntity> getBrokerConfInfo(BrokerConfEntity qryEntity);
+
+ /**
+ * Get broker configure information
+ *
+ * @param brokerIdSet the broker id set need to query
+ * @param brokerIpSet the broker ip set need to query
+ * @param qryEntity the query condition
+ * @return broker configure information
+ */
+ Map<Integer, BrokerConfEntity> getBrokerConfInfo(Set<Integer> brokerIdSet,
+ Set<String> brokerIpSet,
+ BrokerConfEntity qryEntity);
+
+ /**
+ * Get broker configure information
+ *
+ * @param brokerId need queried broker id
+ * @return the broker configure record
+ */
+ BrokerConfEntity getBrokerConfByBrokerId(int brokerId);
+
+ /**
+ * Get broker configure information
+ *
+ * @param brokerIp need queried broker ip
+ * @return the broker configure record
+ */
+ BrokerConfEntity getBrokerConfByBrokerIp(String brokerIp);
+
+ // ////////////////////////////////////////////////////////////
+
+ /**
+ * Add or Update topic control configure info
+ *
+ * @param isAddOp whether add operation
+ * @param opEntity operator information
+ * @param topicName topic name
+ * @param topicNameId the topic name id
+ * @param enableTopicAuth whether enable topic authentication
+ * @param maxMsgSizeInMB the max message size in MB
+ * @param strBuff the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean addOrUpdTopicCtrlConf(boolean isAddOp, BaseEntity opEntity,
+ String topicName, int topicNameId,
+ Boolean enableTopicAuth, int maxMsgSizeInMB,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Add or Update topic control configure info
+ *
+ * @param isAddOp whether add operation
+ * @param entity the topic control info entity to add or update
+ * @param strBuff the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean addOrUpdTopicCtrlConf(boolean isAddOp, TopicCtrlEntity entity,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Add topic control record, or update records if data exists
+ *
+ * @param opEntity operator information
+ * @param topicName topic info
+ * @param enableTopicAuth if authenticate check
+ * @param strBuff the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean insertTopicCtrlConf(BaseEntity opEntity,
+ String topicName, Boolean enableTopicAuth,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Add topic control record, or update records if data exists
+ *
+ * @param entity the topic control info entity to add or update
+ * @param strBuff the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean insertTopicCtrlConf(TopicCtrlEntity entity,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Delete topic control configure
+ *
+ * @param operator operator
+ * @param topicName the topicName will be deleted
+ * @param strBuff the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean delTopicCtrlConf(String operator, String topicName,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Get topic control record by topic name
+ *
+ * @param topicName the topic name
+ * @return the topic control record
+ */
+ TopicCtrlEntity getTopicCtrlByTopicName(String topicName);
+
+ /**
+ * Get topic max message size in MB configure
+ *
+ * @param topicName the topic name
+ * @return the max message size
+ */
+ int getTopicMaxMsgSizeInMB(String topicName);
+
+ /**
+ * Get topic control entity list
+ *
+ * @param qryEntity the query condition
+ * @return the query result list
+ */
+ List<TopicCtrlEntity> queryTopicCtrlConf(TopicCtrlEntity qryEntity);
+
+ /**
+ * Get topic control entity map
+ *
+ * @param topicNameSet the topic name set
+ * @param qryEntity the query condition
+ * @return the query result map
+ */
+ Map<String, TopicCtrlEntity> getTopicCtrlConf(Set<String> topicNameSet,
+ TopicCtrlEntity qryEntity);
+
+ // ////////////////////////////////////////////////////////////
+
+ /**
+ * Add or update topic deploy information
+ *
+ * @param isAddOp whether add operation
+ * @param opEntity the operation information
+ * @param brokerId the broker id
+ * @param topicName the topic name
+ * @param deployStatus the deploy status
+ * @param topicPropInfo the topic property set
+ * @param strBuff the string buffer
+ * @param result the process result
+ * @return true if success otherwise false
+ */
+ boolean addOrUpdTopicDeployInfo(boolean isAddOp, BaseEntity opEntity,
+ int brokerId, String topicName,
+ TopicStatus deployStatus,
+ TopicPropGroup topicPropInfo,
+ StringBuilder strBuff,
+ ProcessResult result);
+
+ /**
+ * Add or update topic deploy configure info
+ *
+ * @param isAddOp whether add operation
+ * @param deployEntity the topic deploy info entity
+ * @param strBuff the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean addOrUpdTopicDeployInfo(boolean isAddOp, TopicDeployEntity deployEntity,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Update topic deploy status info
+ *
+ * @param opEntity the operation information
+ * @param brokerId the broker id
+ * @param topicName the topic name
+ * @param topicStatus the topic deploy status
+ * @param strBuff the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean updTopicDeployStatusInfo(BaseEntity opEntity, int brokerId,
+ String topicName, TopicStatus topicStatus,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Get broker topic entity, if query entity is null, return all topic entity
+ *
+ * @param topicNameSet query by topicNameSet
+ * @param brokerIdSet query by brokerIdSet
+ * @param qryEntity query conditions
+ * @return topic deploy entity map
+ */
+ Map<String, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
+ Set<Integer> brokerIdSet,
+ TopicDeployEntity qryEntity);
+
+ /**
+ * Get broker topic entity by topic name set and broker id set
+ *
+ * @param topicNameSet the query topic set
+ * @param brokerIdSet the query broker id set
+ * @return topic deploy entity map
+ */
+ Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
+ Set<Integer> brokerIdSet);
+
+ /**
+ * Get broker topic entity by topic name and broker id set
+ *
+ * @param topicNameSet the query topic set
+ * @param brokerIdSet the query broker id set
+ * @return topic entity map
+ */
+ Map<String, List<TopicDeployEntity>> getTopicConfInfoByTopicAndBrokerIds(
+ Set<String> topicNameSet, Set<Integer> brokerIdSet);
+
+ // ////////////////////////////////////////////////////////////
+
+ /**
+ * Add or update group resource configure information
+ *
+ * @param isAddOp whether add operation
+ * @param opEntity operator information
+ * @param groupName the group name
+ * @param resCheckEnable whether check resource rate
+ * @param allowedBClientRate the allowed broker-client rate
+ * @param qryPriorityId the query priority id
+ * @param flowCtrlEnable enable or disable flow control
+ * @param flowRuleCnt the flow control rule count
+ * @param flowCtrlInfo the flow control information
+ * @param strBuff the print information string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean addOrUpdGroupResCtrlConf(boolean isAddOp, BaseEntity opEntity,
+ String groupName, Boolean resCheckEnable,
+ int allowedBClientRate, int qryPriorityId,
+ Boolean flowCtrlEnable, int flowRuleCnt,
+ String flowCtrlInfo, StringBuilder strBuff,
+ ProcessResult result);
+
+ /**
+ * Add or update group resource control configure info
+ *
+ * @param entity the group resource control info entity to add or update
+ * @param strBuff the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean addOrUpdGroupResCtrlConf(boolean isAddOp, GroupResCtrlEntity entity,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Add group control configure, or update records if data exists
+ *
+ * @param opEntity operator information
+ * @param groupName the group name
+ * @param qryPriorityId the query priority id
+ * @param flowCtrlEnable enable or disable flow control
+ * @param flowRuleCnt the flow control rule count
+ * @param flowCtrlRuleInfo the flow control information
+ * @param strBuff the print information string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean insertGroupCtrlConf(BaseEntity opEntity, String groupName,
+ int qryPriorityId, Boolean flowCtrlEnable,
+ int flowRuleCnt, String flowCtrlRuleInfo,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Add group control configure, or update records if data exists
+ *
+ * @param opEntity operator information
+ * @param groupName operate target
+ * @param resChkEnable resource check status
+ * @param allowedB2CRate allowed B2C rate
+ * @param strBuff the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean insertGroupCtrlConf(BaseEntity opEntity, String groupName,
+ Boolean resChkEnable, int allowedB2CRate,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Add group control configure, or update records if data exists
+ *
+ * @param entity the group resource control info entity to add or update
+ * @param strBuff the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean insertGroupCtrlConf(GroupResCtrlEntity entity,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Delete group control information
+ *
+ * @param operator operator
+ * @param groupName need deleted group
+ * @param strBuff the print info string buffer
+ * @param result the process result return
+ * @return delete result
+ */
+ boolean delGroupCtrlConf(String operator, String groupName,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Get group control information by group and query condition
+ *
+ * @param groupSet need queried group set
+ * @param qryEntity query condition
+ * @return query result
+ */
+ Map<String, GroupResCtrlEntity> getGroupCtrlConf(Set<String> groupSet,
+ GroupResCtrlEntity qryEntity);
+
+ /**
+ * Get group control information by group name
+ *
+ * @param groupName need queried group name
+ * @return query result
+ */
+ GroupResCtrlEntity getGroupCtrlConf(String groupName);
+
+ // //////////////////////////////////////////////////////////////////
+
+ /**
+ * Add or update group consume control information
+ *
+ * @param isAddOp whether add operation
+ * @param opEntity operator information
+ * @param groupName the group name
+ * @param topicName the topic name
+ * @param enableCsm enable or disable consume
+ * @param disableRsn the disable reason
+ * @param enableFlt enable or disable filter
+ * @param fltCondStr the filter conditions
+ * @param strBuff the print information string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean addOrUpdConsumeCtrlInfo(boolean isAddOp, BaseEntity opEntity,
+ String groupName, String topicName,
+ Boolean enableCsm, String disableRsn,
+ Boolean enableFlt, String fltCondStr,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * add or update group's consume control information
+ *
+ * @param isAddOp whether add operation
+ * @param entity need add or update group configure info
+ * @param strBuff the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean addOrUpdConsumeCtrlInfo(boolean isAddOp, GroupConsumeCtrlEntity entity,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Add consume control information, or update records if data exists
+ *
+ * @param opEntity add or update base information, include creator, create time, etc.
+ * @param groupName add or update groupName information
+ * @param topicName add or update topicName information
+ * @param enableCsm add or update consume enable status information
+ * @param disReason add or update disable consume reason
+ * @param enableFlt add or update filter enable status information
+ * @param fltCondStr add or update filter configure information
+ * @param strBuff the print info string buffer
+ * @param result the process result return
+ * @return process result
+ */
+ boolean insertConsumeCtrlInfo(BaseEntity opEntity, String groupName,
+ String topicName, Boolean enableCsm,
+ String disReason, Boolean enableFlt,
+ String fltCondStr, StringBuilder strBuff,
+ ProcessResult result);
+
+ /**
+ * Add consume control information, or update records if data exists
+ *
+ * @param entity add or update group consume control info
+ * @param strBuff the print info string buffer
+ * @param result the process result return
+ * @return process result
+ */
+ boolean insertConsumeCtrlInfo(GroupConsumeCtrlEntity entity,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Delete consume control configure
+ *
+ * @param operator the operator
+ * @param groupName the group name
+ * @param topicName the topic name
+ * @param strBuff the print info string buffer
+ * @param result the process result return
+ * @return process result
+ */
+ boolean delConsumeCtrlConf(String operator,
+ String groupName, String topicName,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Get all group consume control record for a specific topic
+ *
+ * @param topicName the queried topic name
+ * @return group consume control list
+ */
+ List<GroupConsumeCtrlEntity> getConsumeCtrlByTopic(String topicName);
+
+ /**
+ * Get all consume-disabled topics for a specific group
+ *
+ * @param groupName the queried group name
+ * @return the consume-disabled topic set
+ */
+ Set<String> getDisableTopicByGroupName(String groupName);
+
+ /**
+ * Get group consume control configure for topic & group set
+ *
+ * @param groupSet the group name set
+ * @param topicSet the topic name set
+ * @param qryEntry the query conditions
+ * @return the queried consume control record
+ */
+ Map<String, List<GroupConsumeCtrlEntity>> getGroupConsumeCtrlConf(
+ Set<String> groupSet, Set<String> topicSet, GroupConsumeCtrlEntity qryEntry);
+
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
index d8d8e28..05cb02b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
@@ -21,6 +21,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+import org.apache.inlong.tubemq.server.common.statusdef.TopicStatus;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
public interface TopicDeployMapper extends AbstractMapper {
@@ -33,8 +35,8 @@ public interface TopicDeployMapper extends AbstractMapper {
* @param result the process result
* @return whether success
*/
- boolean addTopicConf(TopicDeployEntity entity,
- StringBuilder strBuff, ProcessResult result);
+ boolean addTopicDeployConf(TopicDeployEntity entity,
+ StringBuilder strBuff, ProcessResult result);
/**
* Update the topic deploy configure info from store
@@ -44,8 +46,23 @@ public interface TopicDeployMapper extends AbstractMapper {
* @param result the process result
* @return whether success
*/
- boolean updTopicConf(TopicDeployEntity entity,
- StringBuilder strBuff, ProcessResult result);
+ boolean updTopicDeployConf(TopicDeployEntity entity,
+ StringBuilder strBuff, ProcessResult result);
+
+ /**
+ * Update topic deploy status info from store
+ *
+ * @param opEntity the operator info
+ * @param brokerId the broker id
+ * @param topicName the topic name
+ * @param topicStatus the new topic status
+ * @param strBuff the string buffer
+ * @param result the process result
+ * @return whether success
+ */
+ boolean updTopicDeployStatus(BaseEntity opEntity, int brokerId,
+ String topicName, TopicStatus topicStatus,
+ StringBuilder strBuff, ProcessResult result);
/**
* delete topic deploy configure info from store
@@ -55,7 +72,7 @@ public interface TopicDeployMapper extends AbstractMapper {
* @param result the process result
* @return whether success
*/
- boolean delTopicConf(String recordKey, StringBuilder strBuff, ProcessResult result);
+ boolean delTopicDeployConf(String recordKey, StringBuilder strBuff, ProcessResult result);
/**
* delete topic deploy configure info from store
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsBrokerConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsBrokerConfigMapperImpl.java
index 9ee35c3..c466667 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsBrokerConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsBrokerConfigMapperImpl.java
@@ -22,6 +22,8 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
@@ -66,7 +68,7 @@ public abstract class AbsBrokerConfigMapperImpl implements BrokerConfigMapper {
return result.isSuccess();
}
// Check whether the configured ports conflict in the record
- if (WebParameterUtils.isConflictedPortsSet(entity.getBrokerPort(),
+ if (!WebParameterUtils.isValidPortsSet(entity.getBrokerPort(),
entity.getBrokerTLSPort(), entity.getBrokerWebPort(), strBuff, result)) {
return result.isSuccess();
}
@@ -104,26 +106,26 @@ public abstract class AbsBrokerConfigMapperImpl implements BrokerConfigMapper {
return result.isSuccess();
}
// Check whether the configured ports conflict in the record
- if (WebParameterUtils.isConflictedPortsSet(newEntity.getBrokerPort(),
+ if (!WebParameterUtils.isValidPortsSet(newEntity.getBrokerPort(),
newEntity.getBrokerTLSPort(), newEntity.getBrokerWebPort(),
strBuff, result)) {
return result.isSuccess();
}
// Check manage status
- if (isIllegalManageStatusChange(newEntity, curEntity, strBuff, result)) {
+ if (!isValidMngStatusChange(newEntity, curEntity, strBuff, result)) {
return result.isSuccess();
}
// Store data to persistent
if (putConfig2Persistent(newEntity, strBuff, result)) {
putRecord2Caches(newEntity);
- result.setSuccResult(curEntity);
+ result.setSuccResult(null);
}
return result.isSuccess();
}
@Override
public boolean updBrokerMngStatus(BaseEntity opEntity,
- Integer brokerId, ManageStatus newMngStatus,
+ int brokerId, ManageStatus newMngStatus,
StringBuilder strBuff, ProcessResult result) {
// Check the existence of records by brokerId
BrokerConfEntity curEntity = brokerConfCache.get(brokerId);
@@ -139,22 +141,21 @@ public abstract class AbsBrokerConfigMapperImpl implements BrokerConfigMapper {
BrokerConfEntity newEntity = curEntity.clone();
newEntity.updBaseModifyInfo(opEntity);
if (!newEntity.updModifyInfo(opEntity.getDataVerId(),
- curEntity.getBrokerPort(), curEntity.getBrokerTLSPort(),
- curEntity.getBrokerWebPort(), curEntity.getRegionId(),
- curEntity.getGroupId(), newMngStatus,
- curEntity.getTopicProps())) {
+ TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED, newMngStatus, null)) {
result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
"Broker configure not changed!");
return result.isSuccess();
}
// Check manage status
- if (isIllegalManageStatusChange(newEntity, curEntity, strBuff, result)) {
+ if (!isValidMngStatusChange(newEntity, curEntity, strBuff, result)) {
return result.isSuccess();
}
// Store data to persistent
if (putConfig2Persistent(newEntity, strBuff, result)) {
putRecord2Caches(newEntity);
- result.setSuccResult(curEntity);
+ result.setSuccResult(null);
}
return result.isSuccess();
}
@@ -168,20 +169,11 @@ public abstract class AbsBrokerConfigMapperImpl implements BrokerConfigMapper {
result.setSuccResult(null);
return result.isSuccess();
}
- // Check broker's manage status
- if (curEntity.getManageStatus().isOnlineStatus()) {
- result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
- strBuff.append("Illegal manage status, please offline the broker(")
- .append(WebFieldDef.BROKERID.name).append("=")
- .append(curEntity.getBrokerId()).append(") first!").toString());
- strBuff.delete(0, strBuff.length());
- return result.isSuccess();
- }
// Delete record from persistent
delConfigFromPersistent(brokerId, strBuff);
// Clear cache data
delRecordFromCaches(brokerId);
- result.setSuccResult(curEntity);
+ result.setSuccResult(null);
return result.isSuccess();
}
@@ -384,20 +376,20 @@ public abstract class AbsBrokerConfigMapperImpl implements BrokerConfigMapper {
}
/**
- * Check whether the management status change is illegal
+ * Check whether the management status change is legal
*
* @param newEntity the entity to be updated
* @param curEntity the current entity
* @param strBuff string buffer
* @param result check result of parameter value
- * @return true for illegal, false for legal
+ * @return true for valid, false for invalid
*/
- private boolean isIllegalManageStatusChange(BrokerConfEntity newEntity,
- BrokerConfEntity curEntity,
- StringBuilder strBuff,
- ProcessResult result) {
+ private boolean isValidMngStatusChange(BrokerConfEntity newEntity,
+ BrokerConfEntity curEntity,
+ StringBuilder strBuff,
+ ProcessResult result) {
if (newEntity.getManageStatus() == curEntity.getManageStatus()) {
- return false;
+ return true;
}
if (((newEntity.getManageStatus().getCode() < ManageStatus.STATUS_MANAGE_ONLINE.getCode())
&& (curEntity.getManageStatus().getCode() >= ManageStatus.STATUS_MANAGE_ONLINE.getCode()))
@@ -411,8 +403,8 @@ public abstract class AbsBrokerConfigMapperImpl implements BrokerConfigMapper {
.append(" for the broker(").append(WebFieldDef.BROKERID.name).append("=")
.append(curEntity.getBrokerId()).append(")!").toString());
strBuff.delete(0, strBuff.length());
- return !result.isSuccess();
+ return result.isSuccess();
}
- return false;
+ return true;
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsClusterConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsClusterConfigMapperImpl.java
index 827c44a..16c3411 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsClusterConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsClusterConfigMapperImpl.java
@@ -39,74 +39,42 @@ public abstract class AbsClusterConfigMapperImpl implements ClusterConfigMapper
}
@Override
- public boolean addClusterConfig(ClusterSettingEntity entity,
- StringBuilder strBuff, ProcessResult result) {
+ public boolean addUpdClusterConfig(ClusterSettingEntity entity,
+ StringBuilder strBuff, ProcessResult result) {
+ ClusterSettingEntity newEntity;
// Check whether the configure record already exist
ClusterSettingEntity curEntity = metaDataCache.get(entity.getRecordKey());
if (curEntity == null) {
- result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
- strBuff.append("Existed record found for ")
- .append(entity.getRecordKey()).toString());
- strBuff.delete(0, strBuff.length());
- return result.isSuccess();
+ newEntity = entity.clone();
+ } else {
+ // Build the entity that need to be updated
+ newEntity = curEntity.clone();
+ newEntity.updBaseModifyInfo(entity);
+ if (!newEntity.updModifyInfo(entity.getDataVerId(),
+ entity.getBrokerPort(), entity.getBrokerTLSPort(),
+ entity.getBrokerWebPort(), entity.getMaxMsgSizeInMB(),
+ entity.getQryPriorityId(), entity.enableFlowCtrl(),
+ entity.getGloFlowCtrlRuleCnt(), entity.getGloFlowCtrlRuleInfo(),
+ entity.getClsDefTopicProps())) {
+ result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+ "Cluster configure not changed!");
+ return result.isSuccess();
+ }
}
// Check whether the configured ports conflict in the record
- if (WebParameterUtils.isConflictedPortsSet(entity.getBrokerPort(),
- entity.getBrokerTLSPort(), entity.getBrokerWebPort(),
- strBuff, result)) {
- return result.isSuccess();
- }
- // Store data to persistent
- if (putConfig2Persistent(entity, strBuff, result)) {
- metaDataCache.put(entity.getRecordKey(), entity);
- }
- return result.isSuccess();
- }
-
- @Override
- public boolean updClusterConfig(ClusterSettingEntity entity,
- StringBuilder strBuff, ProcessResult result) {
- // Check for the record to be updated
- ClusterSettingEntity curEntity = metaDataCache.get(entity.getRecordKey());
- if (curEntity == null) {
- result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
- strBuff.append("Not found cluster configure for (")
- .append(entity.getRecordKey()).append(")!").toString());
- return result.isSuccess();
- }
- // Build the entity that need to be updated
- ClusterSettingEntity newEntity = curEntity.clone();
- newEntity.updBaseModifyInfo(entity);
- if (!newEntity.updModifyInfo(entity.getDataVerId(),
- entity.getBrokerPort(), entity.getBrokerTLSPort(),
- entity.getBrokerWebPort(), entity.getMaxMsgSizeInMB(),
- entity.getQryPriorityId(), entity.enableFlowCtrl(),
- entity.getGloFlowCtrlRuleCnt(), entity.getGloFlowCtrlRuleInfo(),
- entity.getClsDefTopicProps())) {
- result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
- "Cluster configure not changed!");
- return result.isSuccess();
- }
- // Check whether the configured ports conflict in the record
- if (WebParameterUtils.isConflictedPortsSet(newEntity.getBrokerPort(),
+ if (!WebParameterUtils.isValidPortsSet(newEntity.getBrokerPort(),
newEntity.getBrokerTLSPort(), newEntity.getBrokerWebPort(),
strBuff, result)) {
return result.isSuccess();
}
// Store data to persistent
if (putConfig2Persistent(newEntity, strBuff, result)) {
- metaDataCache.put(newEntity.getRecordKey(), newEntity);
- result.setSuccResult(curEntity);
+ metaDataCache.put(newEntity.getRecordKey(), newEntity);
}
return result.isSuccess();
}
@Override
- public ClusterSettingEntity getClusterConfig() {
- return metaDataCache.get(TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
- }
-
- @Override
public boolean delClusterConfig(StringBuilder strBuff, ProcessResult result) {
ClusterSettingEntity curEntity =
metaDataCache.get(TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
@@ -116,10 +84,15 @@ public abstract class AbsClusterConfigMapperImpl implements ClusterConfigMapper
}
delConfigFromPersistent(strBuff, TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
metaDataCache.remove(TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
- result.setSuccResult(curEntity);
+ result.setSuccResult(null);
return true;
}
+ @Override
+ public ClusterSettingEntity getClusterConfig() {
+ return metaDataCache.get(TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
+ }
+
/**
* Clear cached data
*/
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
index f2a03d3..dc8703b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
@@ -39,11 +39,11 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
LoggerFactory.getLogger(AbsConsumeCtrlMapperImpl.class);
// configure cache
private final ConcurrentHashMap<String/* recordKey */, GroupConsumeCtrlEntity>
- grpConsumeCtrlCache = new ConcurrentHashMap<>();
+ consumeCtrlCache = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/* topicName */, ConcurrentHashSet<String>>
- grpConsumeCtrlTopicCache = new ConcurrentHashMap<>();
+ topic2RecordCache = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/* groupName */, ConcurrentHashSet<String>>
- grpConsumeCtrlGroupCache = new ConcurrentHashMap<>();
+ group2RecordCache = new ConcurrentHashMap<>();
public AbsConsumeCtrlMapperImpl() {
// Initial instant
@@ -53,7 +53,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
public boolean addGroupConsumeCtrlConf(GroupConsumeCtrlEntity entity,
StringBuilder strBuff, ProcessResult result) {
// Checks whether the record already exists
- GroupConsumeCtrlEntity curEntity = grpConsumeCtrlCache.get(entity.getRecordKey());
+ GroupConsumeCtrlEntity curEntity = consumeCtrlCache.get(entity.getRecordKey());
if (curEntity != null) {
result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
strBuff.append("Existed record found for groupName-topicName(")
@@ -72,7 +72,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
public boolean updGroupConsumeCtrlConf(GroupConsumeCtrlEntity entity,
StringBuilder strBuff, ProcessResult result) {
// Checks whether the record already exists
- GroupConsumeCtrlEntity curEntity = grpConsumeCtrlCache.get(entity.getRecordKey());
+ GroupConsumeCtrlEntity curEntity = consumeCtrlCache.get(entity.getRecordKey());
if (curEntity == null) {
result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
strBuff.append("Not found consume control for through groupName-topicName(")
@@ -93,22 +93,22 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
// Store data to persistent
if (putConfig2Persistent(newEntity, strBuff, result)) {
putRecord2Caches(newEntity);
- result.setSuccResult(curEntity);
+ result.setSuccResult(null);
}
return result.isSuccess();
}
@Override
- public boolean isTopicNameInUsed(String topicName) {
+ public boolean isTopicNameInUse(String topicName) {
ConcurrentHashSet<String> consumeCtrlSet =
- grpConsumeCtrlTopicCache.get(topicName);
+ topic2RecordCache.get(topicName);
return (consumeCtrlSet != null && !consumeCtrlSet.isEmpty());
}
@Override
- public boolean hasGroupConsumeCtrlConf(String groupName) {
+ public boolean isGroupNameInUse(String groupName) {
ConcurrentHashSet<String> keySet =
- grpConsumeCtrlGroupCache.get(groupName);
+ group2RecordCache.get(groupName);
return (keySet != null && !keySet.isEmpty());
}
@@ -117,14 +117,14 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
StringBuilder strBuff,
ProcessResult result) {
GroupConsumeCtrlEntity curEntity =
- grpConsumeCtrlCache.get(recordKey);
+ consumeCtrlCache.get(recordKey);
if (curEntity == null) {
result.setSuccResult(null);
return true;
}
delConfigFromPersistent(recordKey, strBuff);
delRecordFromCaches(recordKey);
- result.setSuccResult(curEntity);
+ result.setSuccResult(null);
return true;
}
@@ -139,11 +139,11 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
result.setSuccResult(null);
return true;
} else {
- keySet = grpConsumeCtrlTopicCache.get(topicName);
+ keySet = topic2RecordCache.get(topicName);
}
} else {
if (topicName == null) {
- keySet = grpConsumeCtrlGroupCache.get(groupName);
+ keySet = group2RecordCache.get(groupName);
} else {
keySet.add(KeyBuilderUtils.buildGroupTopicRecKey(groupName, topicName));
}
@@ -164,13 +164,13 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
@Override
public GroupConsumeCtrlEntity getGroupConsumeCtrlConfByRecKey(String recordKey) {
- return grpConsumeCtrlCache.get(recordKey);
+ return consumeCtrlCache.get(recordKey);
}
@Override
public List<GroupConsumeCtrlEntity> getConsumeCtrlByTopicName(String topicName) {
ConcurrentHashSet<String> keySet =
- grpConsumeCtrlTopicCache.get(topicName);
+ topic2RecordCache.get(topicName);
if (keySet == null || keySet.isEmpty()) {
return Collections.emptyList();
}
@@ -180,7 +180,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
if (recordKey == null) {
continue;
}
- entity = grpConsumeCtrlCache.get(recordKey);
+ entity = consumeCtrlCache.get(recordKey);
if (entity != null) {
result.add(entity);
}
@@ -191,14 +191,14 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
@Override
public List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName) {
ConcurrentHashSet<String> keySet =
- grpConsumeCtrlGroupCache.get(groupName);
+ group2RecordCache.get(groupName);
if (keySet == null || keySet.isEmpty()) {
return Collections.emptyList();
}
GroupConsumeCtrlEntity entity;
List<GroupConsumeCtrlEntity> result = new ArrayList<>();
for (String recordKey : keySet) {
- entity = grpConsumeCtrlCache.get(recordKey);
+ entity = consumeCtrlCache.get(recordKey);
if (entity != null) {
result.add(entity);
}
@@ -209,7 +209,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
@Override
public GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(
String groupName, String topicName) {
- return grpConsumeCtrlCache.get(
+ return consumeCtrlCache.get(
KeyBuilderUtils.buildGroupTopicRecKey(groupName, topicName));
}
@@ -223,7 +223,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
GroupConsumeCtrlEntity tmpEntity;
List<GroupConsumeCtrlEntity> itemLst;
if (totalMatchedSet == null) {
- for (GroupConsumeCtrlEntity entity : grpConsumeCtrlCache.values()) {
+ for (GroupConsumeCtrlEntity entity : consumeCtrlCache.values()) {
if (entity == null || (qryEntry != null && !entity.isMatched(qryEntry))) {
continue;
}
@@ -233,7 +233,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
}
} else {
for (String recKey : totalMatchedSet) {
- tmpEntity = grpConsumeCtrlCache.get(recKey);
+ tmpEntity = consumeCtrlCache.get(recKey);
if (tmpEntity == null || (qryEntry != null && !tmpEntity.isMatched(qryEntry))) {
continue;
}
@@ -249,9 +249,9 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
public List<GroupConsumeCtrlEntity> getGroupConsumeCtrlConf(GroupConsumeCtrlEntity qryEntity) {
List<GroupConsumeCtrlEntity> retEntities = new ArrayList<>();
if (qryEntity == null) {
- retEntities.addAll(grpConsumeCtrlCache.values());
+ retEntities.addAll(consumeCtrlCache.values());
} else {
- for (GroupConsumeCtrlEntity entity : grpConsumeCtrlCache.values()) {
+ for (GroupConsumeCtrlEntity entity : consumeCtrlCache.values()) {
if (entity != null && entity.isMatched(qryEntity)) {
retEntities.add(entity);
}
@@ -270,7 +270,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
if (groupSet != null && !groupSet.isEmpty()) {
groupKeySet = new HashSet<>();
for (String group : groupSet) {
- recSet = grpConsumeCtrlGroupCache.get(group);
+ recSet = group2RecordCache.get(group);
if (recSet != null && !recSet.isEmpty()) {
groupKeySet.addAll(recSet);
}
@@ -283,7 +283,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
if (topicSet != null && !topicSet.isEmpty()) {
topicKeySet = new HashSet<>();
for (String topic : topicSet) {
- recSet = grpConsumeCtrlTopicCache.get(topic);
+ recSet = topic2RecordCache.get(topic);
if (recSet != null && !recSet.isEmpty()) {
topicKeySet.addAll(recSet);
}
@@ -316,9 +316,9 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
* Clear cached data
*/
protected void clearCachedData() {
- grpConsumeCtrlTopicCache.clear();
- grpConsumeCtrlGroupCache.clear();
- grpConsumeCtrlCache.clear();
+ topic2RecordCache.clear();
+ group2RecordCache.clear();
+ consumeCtrlCache.clear();
}
/**
@@ -327,23 +327,23 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
* @param entity need added or updated entity
*/
protected void putRecord2Caches(GroupConsumeCtrlEntity entity) {
- grpConsumeCtrlCache.put(entity.getRecordKey(), entity);
+ consumeCtrlCache.put(entity.getRecordKey(), entity);
// add topic index map
ConcurrentHashSet<String> keySet =
- grpConsumeCtrlTopicCache.get(entity.getTopicName());
+ topic2RecordCache.get(entity.getTopicName());
if (keySet == null) {
ConcurrentHashSet<String> tmpSet = new ConcurrentHashSet<>();
- keySet = grpConsumeCtrlTopicCache.putIfAbsent(entity.getTopicName(), tmpSet);
+ keySet = topic2RecordCache.putIfAbsent(entity.getTopicName(), tmpSet);
if (keySet == null) {
keySet = tmpSet;
}
}
keySet.add(entity.getRecordKey());
// add group index map
- keySet = grpConsumeCtrlGroupCache.get(entity.getGroupName());
+ keySet = group2RecordCache.get(entity.getGroupName());
if (keySet == null) {
ConcurrentHashSet<String> tmpSet = new ConcurrentHashSet<>();
- keySet = grpConsumeCtrlGroupCache.putIfAbsent(entity.getGroupName(), tmpSet);
+ keySet = group2RecordCache.putIfAbsent(entity.getGroupName(), tmpSet);
if (keySet == null) {
keySet = tmpSet;
}
@@ -378,25 +378,25 @@ public abstract class AbsConsumeCtrlMapperImpl implements ConsumeCtrlMapper {
*/
private void delRecordFromCaches(String recordKey) {
GroupConsumeCtrlEntity curEntity =
- grpConsumeCtrlCache.remove(recordKey);
+ consumeCtrlCache.remove(recordKey);
if (curEntity == null) {
return;
}
// add topic index
ConcurrentHashSet<String> keySet =
- grpConsumeCtrlTopicCache.get(curEntity.getTopicName());
+ topic2RecordCache.get(curEntity.getTopicName());
if (keySet != null) {
keySet.remove(recordKey);
if (keySet.isEmpty()) {
- grpConsumeCtrlTopicCache.remove(curEntity.getTopicName());
+ topic2RecordCache.remove(curEntity.getTopicName());
}
}
// delete group index
- keySet = grpConsumeCtrlGroupCache.get(curEntity.getGroupName());
+ keySet = group2RecordCache.get(curEntity.getGroupName());
if (keySet != null) {
keySet.remove(recordKey);
if (keySet.isEmpty()) {
- grpConsumeCtrlGroupCache.remove(curEntity.getGroupName());
+ group2RecordCache.remove(curEntity.getGroupName());
}
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsGroupResCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsGroupResCtrlMapperImpl.java
index 25fbe43..dc3edea 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsGroupResCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsGroupResCtrlMapperImpl.java
@@ -83,7 +83,7 @@ public abstract class AbsGroupResCtrlMapperImpl implements GroupResCtrlMapper {
// Store data to persistent
if (putConfig2Persistent(newEntity, strBuff, result)) {
groupBaseCtrlCache.put(newEntity.getGroupName(), newEntity);
- result.setSuccResult(curEntity);
+ result.setSuccResult(null);
}
return result.isSuccess();
}
@@ -98,7 +98,7 @@ public abstract class AbsGroupResCtrlMapperImpl implements GroupResCtrlMapper {
}
delConfigFromPersistent(groupName, strBuff);
groupBaseCtrlCache.remove(groupName);
- result.setSuccResult(curEntity);
+ result.setSuccResult(null);
return true;
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaStoreMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaStoreMapperImpl.java
new file mode 100644
index 0000000..933cd86
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaStoreMapperImpl.java
@@ -0,0 +1,1214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master.metamanage.metastore.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus;
+import org.apache.inlong.tubemq.server.common.statusdef.TopicStatus;
+import org.apache.inlong.tubemq.server.common.utils.RowLock;
+import org.apache.inlong.tubemq.server.master.metamanage.DataOpErrCode;
+import org.apache.inlong.tubemq.server.master.metamanage.keepalive.AliveObserver;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.MetaStoreMapper;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.BrokerConfigMapper;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.ClusterConfigMapper;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.ConsumeCtrlMapper;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.GroupResCtrlMapper;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.TopicCtrlMapper;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.TopicDeployMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AbsMetaStoreMapperImpl implements MetaStoreMapper {
+ protected static final Logger logger =
+ LoggerFactory.getLogger(AbsMetaStoreMapperImpl.class);
+ // service status, created with value 0:
+ // 0 stopped, 1 starting, 2 started, 3 stopping
+ private final AtomicInteger srvStatus = new AtomicInteger(0);
+ // the observers focusing on active-standby switching
+ private final List<AliveObserver> eventObservers = new ArrayList<>();
+
+ // row lock acquired per metadata key (broker id, topic name, group name, ...) before a change
+ private final RowLock metaRowLock;
+ // built-in fallback setting, returned by getClusterDefSetting(false) when nothing is stored
+ private static final ClusterSettingEntity defClusterSetting =
+ new ClusterSettingEntity().fillDefaultValue();
+ // mapper of the stored cluster default setting
+ private ClusterConfigMapper clusterConfigMapper;
+ // mapper of the broker configures
+ private BrokerConfigMapper brokerConfigMapper;
+ // mapper of the topic deployment configures
+ private TopicDeployMapper topicDeployMapper;
+ // mapper of the topic control configures
+ private TopicCtrlMapper topicCtrlMapper;
+ // mapper of the group resource control configures
+ private GroupResCtrlMapper groupResCtrlMapper;
+ // mapper of the group consume control configures
+ private ConsumeCtrlMapper consumeCtrlMapper;
+
+ /**
+  * Creates the mapper together with the row lock used to guard metadata changes.
+  *
+  * @param rowLockWaiDurMs  value handed to the RowLock constructor
+  *                         (presumably the lock wait duration in ms - TODO confirm)
+  */
+ public AbsMetaStoreMapperImpl(int rowLockWaiDurMs) {
+     metaRowLock = new RowLock("MetaData-RowLock", rowLockWaiDurMs);
+ }
+
+ /**
+  * Adds the cluster default setting when none is stored, otherwise updates the stored one.
+  *
+  * The whole read-modify-write runs under the "clusterConfig#" row lock. An update that
+  * changes nothing fails with DERR_UNCHANGED.
+  *
+  * @param opEntity        operator information and data version of this change
+  * @param brokerPort      new broker port value
+  * @param brokerTlsPort   new broker TLS port value
+  * @param brokerWebPort   new broker web port value
+  * @param maxMsgSizeMB    new max message size value
+  * @param qryPriorityId   new query priority id value
+  * @param flowCtrlEnable  new flow control switch value
+  * @param flowRuleCnt     new flow control rule count
+  * @param flowCtrlInfo    new flow control rule text
+  * @param topicProps      new default topic properties
+  * @param strBuff         shared string buffer, left empty after a logged success
+  * @param result          receives the process result
+  * @return true if success otherwise false
+  */
+ @Override
+ public boolean addOrUpdClusterDefSetting(BaseEntity opEntity,
+         int brokerPort, int brokerTlsPort,
+         int brokerWebPort, int maxMsgSizeMB,
+         int qryPriorityId, Boolean flowCtrlEnable,
+         int flowRuleCnt, String flowCtrlInfo,
+         TopicPropGroup topicProps,
+         StringBuilder strBuff, ProcessResult result) {
+     Integer lockId = null;
+     boolean created = false;
+     String logPrefix = "[updClusterConfig], ";
+     ClusterSettingEntity storedSetting;
+     ClusterSettingEntity targetSetting;
+     try {
+         // every change of the single cluster setting record shares one lock key
+         lockId = metaRowLock.getLock(null,
+                 StringUtils.getBytesUtf8("clusterConfig#"), true);
+         storedSetting = clusterConfigMapper.getClusterConfig();
+         if (storedSetting != null) {
+             // update path: apply the new values on a copy of the stored record
+             targetSetting = storedSetting.clone();
+             targetSetting.updBaseModifyInfo(opEntity);
+             boolean changed = targetSetting.updModifyInfo(opEntity.getDataVerId(),
+                     brokerPort, brokerTlsPort, brokerWebPort, maxMsgSizeMB,
+                     qryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlInfo, topicProps);
+             if (!changed) {
+                 result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                         "Cluster configure not changed!");
+                 return result.isSuccess();
+             }
+         } else {
+             // add path: start from the default values, then apply the new ones
+             created = true;
+             logPrefix = "[addClusterConfig], ";
+             targetSetting = new ClusterSettingEntity(opEntity);
+             targetSetting.fillDefaultValue();
+             targetSetting.updModifyInfo(opEntity.getDataVerId(),
+                     brokerPort, brokerTlsPort, brokerWebPort, maxMsgSizeMB,
+                     qryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlInfo, topicProps);
+         }
+         // hand the record to the storage mapper
+         clusterConfigMapper.addUpdClusterConfig(targetSetting, strBuff, result);
+     } catch (Throwable e) {
+         return logExceptionInfo(e, logPrefix, strBuff, result);
+     } finally {
+         if (lockId != null) {
+             metaRowLock.releaseRowLock(lockId);
+         }
+     }
+     if (!result.isSuccess()) {
+         return false;
+     }
+     // record the successful operation in the log
+     strBuff.append(logPrefix);
+     if (created) {
+         strBuff.append(targetSetting.getCreateUser())
+                 .append(" added cluster configure: ").append(targetSetting);
+     } else {
+         strBuff.append(targetSetting.getModifyUser())
+                 .append(" updated cluster configure: from ").append(storedSetting)
+                 .append(" to ").append(targetSetting);
+     }
+     logger.info(strBuff.toString());
+     strBuff.delete(0, strBuff.length());
+     return result.isSuccess();
+ }
+
+ /**
+  * Gets the cluster default setting.
+  *
+  * @param isMustConf  when true only the stored setting is returned (possibly null);
+  *                    when false the built-in default replaces a missing stored setting
+  * @return the stored setting, the built-in default, or null
+  */
+ @Override
+ public ClusterSettingEntity getClusterDefSetting(boolean isMustConf) {
+     ClusterSettingEntity storedSetting = clusterConfigMapper.getClusterConfig();
+     if (storedSetting != null || isMustConf) {
+         return storedSetting;
+     }
+     return defClusterSetting;
+ }
+
+ // //////////////////////////////////////////////////////////////////////////////
+
+ /**
+  * Adds or updates a broker configure given as individual field values.
+  *
+  * Builds a BrokerConfEntity from the arguments and delegates to the entity overload.
+  *
+  * @param isAddOp        true to add the record, false to update it
+  * @param opEntity       operator information and data version of this change
+  * @param brokerId       the broker id
+  * @param brokerIp       the broker ip
+  * @param brokerPort     the broker port
+  * @param brokerTlsPort  the broker TLS port
+  * @param brokerWebPort  the broker web port
+  * @param regionId       the region id
+  * @param groupId        the group id
+  * @param mngStatus      the manage status
+  * @param topicProps     the default topic properties of the broker
+  * @param strBuff        shared string buffer
+  * @param result         receives the process result
+  * @return true if success otherwise false
+  */
+ @Override
+ public boolean addOrUpdBrokerConfig(boolean isAddOp, BaseEntity opEntity,
+         int brokerId, String brokerIp, int brokerPort,
+         int brokerTlsPort, int brokerWebPort,
+         int regionId, int groupId, ManageStatus mngStatus,
+         TopicPropGroup topicProps, StringBuilder strBuff,
+         ProcessResult result) {
+     final BrokerConfEntity brokerConf = new BrokerConfEntity(opEntity, brokerId, brokerIp);
+     brokerConf.updModifyInfo(opEntity.getDataVerId(), brokerPort, brokerTlsPort,
+             brokerWebPort, regionId, groupId, mngStatus, topicProps);
+     return addOrUpdBrokerConfig(isAddOp, brokerConf, strBuff, result);
+ }
+
+ /**
+  * Adds or updates a broker configure under the broker id row lock.
+  *
+  * @param isAddOp  true to add the record, false to update it
+  * @param entity   the broker configure to store
+  * @param strBuff  shared string buffer, left empty after a logged success
+  * @param result   receives the process result
+  * @return true if success otherwise false
+  */
+ @Override
+ public boolean addOrUpdBrokerConfig(boolean isAddOp, BrokerConfEntity entity,
+         StringBuilder strBuff, ProcessResult result) {
+     Integer lockId = null;
+     BrokerConfEntity previousConf = null;
+     BrokerConfEntity storedConf;
+     String logPrefix = "[addBrokerConf], ";
+     try {
+         // the lock key is the textual broker id
+         lockId = metaRowLock.getLock(null,
+                 StringUtils.getBytesUtf8(String.valueOf(entity.getBrokerId())), true);
+         if (!isAddOp) {
+             logPrefix = "[updBrokerConf], ";
+             // remember the record before the update for the log line
+             previousConf = brokerConfigMapper.getBrokerConfByBrokerId(entity.getBrokerId());
+             brokerConfigMapper.updBrokerConf(entity, strBuff, result);
+         } else {
+             brokerConfigMapper.addBrokerConf(entity, strBuff, result);
+         }
+         // read back what the mapper now holds
+         storedConf = brokerConfigMapper.getBrokerConfByBrokerId(entity.getBrokerId());
+     } catch (Throwable e) {
+         return logExceptionInfo(e, logPrefix, strBuff, result);
+     } finally {
+         if (lockId != null) {
+             metaRowLock.releaseRowLock(lockId);
+         }
+     }
+     if (!result.isSuccess()) {
+         return false;
+     }
+     // record the successful operation in the log
+     strBuff.append(logPrefix);
+     if (isAddOp) {
+         strBuff.append(entity.getCreateUser())
+                 .append(" added broker configure: ").append(storedConf);
+     } else {
+         strBuff.append(entity.getModifyUser())
+                 .append(" updated broker configure from ").append(previousConf)
+                 .append(" to ").append(storedConf);
+     }
+     logger.info(strBuff.toString());
+     strBuff.delete(0, strBuff.length());
+     return result.isSuccess();
+ }
+
+ /**
+  * Changes the manage status of a broker configure under the broker id row lock.
+  *
+  * @param opEntity      operator information of this change
+  * @param brokerId      the broker id
+  * @param newMngStatus  the manage status to set
+  * @param strBuff       shared string buffer, left empty after a logged success
+  * @param result        receives the process result
+  * @return true if success otherwise false
+  */
+ @Override
+ public boolean changeBrokerConfStatus(BaseEntity opEntity,
+         int brokerId, ManageStatus newMngStatus,
+         StringBuilder strBuff, ProcessResult result) {
+     final String logPrefix = "[updBrokerConf], ";
+     Integer lockId = null;
+     BrokerConfEntity previousConf;
+     BrokerConfEntity storedConf;
+     try {
+         // the lock key is the textual broker id
+         lockId = metaRowLock.getLock(null,
+                 StringUtils.getBytesUtf8(String.valueOf(brokerId)), true);
+         // snapshot before, change, snapshot after
+         previousConf = brokerConfigMapper.getBrokerConfByBrokerId(brokerId);
+         brokerConfigMapper.updBrokerMngStatus(opEntity,
+                 brokerId, newMngStatus, strBuff, result);
+         storedConf = brokerConfigMapper.getBrokerConfByBrokerId(brokerId);
+     } catch (Throwable e) {
+         return logExceptionInfo(e, logPrefix, strBuff, result);
+     } finally {
+         if (lockId != null) {
+             metaRowLock.releaseRowLock(lockId);
+         }
+     }
+     if (!result.isSuccess()) {
+         return false;
+     }
+     // record the successful operation in the log
+     strBuff.append(logPrefix).append(opEntity.getModifyUser())
+             .append(" updated broker configure from ").append(previousConf)
+             .append(" to ").append(storedConf);
+     logger.info(strBuff.toString());
+     strBuff.delete(0, strBuff.length());
+     return result.isSuccess();
+ }
+
+ @Override
+ public boolean delBrokerConfInfo(String operator, int brokerId, boolean rsvData,
+ StringBuilder strBuff, ProcessResult result) {
+ Integer lid = null;
+ BrokerConfEntity curEntity;
+ String printPrefix = "[delBrokerConf], ";
+ // execute delete operation
+ try {
+ // lock brokerId meta-lock
+ lid = metaRowLock.getLock(null,
+ StringUtils.getBytesUtf8(String.valueOf(brokerId)), true);
+ // get current broker configure
+ curEntity = brokerConfigMapper.getBrokerConfByBrokerId(brokerId);
+ if (curEntity == null) {
+ result.setSuccResult(null);
+ return result.isSuccess();
+ }
+ // check broker's manage status
+ if (curEntity.getManageStatus().isOnlineStatus()) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+ strBuff.append("Illegal manage status, please offline the broker(")
+ .append(WebFieldDef.BROKERID.name).append("=")
+ .append(curEntity.getBrokerId()).append(") first!").toString());
+ strBuff.delete(0, strBuff.length());
+ return result.isSuccess();
+ }
+ // check topic deploy status
+ if (!topicDeployMapper.getConfiguredTopicInfo(curEntity.getBrokerId()).isEmpty()) {
+ if (rsvData) {
+ if (!topicDeployMapper.delTopicConfByBrokerId(brokerId, strBuff, result)) {
+ return result.isSuccess();
+ }
+ strBuff.append("[delTopicDeployByBrokerId], ").append(operator)
+ .append(" deleted topic deploy configure: ").append(brokerId);
+ logger.info(strBuff.toString());
+ strBuff.delete(0, strBuff.length());
+ } else {
+ result.setFailResult(DataOpErrCode.DERR_UNCLEANED.getCode(),
+ strBuff.append("Illegal operate conditions, the broker(")
+ .append(curEntity.getBrokerId())
+ .append(")'s topic deploy configure uncleaned, please delete them first!")
+ .toString());
+ strBuff.delete(0, strBuff.length());
+ return result.isSuccess();
+ }
+ }
+ // execute delete operation
+ brokerConfigMapper.delBrokerConf(brokerId, strBuff, result);
+ } catch (Throwable e) {
+ return logExceptionInfo(e, printPrefix, strBuff, result);
+ } finally {
+ if (lid != null) {
+ metaRowLock.releaseRowLock(lid);
+ }
+ }
+ // print log to file
+ if (result.isSuccess()) {
+ strBuff.append(printPrefix).append(operator)
+ .append(" deleted broker configure: ").append(curEntity);
+ logger.info(strBuff.toString());
+ strBuff.delete(0, strBuff.length());
+ }
+ return result.isSuccess();
+ }
+
+ /**
+  * Queries broker configures by a condition entity.
+  *
+  * @param qryEntity  the query condition, passed to the broker configure mapper unchanged
+  * @return the map returned by the broker configure mapper
+  */
+ @Override
+ public Map<Integer, BrokerConfEntity> getBrokerConfInfo(BrokerConfEntity qryEntity) {
+     final Map<Integer, BrokerConfEntity> matched =
+             brokerConfigMapper.getBrokerConfInfo(qryEntity);
+     return matched;
+ }
+
+ /**
+  * Queries broker configures by broker ids, broker ips and a condition entity.
+  *
+  * @param brokerIdSet  the broker ids to query
+  * @param brokerIpSet  the broker ips to query
+  * @param qryEntity    the query condition
+  * @return the map returned by the broker configure mapper for these arguments
+  */
+ @Override
+ public Map<Integer, BrokerConfEntity> getBrokerConfInfo(Set<Integer> brokerIdSet,
+         Set<String> brokerIpSet,
+         BrokerConfEntity qryEntity) {
+     final Map<Integer, BrokerConfEntity> matched =
+             brokerConfigMapper.getBrokerConfInfo(brokerIdSet, brokerIpSet, qryEntity);
+     return matched;
+ }
+
+ /**
+  * Looks up a broker configure by broker id.
+  *
+  * @param brokerId  the broker id
+  * @return the lookup result of the broker configure mapper
+  */
+ @Override
+ public BrokerConfEntity getBrokerConfByBrokerId(int brokerId) {
+     final BrokerConfEntity found = brokerConfigMapper.getBrokerConfByBrokerId(brokerId);
+     return found;
+ }
+
+ /**
+  * Looks up a broker configure by broker ip.
+  *
+  * @param brokerIp  the broker ip
+  * @return the lookup result of the broker configure mapper
+  */
+ @Override
+ public BrokerConfEntity getBrokerConfByBrokerIp(String brokerIp) {
+     final BrokerConfEntity found = brokerConfigMapper.getBrokerConfByBrokerIp(brokerIp);
+     return found;
+ }
+
+ // ///////////////////////////////////////////////////////////////////////////////
+
+ /**
+  * Adds or updates a topic control configure given as individual field values.
+  *
+  * Builds a TopicCtrlEntity from the arguments and delegates to the entity overload.
+  *
+  * @param isAddOp          true to add the record, false to update it
+  * @param opEntity         operator information and data version of this change
+  * @param topicName        the topic name
+  * @param topicNameId      the topic name id
+  * @param enableTopicAuth  the topic authorization switch
+  * @param maxMsgSizeInMB   the max message size in MB
+  * @param sBuffer          shared string buffer
+  * @param result           receives the process result
+  * @return true if success otherwise false
+  */
+ public boolean addOrUpdTopicCtrlConf(boolean isAddOp, BaseEntity opEntity,
+         String topicName, int topicNameId,
+         Boolean enableTopicAuth, int maxMsgSizeInMB,
+         StringBuilder sBuffer, ProcessResult result) {
+     final TopicCtrlEntity topicCtrl = new TopicCtrlEntity(opEntity, topicName);
+     topicCtrl.updModifyInfo(opEntity.getDataVerId(),
+             topicNameId, maxMsgSizeInMB, enableTopicAuth);
+     return addOrUpdTopicCtrlConf(isAddOp, topicCtrl, sBuffer, result);
+ }
+
+ /**
+  * Adds or updates a topic control configure with the consistency check enabled:
+  * an add fails when the record exists, an update fails when it does not.
+  *
+  * @param isAddOp  true to add the record, false to update it
+  * @param entity   the topic control configure to store
+  * @param strBuff  shared string buffer
+  * @param result   receives the process result
+  * @return true if success otherwise false
+  */
+ @Override
+ public boolean addOrUpdTopicCtrlConf(boolean isAddOp, TopicCtrlEntity entity,
+         StringBuilder strBuff, ProcessResult result) {
+     final boolean chkConsistent = true;
+     return innAddOrUpdTopicCtrlConf(chkConsistent, isAddOp, entity, strBuff, result);
+ }
+
+ /**
+  * Stores the authorization switch of a topic: the record is added when missing
+  * and updated when present. Topic name id and max message size are passed as
+  * META_VALUE_UNDEFINED.
+  *
+  * @param opEntity         operator information and data version of this change
+  * @param topicName        the topic name
+  * @param enableTopicAuth  the topic authorization switch
+  * @param strBuff          shared string buffer
+  * @param result           receives the process result
+  * @return true if success otherwise false
+  */
+ @Override
+ public boolean insertTopicCtrlConf(BaseEntity opEntity,
+         String topicName, Boolean enableTopicAuth,
+         StringBuilder strBuff, ProcessResult result) {
+     final TopicCtrlEntity topicCtrl = new TopicCtrlEntity(opEntity, topicName);
+     topicCtrl.updModifyInfo(opEntity.getDataVerId(),
+             TBaseConstants.META_VALUE_UNDEFINED,
+             TBaseConstants.META_VALUE_UNDEFINED,
+             enableTopicAuth);
+     return innAddOrUpdTopicCtrlConf(false, false, topicCtrl, strBuff, result);
+ }
+
+ /**
+  * Stores a topic control configure: the record is added when missing and
+  * updated when present.
+  *
+  * @param entity   the topic control configure to store
+  * @param strBuff  shared string buffer
+  * @param result   receives the process result
+  * @return true if success otherwise false
+  */
+ @Override
+ public boolean insertTopicCtrlConf(TopicCtrlEntity entity,
+         StringBuilder strBuff, ProcessResult result) {
+     final boolean chkConsistent = false;
+     final boolean onlyAdd = false;
+     return innAddOrUpdTopicCtrlConf(chkConsistent, onlyAdd, entity, strBuff, result);
+ }
+
+ /**
+  * Deletes the control configure of a topic under the topic name row lock.
+  *
+  * A missing record counts as success. The delete is refused with DERR_ILLEGAL_STATUS
+  * while the topic is still deployed or still referenced by a consume control record.
+  *
+  * @param operator   the operator name written to the log
+  * @param topicName  the topic name
+  * @param strBuff    shared string buffer, left empty after a logged success
+  * @param result     receives the process result
+  * @return true if success otherwise false
+  */
+ @Override
+ public boolean delTopicCtrlConf(String operator, String topicName,
+         StringBuilder strBuff, ProcessResult result) {
+     final String logPrefix = "[delTopicCtrlConf], ";
+     Integer lockId = null;
+     TopicCtrlEntity removedConf;
+     try {
+         // lock topicName meta-lock
+         lockId = metaRowLock.getLock(null,
+                 StringUtils.getBytesUtf8(topicName), true);
+         removedConf = topicCtrlMapper.getTopicCtrlConf(topicName);
+         if (removedConf == null) {
+             // nothing stored for this topic
+             result.setSuccResult(null);
+             return result.isSuccess();
+         }
+         // find out whether the topic is still referenced, deploy configure first
+         String inUseHint = null;
+         if (topicDeployMapper.isTopicDeployed(topicName)) {
+             inUseHint = " is in use, please delete deploy configure first!";
+         } else if (consumeCtrlMapper.isTopicNameInUse(topicName)) {
+             inUseHint = " is in use, please delete the consume control first!";
+         }
+         if (inUseHint != null) {
+             strBuff.append("TopicName ").append(topicName).append(inUseHint);
+             result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+                     strBuff.toString());
+             strBuff.delete(0, strBuff.length());
+             return result.isSuccess();
+         }
+         topicCtrlMapper.delTopicCtrlConf(topicName, strBuff, result);
+     } catch (Throwable e) {
+         return logExceptionInfo(e, logPrefix, strBuff, result);
+     } finally {
+         if (lockId != null) {
+             metaRowLock.releaseRowLock(lockId);
+         }
+     }
+     if (!result.isSuccess()) {
+         return false;
+     }
+     // record the successful operation in the log
+     strBuff.append(logPrefix).append(operator)
+             .append(" deleted topic control configure: ")
+             .append(removedConf);
+     logger.info(strBuff.toString());
+     strBuff.delete(0, strBuff.length());
+     return result.isSuccess();
+ }
+
+ /**
+  * Looks up the control configure of a topic.
+  *
+  * @param topicName  the topic name
+  * @return the lookup result of the topic control mapper
+  */
+ @Override
+ public TopicCtrlEntity getTopicCtrlByTopicName(String topicName) {
+     final TopicCtrlEntity found = topicCtrlMapper.getTopicCtrlConf(topicName);
+     return found;
+ }
+
+ /**
+  * Gets the max message size in MB that applies to a topic.
+  *
+  * The value of the topic's control configure is used when such a record exists,
+  * otherwise the value of the cluster default setting.
+  *
+  * @param topicName  the topic name
+  * @return the max message size in MB
+  */
+ @Override
+ public int getTopicMaxMsgSizeInMB(String topicName) {
+     // cluster level value first, it is the fallback
+     final int clusterLimit = getClusterDefSetting(false).getMaxMsgSizeInMB();
+     final TopicCtrlEntity topicCtrl = topicCtrlMapper.getTopicCtrlConf(topicName);
+     return (topicCtrl == null) ? clusterLimit : topicCtrl.getMaxMsgSizeInMB();
+ }
+
+ /**
+  * Queries topic control configures by a condition entity.
+  *
+  * @param qryEntity  the query condition, passed to the topic control mapper unchanged
+  * @return the list returned by the topic control mapper
+  */
+ @Override
+ public List<TopicCtrlEntity> queryTopicCtrlConf(TopicCtrlEntity qryEntity) {
+     final List<TopicCtrlEntity> matched = topicCtrlMapper.getTopicCtrlConf(qryEntity);
+     return matched;
+ }
+
+ /**
+  * Queries topic control configures by topic names and a condition entity.
+  *
+  * @param topicNameSet  the topic names to query
+  * @param qryEntity     the query condition
+  * @return the map returned by the topic control mapper for these arguments
+  */
+ @Override
+ public Map<String, TopicCtrlEntity> getTopicCtrlConf(Set<String> topicNameSet,
+         TopicCtrlEntity qryEntity) {
+     final Map<String, TopicCtrlEntity> matched =
+             topicCtrlMapper.getTopicCtrlConf(topicNameSet, qryEntity);
+     return matched;
+ }
+
+ /**
+ * Add if absent topic control configure info
+ *
+ * @param opEntity the operation info
+ * @param topicName the topic name will be add
+ * @param strBuff the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ private boolean addTopicCtrlConfIfAbsent(BaseEntity opEntity, String topicName,
+ StringBuilder strBuff, ProcessResult result) {
+ int maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
+ ClusterSettingEntity defSetting = getClusterDefSetting(false);
+ if (defSetting != null) {
+ maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
+ }
+ TopicCtrlEntity entity = new TopicCtrlEntity(opEntity, topicName,
+ TBaseConstants.META_VALUE_UNDEFINED, maxMsgSizeInMB);
+ return innAddOrUpdTopicCtrlConf(false, true, entity, strBuff, result);
+ }
+
+ /**
+  * Add or Update topic control configure info under the topic name row lock.
+  *
+  * Behaviour by flag combination:
+  * - record missing, chkConsistent and not isAddOpOrOnlyAdd: fails with DERR_NOT_EXIST;
+  * - record missing, any other combination: the record is added;
+  * - record present, isAddOpOrOnlyAdd and chkConsistent: fails with DERR_EXISTED;
+  * - record present, isAddOpOrOnlyAdd and not chkConsistent: success, nothing changed;
+  * - record present, not isAddOpOrOnlyAdd: the record is updated.
+  *
+  * @param chkConsistent     whether a mismatch between the requested operation and the
+  *                          stored state is reported as an error
+  * @param isAddOpOrOnlyAdd  true for an add (or add-if-absent) request, false for an
+  *                          update (or add-or-update) request
+  * @param entity            the entity need to operation
+  * @param strBuff           the string buffer, left empty after a logged success
+  * @param result            the process result return
+  * @return true if success otherwise false
+  */
+ private boolean innAddOrUpdTopicCtrlConf(boolean chkConsistent, boolean isAddOpOrOnlyAdd,
+         TopicCtrlEntity entity, StringBuilder strBuff,
+         ProcessResult result) {
+     Integer lid = null;
+     TopicCtrlEntity curEntity;
+     TopicCtrlEntity newEntity;
+     boolean addRecord = true;
+     String printPrefix = "[addTopicCtrlConf], ";
+     // execute add or update operation
+     try {
+         // lock topicName meta-lock
+         lid = metaRowLock.getLock(null,
+                 StringUtils.getBytesUtf8(entity.getTopicName()), true);
+         // get current record and judge execute condition
+         curEntity = topicCtrlMapper.getTopicCtrlConf(entity.getTopicName());
+         if (curEntity == null) {
+             if (chkConsistent && !isAddOpOrOnlyAdd) {
+                 result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                         strBuff.append("Not found topic control configure for topicName(")
+                                 .append(entity.getTopicName()).append(")!").toString());
+                 strBuff.delete(0, strBuff.length());
+                 return result.isSuccess();
+             }
+             topicCtrlMapper.addTopicCtrlConf(entity, strBuff, result);
+         } else {
+             if (isAddOpOrOnlyAdd) {
+                 if (chkConsistent) {
+                     result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+                             strBuff.append("Existed record found for topicName(")
+                                     .append(entity.getTopicName()).append(")!").toString());
+                     strBuff.delete(0, strBuff.length());
+                 } else {
+                     // add-if-absent: the existing record is kept as is
+                     result.setSuccResult(null);
+                 }
+                 return result.isSuccess();
+             }
+             addRecord = false;
+             // prefix spelled "upd" like the other update log prefixes of this class
+             printPrefix = "[updTopicCtrlConf], ";
+             topicCtrlMapper.updTopicCtrlConf(entity, strBuff, result);
+         }
+         // read back what the mapper now holds, used for the log line below
+         newEntity = topicCtrlMapper.getTopicCtrlConf(entity.getTopicName());
+     } catch (Throwable e) {
+         return logExceptionInfo(e, printPrefix, strBuff, result);
+     } finally {
+         if (lid != null) {
+             metaRowLock.releaseRowLock(lid);
+         }
+     }
+     // print log to file
+     if (result.isSuccess()) {
+         if (addRecord) {
+             strBuff.append(printPrefix).append(entity.getCreateUser())
+                     .append(" added topic control configure: ").append(newEntity);
+         } else {
+             strBuff.append(printPrefix).append(entity.getModifyUser())
+                     .append(" updated topic control configure from ")
+                     .append(curEntity).append(" to ").append(newEntity);
+         }
+         logger.info(strBuff.toString());
+         strBuff.delete(0, strBuff.length());
+     }
+     return result.isSuccess();
+ }
+
+ // ///////////////////////////////////////////////////////////////////////////////
+
+ /**
+  * Adds or updates a topic deploy configure given as individual field values.
+  *
+  * Builds a TopicDeployEntity from the arguments and delegates to the entity overload.
+  *
+  * @param isAddOp        true to add the record, false to update it
+  * @param opEntity       operator information and data version of this change
+  * @param brokerId       the broker id
+  * @param topicName      the topic name
+  * @param deployStatus   the deploy status of the topic
+  * @param topicPropInfo  the topic properties
+  * @param strBuff        shared string buffer
+  * @param result         receives the process result
+  * @return true if success otherwise false
+  */
+ @Override
+ public boolean addOrUpdTopicDeployInfo(boolean isAddOp, BaseEntity opEntity,
+         int brokerId, String topicName,
+         TopicStatus deployStatus,
+         TopicPropGroup topicPropInfo,
+         StringBuilder strBuff, ProcessResult result) {
+     final TopicDeployEntity deployConf =
+             new TopicDeployEntity(opEntity, brokerId, topicName);
+     deployConf.updModifyInfo(opEntity.getDataVerId(),
+             TBaseConstants.META_VALUE_UNDEFINED,
+             TBaseConstants.META_VALUE_UNDEFINED,
+             null, deployStatus, topicPropInfo);
+     return addOrUpdTopicDeployInfo(isAddOp, deployConf, strBuff, result);
+ }
+
+ /**
+  * Adds or updates a topic deploy configure.
+  *
+  * First makes sure a topic control configure exists for the topic; if that step
+  * fails the method stops and reports the failure, so that no deploy record is stored
+  * without its control record. The deploy change itself runs under the topic name row
+  * lock and, nested inside it, the broker id row lock.
+  *
+  * Fails with DERR_NOT_EXIST when the broker configure is missing, or when an update
+  * targets a missing deploy record; fails with DERR_EXISTED when an add targets an
+  * existing deploy record.
+  *
+  * @param isAddOp  true to add the record, false to update it
+  * @param entity   the topic deploy configure to store
+  * @param strBuff  shared string buffer, left empty after a logged success
+  * @param result   receives the process result
+  * @return true if success otherwise false
+  */
+ @Override
+ public boolean addOrUpdTopicDeployInfo(boolean isAddOp, TopicDeployEntity entity,
+         StringBuilder strBuff, ProcessResult result) {
+     TopicDeployEntity curEntity;
+     TopicDeployEntity newEntity;
+     String printPrefix = "[addTopicDeployConf], ";
+     Integer topicLockId = null;
+     Integer brokerLockId = null;
+     // add topic control configure; do not continue if it could not be stored
+     if (!addTopicCtrlConfIfAbsent(entity, entity.getTopicName(), strBuff, result)) {
+         return result.isSuccess();
+     }
+     // execute add or update operation
+     try {
+         // lock topicName meta-lock
+         topicLockId = metaRowLock.getLock(null,
+                 StringUtils.getBytesUtf8(entity.getTopicName()), true);
+         try {
+             // lock brokerId meta-lock
+             brokerLockId = metaRowLock.getLock(null,
+                     StringUtils.getBytesUtf8(String.valueOf(entity.getBrokerId())), true);
+             // check broker configure exist
+             BrokerConfEntity brokerEntity =
+                     brokerConfigMapper.getBrokerConfByBrokerId(entity.getBrokerId());
+             if (brokerEntity == null) {
+                 result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                         strBuff.append("Not found broker configure by brokerId=")
+                                 .append(entity.getBrokerId())
+                                 .append(", please create the broker's configure first!").toString());
+                 strBuff.delete(0, strBuff.length());
+                 return result.isSuccess();
+             }
+             // check topic deploy configure
+             curEntity = topicDeployMapper.getTopicConfByeRecKey(entity.getRecordKey());
+             if (isAddOp) {
+                 if (curEntity != null) {
+                     // both cases are DERR_EXISTED, only the hint differs
+                     if (curEntity.isValidTopicStatus()) {
+                         result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+                                 strBuff.append("Existed record found for brokerId-topicName(")
+                                         .append(curEntity.getRecordKey()).append(")!").toString());
+                     } else {
+                         result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+                                 strBuff.append("Softly deleted record found for brokerId-topicName(")
+                                         .append(curEntity.getRecordKey())
+                                         .append("), please resume or remove it first!").toString());
+                     }
+                     strBuff.delete(0, strBuff.length());
+                     return result.isSuccess();
+                 }
+                 // add record
+                 topicDeployMapper.addTopicDeployConf(entity, strBuff, result);
+             } else {
+                 printPrefix = "[updTopicDeployConf], ";
+                 if (curEntity == null) {
+                     result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                             strBuff.append("Not found topic deploy configure for brokerId-topicName(")
+                                     .append(entity.getRecordKey()).append(")!").toString());
+                     strBuff.delete(0, strBuff.length());
+                     return result.isSuccess();
+                 }
+                 // update record
+                 topicDeployMapper.updTopicDeployConf(entity, strBuff, result);
+             }
+             // read back what the mapper now holds, used for the log line below
+             newEntity = topicDeployMapper.getTopicConfByeRecKey(entity.getRecordKey());
+         } finally {
+             // inner lock is released before the outer one
+             if (brokerLockId != null) {
+                 metaRowLock.releaseRowLock(brokerLockId);
+             }
+         }
+     } catch (Throwable e) {
+         return logExceptionInfo(e, printPrefix, strBuff, result);
+     } finally {
+         if (topicLockId != null) {
+             metaRowLock.releaseRowLock(topicLockId);
+         }
+     }
+     // print log to file
+     if (result.isSuccess()) {
+         if (isAddOp) {
+             strBuff.append(printPrefix).append(entity.getCreateUser())
+                     .append(" added topic deploy configure: ").append(newEntity);
+         } else {
+             strBuff.append(printPrefix).append(entity.getModifyUser())
+                     .append(" updated topic deploy configure from ")
+                     .append(curEntity).append(" to ").append(newEntity);
+         }
+         logger.info(strBuff.toString());
+         strBuff.delete(0, strBuff.length());
+     }
+     return result.isSuccess();
+ }
+
+ /**
+  * Changes the status of a topic deploy configure.
+  *
+  * Runs under the topic name row lock and, nested inside it, the broker id row lock.
+  * Fails with DERR_NOT_EXIST when the broker configure or the deploy record is missing.
+  * When the stored status already equals the requested one, nothing is changed and the
+  * call succeeds.
+  *
+  * @param opEntity     operator information of this change
+  * @param brokerId     the broker id
+  * @param topicName    the topic name
+  * @param topicStatus  the status to set
+  * @param strBuff      shared string buffer, left empty after a logged success
+  * @param result       receives the process result
+  * @return true if success otherwise false
+  */
+ @Override
+ public boolean updTopicDeployStatusInfo(BaseEntity opEntity, int brokerId,
+         String topicName, TopicStatus topicStatus,
+         StringBuilder strBuff, ProcessResult result) {
+     final String logPrefix = "[updTopicDeployConf], ";
+     Integer topicLid = null;
+     Integer brokerLid = null;
+     TopicDeployEntity previousDeploy;
+     TopicDeployEntity storedDeploy;
+     try {
+         // outer lock: topic name
+         topicLid = metaRowLock.getLock(null,
+                 StringUtils.getBytesUtf8(topicName), true);
+         try {
+             // inner lock: textual broker id
+             brokerLid = metaRowLock.getLock(null,
+                     StringUtils.getBytesUtf8(String.valueOf(brokerId)), true);
+             // the broker configure has to exist
+             if (brokerConfigMapper.getBrokerConfByBrokerId(brokerId) == null) {
+                 strBuff.append("Not found broker configure by brokerId=")
+                         .append(brokerId)
+                         .append(", please create the broker's configure first!");
+                 result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                         strBuff.toString());
+                 strBuff.delete(0, strBuff.length());
+                 return result.isSuccess();
+             }
+             // the deploy record has to exist
+             previousDeploy = topicDeployMapper.getTopicConf(brokerId, topicName);
+             if (previousDeploy == null) {
+                 strBuff.append("Not found topic deploy configure for brokerId-topicName(")
+                         .append(brokerId).append("-").append(topicName)
+                         .append(")!");
+                 result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                         strBuff.toString());
+                 strBuff.delete(0, strBuff.length());
+                 return result.isSuccess();
+             }
+             // nothing to do when the status is already the requested one
+             if (previousDeploy.getTopicStatus() == topicStatus) {
+                 result.setSuccResult(null);
+                 return result.isSuccess();
+             }
+             topicDeployMapper.updTopicDeployStatus(opEntity,
+                     brokerId, topicName, topicStatus, strBuff, result);
+             storedDeploy =
+                     topicDeployMapper.getTopicConfByeRecKey(previousDeploy.getRecordKey());
+         } finally {
+             if (brokerLid != null) {
+                 metaRowLock.releaseRowLock(brokerLid);
+             }
+         }
+     } catch (Throwable e) {
+         return logExceptionInfo(e, logPrefix, strBuff, result);
+     } finally {
+         if (topicLid != null) {
+             metaRowLock.releaseRowLock(topicLid);
+         }
+     }
+     if (!result.isSuccess()) {
+         return false;
+     }
+     // record the successful operation in the log
+     strBuff.append(logPrefix).append(opEntity.getModifyUser())
+             .append(" updated topic deploy configure from ")
+             .append(previousDeploy).append(" to ").append(storedDeploy);
+     logger.info(strBuff.toString());
+     strBuff.delete(0, strBuff.length());
+     return result.isSuccess();
+ }
+
+ /**
+  * Queries topic deploy configures by topic names, broker ids and a condition entity.
+  *
+  * @param topicNameSet  the topic names to query
+  * @param brokerIdSet   the broker ids to query
+  * @param qryEntity     the query condition
+  * @return the map returned by the topic deploy mapper for these arguments
+  */
+ @Override
+ public Map<String, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
+         Set<Integer> brokerIdSet,
+         TopicDeployEntity qryEntity) {
+     final Map<String, List<TopicDeployEntity>> matched =
+             topicDeployMapper.getTopicConfMap(topicNameSet, brokerIdSet, qryEntity);
+     return matched;
+ }
+
+ /**
+  * Queries topic deploy configures for the brokers that have a broker configure.
+  *
+  * The broker ids are first resolved through the broker configure mapper; when that
+  * lookup is empty an empty map is returned without asking the topic deploy mapper.
+  *
+  * @param topicNameSet  the topic names to query
+  * @param brokerIdSet   the broker ids to query
+  * @return the map returned by the topic deploy mapper, or an empty map
+  */
+ @Override
+ public Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
+         Set<Integer> brokerIdSet) {
+     final Map<Integer, BrokerConfEntity> knownBrokers =
+             brokerConfigMapper.getBrokerConfInfo(brokerIdSet, null, null);
+     if (!knownBrokers.isEmpty()) {
+         return topicDeployMapper.getTopicDeployInfoMap(topicNameSet, knownBrokers.keySet());
+     }
+     return Collections.emptyMap();
+ }
+
+ /**
+  * Queries topic deploy configures by topic names and broker ids.
+  *
+  * @param topicNameSet  the topic names to query
+  * @param brokerIdSet   the broker ids to query
+  * @return the map returned by the topic deploy mapper for these arguments
+  */
+ @Override
+ public Map<String, List<TopicDeployEntity>> getTopicConfInfoByTopicAndBrokerIds(
+         Set<String> topicNameSet, Set<Integer> brokerIdSet) {
+     final Map<String, List<TopicDeployEntity>> matched =
+             topicDeployMapper.getTopicConfMapByTopicAndBrokerIds(topicNameSet, brokerIdSet);
+     return matched;
+ }
+
+ // //////////////////////////////////////////////////////////////////////////////
+
+ /**
+  * Adds or updates a group resource control configure given as individual field values.
+  *
+  * Builds a GroupResCtrlEntity from the arguments and delegates to the entity overload.
+  *
+  * @param isAddOp             true to add the record, false to update it
+  * @param opEntity            operator information and data version of this change
+  * @param groupName           the group name
+  * @param resCheckEnable      the resource check switch
+  * @param allowedBClientRate  the allowed broker-client rate
+  * @param qryPriorityId       the query priority id
+  * @param flowCtrlEnable      the flow control switch
+  * @param flowRuleCnt         the flow control rule count
+  * @param flowCtrlInfo        the flow control rule text
+  * @param strBuff             shared string buffer
+  * @param result              receives the process result
+  * @return true if success otherwise false
+  */
+ @Override
+ public boolean addOrUpdGroupResCtrlConf(boolean isAddOp, BaseEntity opEntity,
+         String groupName, Boolean resCheckEnable,
+         int allowedBClientRate, int qryPriorityId,
+         Boolean flowCtrlEnable, int flowRuleCnt,
+         String flowCtrlInfo, StringBuilder strBuff,
+         ProcessResult result) {
+     final GroupResCtrlEntity groupCtrl = new GroupResCtrlEntity(opEntity, groupName);
+     groupCtrl.updModifyInfo(opEntity.getDataVerId(),
+             resCheckEnable, allowedBClientRate, qryPriorityId,
+             flowCtrlEnable, flowRuleCnt, flowCtrlInfo);
+     return addOrUpdGroupResCtrlConf(isAddOp, groupCtrl, strBuff, result);
+ }
+
+ /**
+  * Adds or updates a group resource control configure with the consistency check
+  * enabled: an add fails when the record exists, an update fails when it does not.
+  *
+  * @param isAddOp  true to add the record, false to update it
+  * @param entity   the group resource control configure to store
+  * @param strBuff  shared string buffer
+  * @param result   receives the process result
+  * @return true if success otherwise false
+  */
+ @Override
+ public boolean addOrUpdGroupResCtrlConf(boolean isAddOp, GroupResCtrlEntity entity,
+         StringBuilder strBuff, ProcessResult result) {
+     final boolean chkConsistent = true;
+     return addOrUpdGroupCtrlConf(chkConsistent, isAddOp, entity, strBuff, result);
+ }
+
+ /**
+  * Stores the flow control part of a group control configure: the record is added
+  * when missing and updated when present. The resource check switch is passed as null
+  * and the allowed rate as META_VALUE_UNDEFINED.
+  *
+  * @param opEntity          operator information and data version of this change
+  * @param groupName         the group name
+  * @param qryPriorityId     the query priority id
+  * @param flowCtrlEnable    the flow control switch
+  * @param flowRuleCnt       the flow control rule count
+  * @param flowCtrlRuleInfo  the flow control rule text
+  * @param strBuff           shared string buffer
+  * @param result            receives the process result
+  * @return true if success otherwise false
+  */
+ @Override
+ public boolean insertGroupCtrlConf(BaseEntity opEntity, String groupName,
+         int qryPriorityId, Boolean flowCtrlEnable,
+         int flowRuleCnt, String flowCtrlRuleInfo,
+         StringBuilder strBuff, ProcessResult result) {
+     final GroupResCtrlEntity groupCtrl = new GroupResCtrlEntity(opEntity, groupName);
+     groupCtrl.updModifyInfo(opEntity.getDataVerId(),
+             null, TBaseConstants.META_VALUE_UNDEFINED,
+             qryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlRuleInfo);
+     return addOrUpdGroupCtrlConf(false, false, groupCtrl, strBuff, result);
+ }
+
+ /**
+  * Stores the resource check part of a group control configure: the record is added
+  * when missing and updated when present. The query priority id and flow rule count are
+  * passed as META_VALUE_UNDEFINED, the flow control switch and rule text as null.
+  *
+  * @param opEntity        operator information and data version of this change
+  * @param groupName       the group name
+  * @param resChkEnable    the resource check switch
+  * @param allowedB2CRate  the allowed broker-client rate
+  * @param strBuff         shared string buffer
+  * @param result          receives the process result
+  * @return true if success otherwise false
+  */
+ @Override
+ public boolean insertGroupCtrlConf(BaseEntity opEntity, String groupName,
+         Boolean resChkEnable, int allowedB2CRate,
+         StringBuilder strBuff, ProcessResult result) {
+     final GroupResCtrlEntity groupCtrl = new GroupResCtrlEntity(opEntity, groupName);
+     groupCtrl.updModifyInfo(opEntity.getDataVerId(),
+             resChkEnable, allowedB2CRate,
+             TBaseConstants.META_VALUE_UNDEFINED, null,
+             TBaseConstants.META_VALUE_UNDEFINED, null);
+     return addOrUpdGroupCtrlConf(false, false, groupCtrl, strBuff, result);
+ }
+
+ /**
+  * Stores a group control configure: the record is added when missing and
+  * updated when present.
+  *
+  * @param entity   the group control configure to store
+  * @param strBuff  shared string buffer
+  * @param result   receives the process result
+  * @return true if success otherwise false
+  */
+ @Override
+ public boolean insertGroupCtrlConf(GroupResCtrlEntity entity,
+         StringBuilder strBuff, ProcessResult result) {
+     final boolean chkConsistent = false;
+     final boolean onlyAdd = false;
+     return addOrUpdGroupCtrlConf(chkConsistent, onlyAdd, entity, strBuff, result);
+ }
+
+ /**
+ * Add if absent group control configure info
+ *
+ * @param opEntity the operation info
+ * @param groupName the group name will be add
+ * @param strBuff the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ private boolean addGroupCtrlConfIfAbsent(BaseEntity opEntity, String groupName,
+ StringBuilder strBuff, ProcessResult result) {
+ GroupResCtrlEntity resCtrlEntity =
+ groupResCtrlMapper.getGroupResCtrlConf(groupName);
+ if (resCtrlEntity != null) {
+ result.setSuccResult(null);
+ return true;
+ }
+ resCtrlEntity = new GroupResCtrlEntity(opEntity, groupName);
+ resCtrlEntity.fillDefaultValue();
+ return addOrUpdGroupCtrlConf(false, true, resCtrlEntity, strBuff, result);
+ }
+
+ /**
+  * Add or Update group control configure info under the group name row lock.
+  *
+  * Behaviour by flag combination:
+  * - record missing, chkConsistent and not isAddOpOrOnlyAdd: fails with DERR_NOT_EXIST;
+  * - record missing, any other combination: the record is added;
+  * - record present, isAddOpOrOnlyAdd and chkConsistent: fails with DERR_EXISTED;
+  * - record present, isAddOpOrOnlyAdd and not chkConsistent: success, nothing changed;
+  * - record present, not isAddOpOrOnlyAdd: the record is updated.
+  *
+  * @param chkConsistent     whether a mismatch between the requested operation and the
+  *                          stored state is reported as an error
+  * @param isAddOpOrOnlyAdd  true for an add (or add-if-absent) request, false for an
+  *                          update (or add-or-update) request
+  * @param entity            the entity need to operation
+  * @param strBuff           the string buffer, left empty after a logged success
+  * @param result            the process result return
+  * @return true if success otherwise false
+  */
+ private boolean addOrUpdGroupCtrlConf(boolean chkConsistent, boolean isAddOpOrOnlyAdd,
+         GroupResCtrlEntity entity, StringBuilder strBuff,
+         ProcessResult result) {
+     Integer lockId = null;
+     boolean added = true;
+     String logPrefix = "[addGroupCtrlConf], ";
+     GroupResCtrlEntity previousConf;
+     GroupResCtrlEntity storedConf;
+     try {
+         // the lock key is the group name
+         lockId = metaRowLock.getLock(null,
+                 StringUtils.getBytesUtf8(entity.getGroupName()), true);
+         previousConf = groupResCtrlMapper.getGroupResCtrlConf(entity.getGroupName());
+         if (previousConf != null) {
+             if (isAddOpOrOnlyAdd) {
+                 // an add request met an existing record
+                 if (!chkConsistent) {
+                     result.setSuccResult(null);
+                 } else {
+                     strBuff.append("Existed record found for groupName(")
+                             .append(entity.getGroupName()).append(")!");
+                     result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+                             strBuff.toString());
+                     strBuff.delete(0, strBuff.length());
+                 }
+                 return result.isSuccess();
+             }
+             added = false;
+             logPrefix = "[updGroupCtrlConf], ";
+             groupResCtrlMapper.updGroupResCtrlConf(entity, strBuff, result);
+         } else {
+             if (chkConsistent && !isAddOpOrOnlyAdd) {
+                 // an update request met no record
+                 strBuff.append("Not found group control configure for groupName(")
+                         .append(entity.getGroupName()).append(")!");
+                 result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                         strBuff.toString());
+                 strBuff.delete(0, strBuff.length());
+                 return result.isSuccess();
+             }
+             groupResCtrlMapper.addGroupResCtrlConf(entity, strBuff, result);
+         }
+         // read back what the mapper now holds
+         storedConf = groupResCtrlMapper.getGroupResCtrlConf(entity.getGroupName());
+     } catch (Throwable e) {
+         return logExceptionInfo(e, logPrefix, strBuff, result);
+     } finally {
+         if (lockId != null) {
+             metaRowLock.releaseRowLock(lockId);
+         }
+     }
+     if (!result.isSuccess()) {
+         return false;
+     }
+     // record the successful operation in the log
+     strBuff.append(logPrefix);
+     if (added) {
+         strBuff.append(entity.getCreateUser())
+                 .append(" added group control configure: ").append(storedConf);
+     } else {
+         strBuff.append(entity.getModifyUser())
+                 .append(" updated group control configure from ")
+                 .append(previousConf).append(" to ").append(storedConf);
+     }
+     logger.info(strBuff.toString());
+     strBuff.delete(0, strBuff.length());
+     return result.isSuccess();
+ }
+
+    /**
+     * Delete the group control configure of a group.
+     *
+     * <p>The operation runs while holding the meta row lock keyed by the group
+     * name. A group without a stored record is reported as success, matching
+     * delConsumeCtrlConf. A group that still has consume control configures
+     * is rejected with DERR_CONDITION_LACK.
+     *
+     * @param operator   the operator recorded in the log message
+     * @param groupName  the group name whose configure is deleted
+     * @param strBuff    the string buffer used to build messages
+     * @param result     the process result return
+     * @return true if success otherwise false
+     */
+    @Override
+    public boolean delGroupCtrlConf(String operator, String groupName,
+            StringBuilder strBuff, ProcessResult result) {
+        Integer lid = null;
+        GroupResCtrlEntity curEntity;
+        String printPrefix = "[delGroupCtrlConf], ";
+        // execute delete operation
+        try {
+            // lock group name meta-lock
+            lid = metaRowLock.getLock(null,
+                    StringUtils.getBytesUtf8(groupName), true);
+            // get current record and judge execute condition
+            curEntity = groupResCtrlMapper.getGroupResCtrlConf(groupName);
+            if (curEntity == null) {
+                // nothing stored: deleting is a no-op, not a failure
+                result.setSuccResult(null);
+                return result.isSuccess();
+            }
+            // the group record must outlive its consume control records
+            if (consumeCtrlMapper.isGroupNameInUse(groupName)) {
+                result.setFailResult(DataOpErrCode.DERR_CONDITION_LACK.getCode(),
+                        strBuff.append("Group ").append(groupName)
+                                .append(" has consume control configures,")
+                                .append(" please delete consume control configures first!")
+                                .toString());
+                strBuff.delete(0, strBuff.length());
+                return result.isSuccess();
+            }
+            groupResCtrlMapper.delGroupResCtrlConf(groupName, strBuff, result);
+        } catch (Throwable e) {
+            return logExceptionInfo(e, printPrefix, strBuff, result);
+        } finally {
+            // release the row lock only if it was actually obtained
+            if (lid != null) {
+                metaRowLock.releaseRowLock(lid);
+            }
+        }
+        // print log to file
+        if (result.isSuccess()) {
+            strBuff.append(printPrefix).append(operator)
+                    .append(" deleted group control configure: ").append(curEntity);
+            logger.info(strBuff.toString());
+            strBuff.delete(0, strBuff.length());
+        }
+        return result.isSuccess();
+    }
+
+    /**
+     * Query group control configures through the group control mapper.
+     *
+     * @param groupSet   the group names passed to the mapper query
+     * @param qryEntity  the query entity passed to the mapper query
+     * @return the map returned by the mapper
+     */
+    @Override
+    public Map<String, GroupResCtrlEntity> getGroupCtrlConf(Set<String> groupSet,
+            GroupResCtrlEntity qryEntity) {
+        final Map<String, GroupResCtrlEntity> matched =
+                groupResCtrlMapper.getGroupResCtrlConf(groupSet, qryEntity);
+        return matched;
+    }
+
+    /**
+     * Look up the group control configure of one group through the mapper.
+     *
+     * @param groupName  the group name to look up
+     * @return the value returned by the mapper for that group
+     */
+    @Override
+    public GroupResCtrlEntity getGroupCtrlConf(String groupName) {
+        final GroupResCtrlEntity found =
+                groupResCtrlMapper.getGroupResCtrlConf(groupName);
+        return found;
+    }
+
+ // //////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Add or update a consume control configure built from individual fields.
+     *
+     * <p>Builds the entity from the operation info, applies the supplied
+     * settings and delegates to addOrUpdConsumeCtrlConf with the
+     * consistency check switched on.
+     *
+     * @param isAddOp     forwarded as the add flag of the delegate
+     * @param opEntity    the operation info
+     * @param groupName   the group name
+     * @param topicName   the topic name
+     * @param enableCsm   the consume-enable setting passed to updModifyInfo
+     * @param disableRsn  the disable reason passed to updModifyInfo
+     * @param enableFlt   the filter-enable setting passed to updModifyInfo
+     * @param fltCondStr  the filter condition string passed to updModifyInfo
+     * @param strBuff     the string buffer used to build messages
+     * @param result      the process result return
+     * @return true if success otherwise false
+     */
+    @Override
+    public boolean addOrUpdConsumeCtrlInfo(boolean isAddOp, BaseEntity opEntity,
+            String groupName, String topicName,
+            Boolean enableCsm, String disableRsn,
+            Boolean enableFlt, String fltCondStr,
+            StringBuilder strBuff, ProcessResult result) {
+        final boolean chkConsistent = true;
+        GroupConsumeCtrlEntity ctrlEntity =
+                new GroupConsumeCtrlEntity(opEntity, groupName, topicName);
+        ctrlEntity.updModifyInfo(opEntity.getDataVerId(),
+                enableCsm, disableRsn, enableFlt, fltCondStr);
+        return addOrUpdConsumeCtrlConf(chkConsistent, isAddOp, ctrlEntity, strBuff, result);
+    }
+
+    /**
+     * Add or update a consume control configure from a prepared entity.
+     *
+     * <p>Delegates to addOrUpdConsumeCtrlConf with the consistency check
+     * switched on.
+     *
+     * @param isAddOp  forwarded as the add flag of the delegate
+     * @param entity   the entity need to operation
+     * @param strBuff  the string buffer used to build messages
+     * @param result   the process result return
+     * @return true if success otherwise false
+     */
+    @Override
+    public boolean addOrUpdConsumeCtrlInfo(boolean isAddOp, GroupConsumeCtrlEntity entity,
+            StringBuilder strBuff, ProcessResult result) {
+        final boolean chkConsistent = true;
+        return addOrUpdConsumeCtrlConf(chkConsistent, isAddOp, entity, strBuff, result);
+    }
+
+    /**
+     * Insert a consume control configure built from individual fields.
+     *
+     * <p>Builds the entity from the operation info, applies the supplied
+     * settings and delegates to addOrUpdConsumeCtrlConf with the
+     * consistency check and the add-only flag both switched off.
+     *
+     * @param opEntity    the operation info
+     * @param groupName   the group name
+     * @param topicName   the topic name
+     * @param enableCsm   the consume-enable setting passed to updModifyInfo
+     * @param disReason   the disable reason passed to updModifyInfo
+     * @param enableFlt   the filter-enable setting passed to updModifyInfo
+     * @param fltCondStr  the filter condition string passed to updModifyInfo
+     * @param strBuff     the string buffer used to build messages
+     * @param result      the process result return
+     * @return true if success otherwise false
+     */
+    @Override
+    public boolean insertConsumeCtrlInfo(BaseEntity opEntity, String groupName,
+            String topicName, Boolean enableCsm,
+            String disReason, Boolean enableFlt,
+            String fltCondStr, StringBuilder strBuff,
+            ProcessResult result) {
+        final boolean chkConsistent = false;
+        final boolean onlyAdd = false;
+        GroupConsumeCtrlEntity ctrlEntity =
+                new GroupConsumeCtrlEntity(opEntity, groupName, topicName);
+        ctrlEntity.updModifyInfo(opEntity.getDataVerId(),
+                enableCsm, disReason, enableFlt, fltCondStr);
+        return addOrUpdConsumeCtrlConf(chkConsistent, onlyAdd, ctrlEntity, strBuff, result);
+    }
+
+    /**
+     * Insert a consume control configure from a prepared entity.
+     *
+     * <p>Delegates to addOrUpdConsumeCtrlConf with the consistency check and
+     * the add-only flag both switched off.
+     *
+     * @param entity   the entity need to operation
+     * @param strBuff  the string buffer used to build messages
+     * @param result   the process result return
+     * @return true if success otherwise false
+     */
+    @Override
+    public boolean insertConsumeCtrlInfo(GroupConsumeCtrlEntity entity,
+            StringBuilder strBuff, ProcessResult result) {
+        final boolean chkConsistent = false;
+        final boolean onlyAdd = false;
+        return addOrUpdConsumeCtrlConf(chkConsistent, onlyAdd, entity, strBuff, result);
+    }
+
+ /**
+ * Add or update a consume control configure record.
+ *
+ * Before the record itself is handled, addTopicCtrlConfIfAbsent and
+ * addGroupCtrlConfIfAbsent are called for the entity's topic and group.
+ * The record is then checked and written while holding two meta row locks:
+ * the topic-name lock is taken first and the group-name lock second, and
+ * they are released in the reverse order.
+ *
+ * The outcome depends on whether a record is stored for the record key:
+ * - no record: fails with DERR_NOT_EXIST if chkConsistent is true and
+ * isAddOpOrOnlyAdd is false, otherwise the entity is added;
+ * - record stored and isAddOpOrOnlyAdd is true: fails with DERR_EXISTED if
+ * chkConsistent is true, otherwise reports success and writes nothing;
+ * - record stored and isAddOpOrOnlyAdd is false: the record is updated.
+ *
+ * @param chkConsistent whether to fail when the stored state does not match the requested operation
+ * @param isAddOpOrOnlyAdd true to add only, false to allow updating an existing record
+ * @param entity the entity need to operation
+ * @param strBuff the string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ private boolean addOrUpdConsumeCtrlConf(boolean chkConsistent, boolean isAddOpOrOnlyAdd,
+ GroupConsumeCtrlEntity entity, StringBuilder strBuff,
+ ProcessResult result) {
+ // treated as an add until an existing record is found
+ boolean addRecord = true;
+ Integer topicLockId = null;
+ Integer groupLockId = null;
+ GroupConsumeCtrlEntity curEntity;
+ GroupConsumeCtrlEntity newEntity;
+ String printPrefix = "[addConsumeCtrlConf], ";
+ // NOTE(review): the boolean results of the two IfAbsent calls below are
+ // not checked before continuing - confirm this is intended
+ // append topic control configure
+ addTopicCtrlConfIfAbsent(entity, entity.getTopicName(), strBuff, result);
+ // append group control configure
+ addGroupCtrlConfIfAbsent(entity, entity.getGroupName(), strBuff, result);
+ // execute add or update operation
+ try {
+ // lock topicName meta-lock
+ topicLockId = metaRowLock.getLock(null,
+ StringUtils.getBytesUtf8(entity.getTopicName()), true);
+ try {
+ // lock groupName meta-lock
+ groupLockId = metaRowLock.getLock(null,
+ StringUtils.getBytesUtf8(entity.getGroupName()), true);
+ // check consume control configure exist
+ curEntity = consumeCtrlMapper.getGroupConsumeCtrlConfByRecKey(entity.getRecordKey());
+ if (curEntity == null) {
+ // a checked update cannot proceed without an existing record
+ if (chkConsistent && !isAddOpOrOnlyAdd) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ strBuff.append("Not found consume control for groupName-topicName(")
+ .append(entity.getRecordKey()).append(")!").toString());
+ // empty the buffer after it was used to build the message
+ strBuff.delete(0, strBuff.length());
+ return result.isSuccess();
+ }
+ consumeCtrlMapper.addGroupConsumeCtrlConf(entity, strBuff, result);
+ } else {
+ if (isAddOpOrOnlyAdd) {
+ // record already stored: an error for a checked add, a no-op otherwise
+ if (chkConsistent) {
+ result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+ strBuff.append("Existed record found for groupName-topicName(")
+ .append(entity.getRecordKey()).append(")!").toString());
+ strBuff.delete(0, strBuff.length());
+ } else {
+ result.setSuccResult(null);
+ }
+ return result.isSuccess();
+ }
+ addRecord = false;
+ printPrefix = "[updConsumeCtrlConf], ";
+ consumeCtrlMapper.updGroupConsumeCtrlConf(entity, strBuff, result);
+ }
+ // read the stored record back for the log message below
+ newEntity = consumeCtrlMapper.getGroupConsumeCtrlConfByRecKey(entity.getRecordKey());
+ } finally {
+ // inner lock is released first, and only if it was obtained
+ if (groupLockId != null) {
+ metaRowLock.releaseRowLock(groupLockId);
+ }
+ }
+ } catch (Throwable e) {
+ // every Throwable raised above is reported through logExceptionInfo
+ return logExceptionInfo(e, printPrefix, strBuff, result);
+ } finally {
+ // outer lock is released last, and only if it was obtained
+ if (topicLockId != null) {
+ metaRowLock.releaseRowLock(topicLockId);
+ }
+ }
+ // print log to file
+ if (result.isSuccess()) {
+ if (addRecord) {
+ strBuff.append(printPrefix).append(entity.getCreateUser())
+ .append(" added consume control configure: ").append(newEntity);
+ } else {
+ strBuff.append(printPrefix).append(entity.getModifyUser())
+ .append(" updated consume control configure from ")
+ .append(curEntity).append(" to ").append(newEntity);
+ }
+ logger.info(strBuff.toString());
+ strBuff.delete(0, strBuff.length());
+ }
+ return result.isSuccess();
+ }
+
+    /**
+     * Delete the consume control configure of a group on a topic.
+     *
+     * <p>The topic-name row lock is taken first and the group-name row lock
+     * second; they are released in the reverse order. A missing record is
+     * reported as success without calling the mapper delete.
+     *
+     * @param operator   the operator recorded in the log message
+     * @param groupName  the group name of the record
+     * @param topicName  the topic name of the record
+     * @param strBuff    the string buffer used to build messages
+     * @param result     the process result return
+     * @return true if success otherwise false
+     */
+    @Override
+    public boolean delConsumeCtrlConf(String operator,
+            String groupName, String topicName,
+            StringBuilder strBuff, ProcessResult result) {
+        final String logPrefix = "[delConsumeCtrlConf], ";
+        Integer topicLid = null;
+        Integer groupLid = null;
+        GroupConsumeCtrlEntity removedEntity;
+        try {
+            // outer lock: topic name
+            topicLid = metaRowLock.getLock(null,
+                    StringUtils.getBytesUtf8(topicName), true);
+            try {
+                // inner lock: group name
+                groupLid = metaRowLock.getLock(null,
+                        StringUtils.getBytesUtf8(groupName), true);
+                removedEntity =
+                        consumeCtrlMapper.getConsumeCtrlByGroupAndTopic(groupName, topicName);
+                if (removedEntity != null) {
+                    consumeCtrlMapper.delGroupConsumeCtrlConf(groupName, topicName, strBuff, result);
+                } else {
+                    // nothing stored, so there is nothing to delete
+                    result.setSuccResult(null);
+                    return result.isSuccess();
+                }
+            } finally {
+                if (groupLid != null) {
+                    metaRowLock.releaseRowLock(groupLid);
+                }
+            }
+        } catch (Throwable e) {
+            return logExceptionInfo(e, logPrefix, strBuff, result);
+        } finally {
+            if (topicLid != null) {
+                metaRowLock.releaseRowLock(topicLid);
+            }
+        }
+        // only a successful delete is written to the log
+        if (!result.isSuccess()) {
+            return result.isSuccess();
+        }
+        strBuff.append(logPrefix).append(operator)
+                .append(" deleted consume control configure: ").append(removedEntity);
+        logger.info(strBuff.toString());
+        strBuff.delete(0, strBuff.length());
+        return result.isSuccess();
+    }
+
+    /**
+     * Query the consume control configures of one topic through the mapper.
+     *
+     * @param topicName  the topic name to query
+     * @return the list returned by the mapper
+     */
+    @Override
+    public List<GroupConsumeCtrlEntity> getConsumeCtrlByTopic(String topicName) {
+        final List<GroupConsumeCtrlEntity> matched =
+                consumeCtrlMapper.getConsumeCtrlByTopicName(topicName);
+        return matched;
+    }
+
+    /**
+     * Collect the topics on which consumption is disabled for a group.
+     *
+     * @param groupName  the group name to query
+     * @return a new set holding the topic name of every consume control
+     *         record of the group whose isEnableConsume() is false; empty
+     *         when there is none
+     */
+    @Override
+    public Set<String> getDisableTopicByGroupName(String groupName) {
+        final Set<String> disabledTopics = new HashSet<>();
+        // an empty query result simply leaves the set empty
+        for (GroupConsumeCtrlEntity item
+                : consumeCtrlMapper.getConsumeCtrlByGroupName(groupName)) {
+            if (item == null || item.isEnableConsume()) {
+                continue;
+            }
+            disabledTopics.add(item.getTopicName());
+        }
+        return disabledTopics;
+    }
+
+    /**
+     * Query consume control configures through the consume control mapper.
+     *
+     * @param groupSet  the group names passed to the mapper query
+     * @param topicSet  the topic names passed to the mapper query
+     * @param qryEntry  the query entity passed to the mapper query
+     * @return the map returned by the mapper
+     */
+    @Override
+    public Map<String, List<GroupConsumeCtrlEntity>> getGroupConsumeCtrlConf(
+            Set<String> groupSet, Set<String> topicSet, GroupConsumeCtrlEntity qryEntry) {
+        final Map<String, List<GroupConsumeCtrlEntity>> matched =
+                consumeCtrlMapper.getConsumeCtrlInfoMap(groupSet, topicSet, qryEntry);
+        return matched;
+    }
+
+ /**
+ * Initialize meta-data stores.
+ *
+ * The body is empty in this class.
+ * NOTE(review): presumably a hook for subclasses to override - confirm.
+ */
+ protected void initMetaStore() {
+
+ }
+
+ /**
+ * Reload meta-data stores.
+ *
+ * Runs in three steps: every registered observer clears its cached data,
+ * the six config mappers load their configure, and every observer reloads
+ * its cache.
+ *
+ * @param strBuff the string buffer handed to each mapper's loadConfig call
+ */
+ private void reloadMetaStore(StringBuilder strBuff) {
+ // Clear observers' cache data.
+ for (AliveObserver observer : eventObservers) {
+ observer.clearCacheData();
+ }
+ // Load the latest meta-data from persistent
+ clusterConfigMapper.loadConfig(strBuff);
+ brokerConfigMapper.loadConfig(strBuff);
+ topicDeployMapper.loadConfig(strBuff);
+ topicCtrlMapper.loadConfig(strBuff);
+ groupResCtrlMapper.loadConfig(strBuff);
+ consumeCtrlMapper.loadConfig(strBuff);
+ // load the latest meta-data to observers
+ for (AliveObserver observer : eventObservers) {
+ observer.reloadCacheData();
+ }
+ }
+
+ /**
+ * Close meta-data stores.
+ *
+ * Calls close() on each of the six config mappers in turn.
+ */
+ private void closeMetaStore() {
+ brokerConfigMapper.close();
+ topicDeployMapper.close();
+ groupResCtrlMapper.close();
+ topicCtrlMapper.close();
+ consumeCtrlMapper.close();
+ clusterConfigMapper.close();
+ }
+
+    /**
+     * Report an exception raised during a locked meta-data operation.
+     *
+     * <p>Logs a warning with the throwable, stores a
+     * DERR_STORE_LOCK_FAILURE failure in the result, and leaves the string
+     * buffer empty.
+     *
+     * @param e            the throwable that was caught
+     * @param printPrefix  the operation tag placed in front of the message
+     * @param strBuff      the string buffer, emptied before and after use
+     * @param result       the process result return
+     * @return the success flag of the result after the failure is stored
+     */
+    private boolean logExceptionInfo(Throwable e, String printPrefix,
+            StringBuilder strBuff, ProcessResult result) {
+        // drop whatever an interrupted operation left in the buffer
+        strBuff.setLength(0);
+        final String errMsg =
+                strBuff.append(printPrefix).append("failed to lock meta-lock.").toString();
+        strBuff.setLength(0);
+        logger.warn(errMsg, e);
+        result.setFailResult(DataOpErrCode.DERR_STORE_LOCK_FAILURE.getCode(), errMsg);
+        return result.isSuccess();
+    }
+
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java
index 9504ba1..f9805c7 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java
@@ -86,7 +86,7 @@ public abstract class AbsTopicCtrlMapperImpl implements TopicCtrlMapper {
// Store data to persistent
if (putConfig2Persistent(newEntity, strBuff, result)) {
topicCtrlCache.put(newEntity.getTopicName(), newEntity);
- result.setSuccResult(curEntity);
+ result.setSuccResult(null);
}
return result.isSuccess();
}
@@ -103,7 +103,7 @@ public abstract class AbsTopicCtrlMapperImpl implements TopicCtrlMapper {
}
delConfigFromPersistent(topicName, strBuff);
topicCtrlCache.remove(topicName);
- result.setSuccResult(curEntity);
+ result.setSuccResult(null);
return result.isSuccess();
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
index 0951c16..d34a4f3 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
@@ -32,6 +32,7 @@ import org.apache.inlong.tubemq.corebase.utils.KeyBuilderUtils;
import org.apache.inlong.tubemq.server.common.TServerConstants;
import org.apache.inlong.tubemq.server.common.statusdef.TopicStatus;
import org.apache.inlong.tubemq.server.master.metamanage.DataOpErrCode;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.TopicDeployMapper;
import org.slf4j.Logger;
@@ -42,24 +43,24 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
LoggerFactory.getLogger(AbsTopicDeployMapperImpl.class);
// data cache
private final ConcurrentHashMap<String/* recordKey */, TopicDeployEntity>
- topicConfCache = new ConcurrentHashMap<>();
+ topicDeployCache = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer/* brokerId */, ConcurrentHashSet<String>>
- brokerIdCacheIndex = new ConcurrentHashMap<>();
+ brokerId2RecordCache = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/* topicName */, ConcurrentHashSet<String>>
- topicNameCacheIndex = new ConcurrentHashMap<>();
+ topicName2RecordCache = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer/* brokerId */, ConcurrentHashSet<String>>
- brokerId2TopicCacheIndex = new ConcurrentHashMap<>();
+ brokerId2TopicNameCache = new ConcurrentHashMap<>();
public AbsTopicDeployMapperImpl() {
// Initial instant
}
@Override
- public boolean addTopicConf(TopicDeployEntity entity,
- StringBuilder strBuff, ProcessResult result) {
+ public boolean addTopicDeployConf(TopicDeployEntity entity,
+ StringBuilder strBuff, ProcessResult result) {
// Checks whether the record already exists
TopicDeployEntity curEntity =
- topicConfCache.get(entity.getRecordKey());
+ topicDeployCache.get(entity.getRecordKey());
if (curEntity != null) {
if (curEntity.isValidTopicStatus()) {
result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
@@ -75,7 +76,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
return result.isSuccess();
}
// valid whether system topic
- if (!validSysTopicConfigure(entity, strBuff, result)) {
+ if (!isValidSysTopicConf(entity, strBuff, result)) {
return result.isSuccess();
}
// check deploy status if still accept publish and subscribe
@@ -96,11 +97,11 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
}
@Override
- public boolean updTopicConf(TopicDeployEntity entity,
- StringBuilder strBuff, ProcessResult result) {
+ public boolean updTopicDeployConf(TopicDeployEntity entity,
+ StringBuilder strBuff, ProcessResult result) {
// Checks whether the record already exists
TopicDeployEntity curEntity =
- topicConfCache.get(entity.getRecordKey());
+ topicDeployCache.get(entity.getRecordKey());
if (curEntity == null) {
result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
strBuff.append("Not found topic deploy configure for brokerId-topicName(")
@@ -120,25 +121,61 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
return result.isSuccess();
}
// valid whether system topic
- if (!validSysTopicConfigure(newEntity, strBuff, result)) {
+ if (!isValidSysTopicConf(newEntity, strBuff, result)) {
return result.isSuccess();
}
// check deploy status
- if (isIllegalValuesChange(newEntity, curEntity, strBuff, result)) {
+ if (!isValidValuesChange(newEntity, curEntity, strBuff, result)) {
return result.isSuccess();
}
// Store data to persistent
if (putConfig2Persistent(newEntity, strBuff, result)) {
putRecord2Caches(newEntity);
- result.setSuccResult(curEntity);
+ result.setSuccResult(null);
}
return result.isSuccess();
}
@Override
- public boolean delTopicConf(String recordKey, StringBuilder strBuff, ProcessResult result) {
+    public boolean updTopicDeployStatus(BaseEntity opEntity, int brokerId,
+            String topicName, TopicStatus topicStatus,
+            StringBuilder strBuff, ProcessResult result) {
+        // Change only the deploy status of an existing brokerId-topicName record:
+        // the other updModifyInfo arguments are passed as undefined or null.
+        final TopicDeployEntity oldEntity = getTopicConf(brokerId, topicName);
+        if (oldEntity == null) {
+            // the record must already exist
+            result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                    strBuff.append("Not found topic deploy configure for brokerId-topicName(")
+                            .append(brokerId).append("-").append(topicName)
+                            .append(")!").toString());
+            strBuff.delete(0, strBuff.length());
+            return result.isSuccess();
+        }
+        // work on a copy so the cached record stays untouched until the store succeeds
+        TopicDeployEntity updEntity = oldEntity.clone();
+        updEntity.updBaseModifyInfo(opEntity);
+        boolean changed = updEntity.updModifyInfo(opEntity.getDataVerId(),
+                TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED,
+                null, topicStatus, null);
+        if (!changed) {
+            result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+                    "Topic deploy configure not changed!");
+            return result.isSuccess();
+        }
+        // store and cache the copy only when the value change is valid
+        if (isValidValuesChange(updEntity, oldEntity, strBuff, result)
+                && putConfig2Persistent(updEntity, strBuff, result)) {
+            putRecord2Caches(updEntity);
+            result.setSuccResult(null);
+        }
+        return result.isSuccess();
+    }
+
+ @Override
+ public boolean delTopicDeployConf(String recordKey, StringBuilder strBuff, ProcessResult result) {
TopicDeployEntity curEntity =
- topicConfCache.get(recordKey);
+ topicDeployCache.get(recordKey);
if (curEntity == null) {
result.setSuccResult(null);
return result.isSuccess();
@@ -154,14 +191,14 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
}
delConfigFromPersistent(recordKey, strBuff);
delRecordFromCaches(recordKey);
- result.setSuccResult(curEntity);
+ result.setSuccResult(null);
return result.isSuccess();
}
@Override
public boolean delTopicConfByBrokerId(Integer brokerId, StringBuilder strBuff, ProcessResult result) {
ConcurrentHashSet<String> recordKeySet =
- brokerIdCacheIndex.get(brokerId);
+ brokerId2RecordCache.get(brokerId);
if (recordKeySet == null || recordKeySet.isEmpty()) {
result.setSuccResult(null);
return result.isSuccess();
@@ -169,7 +206,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
// check deploy status if still accept publish and subscribe
TopicDeployEntity curEntity;
for (String recordKey : recordKeySet) {
- curEntity = topicConfCache.get(recordKey);
+ curEntity = topicDeployCache.get(recordKey);
if (curEntity == null) {
continue;
}
@@ -194,13 +231,13 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
@Override
public boolean hasConfiguredTopics(int brokerId) {
ConcurrentHashSet<String> keySet =
- brokerIdCacheIndex.get(brokerId);
+ brokerId2RecordCache.get(brokerId);
return (keySet != null && !keySet.isEmpty());
}
@Override
public boolean isTopicDeployed(String topicName) {
- ConcurrentHashSet<String> deploySet = topicNameCacheIndex.get(topicName);
+ ConcurrentHashSet<String> deploySet = topicName2RecordCache.get(topicName);
return (deploySet != null && !deploySet.isEmpty());
}
@@ -208,9 +245,9 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
public List<TopicDeployEntity> getTopicConf(TopicDeployEntity qryEntity) {
List<TopicDeployEntity> retEntities = new ArrayList<>();
if (qryEntity == null) {
- retEntities.addAll(topicConfCache.values());
+ retEntities.addAll(topicDeployCache.values());
} else {
- for (TopicDeployEntity entity : topicConfCache.values()) {
+ for (TopicDeployEntity entity : topicDeployCache.values()) {
if (entity != null && entity.isMatched(qryEntity)) {
retEntities.add(entity);
}
@@ -223,12 +260,12 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
public TopicDeployEntity getTopicConf(int brokerId, String topicName) {
String recordKey =
KeyBuilderUtils.buildTopicConfRecKey(brokerId, topicName);
- return topicConfCache.get(recordKey);
+ return topicDeployCache.get(recordKey);
}
@Override
public TopicDeployEntity getTopicConfByeRecKey(String recordKey) {
- return topicConfCache.get(recordKey);
+ return topicDeployCache.get(recordKey);
}
@Override
@@ -241,7 +278,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
Set<String> matchedKeySet = getMatchedRecords(topicNameSet, brokerIdSet);
// filter record by qryEntity
if (matchedKeySet == null) {
- for (TopicDeployEntity entry : topicConfCache.values()) {
+ for (TopicDeployEntity entry : topicDeployCache.values()) {
if (entry == null || (qryEntity != null && !entry.isMatched(qryEntity))) {
continue;
}
@@ -252,7 +289,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
} else {
TopicDeployEntity entry;
for (String recKey : matchedKeySet) {
- entry = topicConfCache.get(recKey);
+ entry = topicDeployCache.get(recKey);
if (entry == null || (qryEntity != null && !entry.isMatched(qryEntity))) {
continue;
}
@@ -278,10 +315,10 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
Set<String> matchedKeySet = getMatchedRecords(topicNameSet, brokerIdSet);
// get record by keys
if (matchedKeySet == null) {
- matchedKeySet = new HashSet<>(topicConfCache.keySet());
+ matchedKeySet = new HashSet<>(topicDeployCache.keySet());
}
for (String recordKey: matchedKeySet) {
- TopicDeployEntity entity = topicConfCache.get(recordKey);
+ TopicDeployEntity entity = topicDeployCache.get(recordKey);
if (entity == null) {
continue;
}
@@ -302,7 +339,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
Set<String> matchedKeySet = getMatchedRecords(topicSet, brokerIdSet);
// get records by matched keys
if (matchedKeySet == null) {
- for (TopicDeployEntity entity : topicConfCache.values()) {
+ for (TopicDeployEntity entity : topicDeployCache.values()) {
if (entity == null) {
continue;
}
@@ -312,7 +349,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
}
} else {
for (String key : matchedKeySet) {
- tmpEntity = topicConfCache.get(key);
+ tmpEntity = topicDeployCache.get(key);
if (tmpEntity == null) {
continue;
}
@@ -328,12 +365,12 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
public Map<String, TopicDeployEntity> getConfiguredTopicInfo(int brokerId) {
TopicDeployEntity tmpEntity;
Map<String, TopicDeployEntity> retEntityMap = new HashMap<>();
- ConcurrentHashSet<String> records = brokerIdCacheIndex.get(brokerId);
+ ConcurrentHashSet<String> records = brokerId2RecordCache.get(brokerId);
if (records == null || records.isEmpty()) {
return retEntityMap;
}
for (String key : records) {
- tmpEntity = topicConfCache.get(key);
+ tmpEntity = topicDeployCache.get(key);
if (tmpEntity == null) {
continue;
}
@@ -349,7 +386,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
Map<Integer, Set<String>> retEntityMap = new HashMap<>();
if (brokerIdSet == null || brokerIdSet.isEmpty()) {
for (Map.Entry<Integer, ConcurrentHashSet<String>> entry
- : brokerId2TopicCacheIndex.entrySet()) {
+ : brokerId2TopicNameCache.entrySet()) {
if (entry.getKey() == null) {
continue;
}
@@ -365,7 +402,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
continue;
}
topicSet = new HashSet<>();
- deploySet = brokerId2TopicCacheIndex.get(brokerId);
+ deploySet = brokerId2TopicNameCache.get(brokerId);
if (deploySet != null) {
topicSet.addAll(deploySet);
}
@@ -381,7 +418,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
Map<Integer, String> brokerInfoMap;
Map<String, Map<Integer, String>> retEntityMap = new HashMap<>();
if (topicNameSet == null || topicNameSet.isEmpty()) {
- for (TopicDeployEntity entry : topicConfCache.values()) {
+ for (TopicDeployEntity entry : topicDeployCache.values()) {
if (entry == null) {
continue;
}
@@ -395,10 +432,10 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
continue;
}
brokerInfoMap = retEntityMap.computeIfAbsent(topicName, k -> new HashMap<>());
- keySet = topicNameCacheIndex.get(topicName);
+ keySet = topicName2RecordCache.get(topicName);
if (keySet != null) {
for (String key : keySet) {
- TopicDeployEntity entry = topicConfCache.get(key);
+ TopicDeployEntity entry = topicDeployCache.get(key);
if (entry != null) {
brokerInfoMap.put(entry.getBrokerId(), entry.getBrokerIp());
}
@@ -411,17 +448,17 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
@Override
public Set<String> getConfiguredTopicSet() {
- return new HashSet<>(topicNameCacheIndex.keySet());
+ return new HashSet<>(topicName2RecordCache.keySet());
}
/**
* Clear cached data
*/
protected void clearCachedData() {
- topicNameCacheIndex.clear();
- brokerIdCacheIndex.clear();
- brokerId2TopicCacheIndex.clear();
- topicConfCache.clear();
+ topicName2RecordCache.clear();
+ brokerId2RecordCache.clear();
+ brokerId2TopicNameCache.clear();
+ topicDeployCache.clear();
}
/**
@@ -430,33 +467,33 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
* @param entity need added or updated entity
*/
protected void putRecord2Caches(TopicDeployEntity entity) {
- topicConfCache.put(entity.getRecordKey(), entity);
+ topicDeployCache.put(entity.getRecordKey(), entity);
// add topic index map
ConcurrentHashSet<String> keySet =
- topicNameCacheIndex.get(entity.getTopicName());
+ topicName2RecordCache.get(entity.getTopicName());
if (keySet == null) {
ConcurrentHashSet<String> tmpSet = new ConcurrentHashSet<>();
- keySet = topicNameCacheIndex.putIfAbsent(entity.getTopicName(), tmpSet);
+ keySet = topicName2RecordCache.putIfAbsent(entity.getTopicName(), tmpSet);
if (keySet == null) {
keySet = tmpSet;
}
}
keySet.add(entity.getRecordKey());
// add brokerId index map
- keySet = brokerIdCacheIndex.get(entity.getBrokerId());
+ keySet = brokerId2RecordCache.get(entity.getBrokerId());
if (keySet == null) {
ConcurrentHashSet<String> tmpSet = new ConcurrentHashSet<>();
- keySet = brokerIdCacheIndex.putIfAbsent(entity.getBrokerId(), tmpSet);
+ keySet = brokerId2RecordCache.putIfAbsent(entity.getBrokerId(), tmpSet);
if (keySet == null) {
keySet = tmpSet;
}
}
keySet.add(entity.getRecordKey());
// add brokerId topic map
- keySet = brokerId2TopicCacheIndex.get(entity.getBrokerId());
+ keySet = brokerId2TopicNameCache.get(entity.getBrokerId());
if (keySet == null) {
ConcurrentHashSet<String> tmpSet = new ConcurrentHashSet<>();
- keySet = brokerId2TopicCacheIndex.putIfAbsent(entity.getBrokerId(), tmpSet);
+ keySet = brokerId2TopicNameCache.putIfAbsent(entity.getBrokerId(), tmpSet);
if (keySet == null) {
keySet = tmpSet;
}
@@ -486,33 +523,33 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
private void delRecordFromCaches(String recordKey) {
TopicDeployEntity curEntity =
- topicConfCache.remove(recordKey);
+ topicDeployCache.remove(recordKey);
if (curEntity == null) {
return;
}
// add topic index
ConcurrentHashSet<String> keySet =
- topicNameCacheIndex.get(curEntity.getTopicName());
+ topicName2RecordCache.get(curEntity.getTopicName());
if (keySet != null) {
keySet.remove(recordKey);
if (keySet.isEmpty()) {
- topicNameCacheIndex.remove(curEntity.getTopicName());
+ topicName2RecordCache.remove(curEntity.getTopicName());
}
}
// delete brokerId index
- keySet = brokerIdCacheIndex.get(curEntity.getBrokerId());
+ keySet = brokerId2RecordCache.get(curEntity.getBrokerId());
if (keySet != null) {
keySet.remove(recordKey);
if (keySet.isEmpty()) {
- brokerIdCacheIndex.remove(curEntity.getBrokerId());
+ brokerId2RecordCache.remove(curEntity.getBrokerId());
}
}
// delete broker topic map
- keySet = brokerId2TopicCacheIndex.get(curEntity.getBrokerId());
+ keySet = brokerId2TopicNameCache.get(curEntity.getBrokerId());
if (keySet != null) {
keySet.remove(curEntity.getTopicName());
if (keySet.isEmpty()) {
- brokerId2TopicCacheIndex.remove(curEntity.getBrokerId());
+ brokerId2TopicNameCache.remove(curEntity.getBrokerId());
}
}
}
@@ -527,7 +564,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
if (topicNameSet != null && !topicNameSet.isEmpty()) {
topicKeySet = new HashSet<>();
for (String topicName : topicNameSet) {
- keySet = topicNameCacheIndex.get(topicName);
+ keySet = topicName2RecordCache.get(topicName);
if (keySet != null && !keySet.isEmpty()) {
topicKeySet.addAll(keySet);
}
@@ -540,7 +577,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
brokerKeySet = new HashSet<>();
for (Integer brokerId : brokerIdSet) {
- keySet = brokerIdCacheIndex.get(brokerId);
+ keySet = brokerId2RecordCache.get(brokerId);
if (keySet != null && !keySet.isEmpty()) {
brokerKeySet.addAll(keySet);
}
@@ -570,19 +607,19 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
}
/**
- * Check whether the change of deploy values is illegal
+ * Check whether the change of deploy values is valid
* Attention, the newEntity and newEntity must not equal
*
* @param newEntity the entity to be updated
* @param curEntity the current entity
* @param strBuff string buffer
* @param result check result of parameter value
- * @return true for illegal, false for legal
+ * @return true for valid, false for invalid
*/
- private boolean isIllegalValuesChange(TopicDeployEntity newEntity,
- TopicDeployEntity curEntity,
- StringBuilder strBuff,
- ProcessResult result) {
+ private boolean isValidValuesChange(TopicDeployEntity newEntity,
+ TopicDeployEntity curEntity,
+ StringBuilder strBuff,
+ ProcessResult result) {
// check if shrink data store block
if (newEntity.getNumPartitions() != TBaseConstants.META_VALUE_UNDEFINED
&& newEntity.getNumPartitions() < curEntity.getNumPartitions()) {
@@ -593,7 +630,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
.append("in brokerId-topicName(").append(curEntity.getRecordKey())
.append(") record!").toString());
strBuff.delete(0, strBuff.length());
- return !result.isSuccess();
+ return result.isSuccess();
}
if (newEntity.getNumTopicStores() != TBaseConstants.META_VALUE_UNDEFINED
&& newEntity.getNumTopicStores() < curEntity.getNumTopicStores()) {
@@ -604,7 +641,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
.append("in brokerId-topicName(").append(curEntity.getRecordKey())
.append(") record!").toString());
strBuff.delete(0, strBuff.length());
- return !result.isSuccess();
+ return result.isSuccess();
}
// check whether the deploy status is equal
if (newEntity.getTopicStatus() == curEntity.getTopicStatus()) {
@@ -614,9 +651,9 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
.append(" please resume or hard remove for brokerId-topicName(")
.append(newEntity.getRecordKey()).append(") record!").toString());
strBuff.delete(0, strBuff.length());
- return !result.isSuccess();
+ return result.isSuccess();
}
- return false;
+ return true;
}
// check deploy status case from valid to invalid
if (curEntity.isValidTopicStatus() && !newEntity.isValidTopicStatus()) {
@@ -627,7 +664,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
.append(" before change status of brokerId-topicName(")
.append(curEntity.getRecordKey()).append(") record!").toString());
strBuff.delete(0, strBuff.length());
- return !result.isSuccess();
+ return result.isSuccess();
}
if (newEntity.getTopicStatus().getCode()
> TopicStatus.STATUS_TOPIC_SOFT_DELETE.getCode()) {
@@ -635,9 +672,9 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
strBuff.append("Please softly deleted the brokerId-topicName(")
.append(newEntity.getRecordKey()).append(") record first!").toString());
strBuff.delete(0, strBuff.length());
- return !result.isSuccess();
+ return result.isSuccess();
}
- return false;
+ return true;
}
// check deploy status case from invalid to invalid
if (!curEntity.isValidTopicStatus() && !newEntity.isValidTopicStatus()) {
@@ -652,7 +689,7 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
.append(" for the brokerId-topicName(")
.append(newEntity.getRecordKey()).append(") record!").toString());
strBuff.delete(0, strBuff.length());
- return !result.isSuccess();
+ return result.isSuccess();
}
if (newEntity.isAcceptPublish()
|| newEntity.isAcceptSubscribe()) {
@@ -661,9 +698,9 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
.append(" before change status of brokerId-topicName(")
.append(newEntity.getRecordKey()).append(") record!").toString());
strBuff.delete(0, strBuff.length());
- return !result.isSuccess();
+ return result.isSuccess();
}
- return false;
+ return true;
}
// check deploy status case from invalid to valid
if (!curEntity.isValidTopicStatus() && newEntity.isValidTopicStatus()) {
@@ -697,10 +734,10 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
* @param deployEntity the topic configuration that needs to be added or updated
* @param strBuff the print info string buffer
* @param result the process result return
- * @return true if success otherwise false
+ * @return true if valid otherwise false
*/
- private boolean validSysTopicConfigure(TopicDeployEntity deployEntity,
- StringBuilder strBuff, ProcessResult result) {
+ private boolean isValidSysTopicConf(TopicDeployEntity deployEntity,
+ StringBuilder strBuff, ProcessResult result) {
if (!TServerConstants.OFFSET_HISTORY_NAME.equals(deployEntity.getTopicName())) {
return true;
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaStoreServiceImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaStoreServiceImpl.java
index b2917b7..aa4e13d 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaStoreServiceImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaStoreServiceImpl.java
@@ -211,46 +211,20 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
// cluster default configure api
@Override
- public boolean addClusterConfig(ClusterSettingEntity entity,
- StringBuilder strBuff, ProcessResult result) {
- // check current status
- if (!checkStoreStatus(true, result)) {
- return result.isSuccess();
- }
- if (clusterConfigMapper.addClusterConfig(entity, strBuff, result)) {
- strBuff.append("[addClusterConfig], ").append(entity.getCreateUser())
- .append(" added cluster setting record :").append(entity);
- logger.info(strBuff.toString());
- } else {
- strBuff.append("[addClusterConfig], ")
- .append("failure to add cluster setting record : ")
- .append(result.getErrMsg());
- logger.warn(strBuff.toString());
- }
- strBuff.delete(0, strBuff.length());
- return result.isSuccess();
- }
-
- @Override
- public boolean updClusterConfig(ClusterSettingEntity entity,
- StringBuilder strBuff, ProcessResult result) {
+ public boolean addUpdClusterConfig(ClusterSettingEntity entity,
+ StringBuilder strBuff, ProcessResult result) {
// check current status
if (!checkStoreStatus(true, result)) {
return result.isSuccess();
}
- if (clusterConfigMapper.updClusterConfig(entity, strBuff, result)) {
- ClusterSettingEntity oldEntity =
- (ClusterSettingEntity) result.getRetData();
- ClusterSettingEntity curEntity =
- clusterConfigMapper.getClusterConfig();
- strBuff.append("[updClusterConfig], ")
- .append(entity.getModifyUser())
- .append(" updated record from :").append(oldEntity.toString())
- .append(" to ").append(curEntity.toString());
+ if (clusterConfigMapper.addUpdClusterConfig(entity, strBuff, result)) {
+ ClusterSettingEntity newEntity = clusterConfigMapper.getClusterConfig();
+ strBuff.append("[insertClusterConfig], ").append(entity.getCreateUser())
+ .append(" insert cluster setting record :").append(newEntity);
logger.info(strBuff.toString());
} else {
- strBuff.append("[updClusterConfig], ")
- .append("failure to update record : ")
+ strBuff.append("[addUpdClusterConfig], ")
+ .append("failure to insert cluster setting record : ")
.append(result.getErrMsg());
logger.warn(strBuff.toString());
}
@@ -316,16 +290,15 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
if (!checkStoreStatus(true, result)) {
return result.isSuccess();
}
+ BrokerConfEntity curEntity =
+ brokerConfigMapper.getBrokerConfByBrokerId(entity.getBrokerId());
if (brokerConfigMapper.updBrokerConf(entity, strBuff, result)) {
- BrokerConfEntity oldEntity =
- (BrokerConfEntity) result.getRetData();
- BrokerConfEntity curEntity =
+ BrokerConfEntity newEntity =
brokerConfigMapper.getBrokerConfByBrokerId(entity.getBrokerId());
strBuff.append("[updBrokerConf], ")
.append(entity.getModifyUser())
.append(" updated broker configure record from :")
- .append(oldEntity.toString())
- .append(" to ").append(curEntity.toString());
+ .append(curEntity).append(" to ").append(newEntity);
logger.info(strBuff.toString());
} else {
strBuff.append("[updBrokerConf], ")
@@ -391,7 +364,7 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
if (!checkStoreStatus(true, result)) {
return result.isSuccess();
}
- if (topicDeployMapper.addTopicConf(entity, strBuff, result)) {
+ if (topicDeployMapper.addTopicDeployConf(entity, strBuff, result)) {
strBuff.append("[addTopicConf], ").append(entity.getCreateUser())
.append(" added topic configure record :").append(entity);
logger.info(strBuff.toString());
@@ -412,16 +385,14 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
if (!checkStoreStatus(true, result)) {
return result.isSuccess();
}
- if (topicDeployMapper.updTopicConf(entity, strBuff, result)) {
- TopicDeployEntity oldEntity =
- (TopicDeployEntity) result.getRetData();
- TopicDeployEntity curEntity =
+ TopicDeployEntity curEntity =
+ topicDeployMapper.getTopicConfByeRecKey(entity.getRecordKey());
+ if (topicDeployMapper.updTopicDeployConf(entity, strBuff, result)) {
+ TopicDeployEntity newEntity =
topicDeployMapper.getTopicConfByeRecKey(entity.getRecordKey());
- strBuff.append("[updTopicConf], ")
- .append(entity.getModifyUser())
- .append(" updated record from :")
- .append(oldEntity.toString())
- .append(" to ").append(curEntity.toString());
+ strBuff.append("[updTopicConf], ").append(entity.getModifyUser())
+ .append(" updated record from :").append(curEntity)
+ .append(" to ").append(newEntity);
logger.info(strBuff.toString());
} else {
strBuff.append("[updTopicConf], ")
@@ -440,7 +411,7 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
if (!checkStoreStatus(true, result)) {
return result.isSuccess();
}
- if (topicDeployMapper.delTopicConf(recordKey, strBuff, result)) {
+ if (topicDeployMapper.delTopicDeployConf(recordKey, strBuff, result)) {
GroupResCtrlEntity entity =
(GroupResCtrlEntity) result.getRetData();
if (entity != null) {
@@ -573,15 +544,14 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
if (!checkStoreStatus(true, result)) {
return result.isSuccess();
}
+ TopicCtrlEntity curEntity =
+ topicCtrlMapper.getTopicCtrlConf(entity.getTopicName());
if (topicCtrlMapper.updTopicCtrlConf(entity, strBuff, result)) {
- TopicCtrlEntity oldEntity =
- (TopicCtrlEntity) result.getRetData();
- TopicCtrlEntity curEntity =
+ TopicCtrlEntity newEntity =
topicCtrlMapper.getTopicCtrlConf(entity.getTopicName());
- strBuff.append("[updTopicCtrlConf], ")
- .append(entity.getModifyUser())
- .append(" updated record from :").append(oldEntity.toString())
- .append(" to ").append(curEntity.toString());
+ strBuff.append("[updTopicCtrlConf], ").append(entity.getModifyUser())
+ .append(" updated record from :").append(curEntity)
+ .append(" to ").append(newEntity);
logger.info(strBuff.toString());
} else {
strBuff.append("[updTopicCtrlConf], ")
@@ -663,16 +633,14 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
if (!checkStoreStatus(true, result)) {
return result.isSuccess();
}
+ GroupResCtrlEntity curEntity =
+ groupResCtrlMapper.getGroupResCtrlConf(entity.getGroupName());
if (groupResCtrlMapper.updGroupResCtrlConf(entity, strBuff, result)) {
- GroupResCtrlEntity oldEntity =
- (GroupResCtrlEntity) result.getRetData();
- GroupResCtrlEntity curEntity =
+ GroupResCtrlEntity newEntity =
groupResCtrlMapper.getGroupResCtrlConf(entity.getGroupName());
- strBuff.append("[updGroupResCtrlConf], ")
- .append(entity.getModifyUser())
- .append(" updated record from :")
- .append(oldEntity.toString())
- .append(" to ").append(curEntity.toString());
+ strBuff.append("[updGroupResCtrlConf], ").append(entity.getModifyUser())
+ .append(" updated record from :").append(curEntity)
+ .append(" to ").append(newEntity);
logger.info(strBuff.toString());
} else {
strBuff.append("[updGroupResCtrlConf], ")
@@ -748,15 +716,14 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
if (!checkStoreStatus(true, result)) {
return result.isSuccess();
}
+ GroupConsumeCtrlEntity curEntity =
+ consumeCtrlMapper.getGroupConsumeCtrlConfByRecKey(entity.getRecordKey());
if (consumeCtrlMapper.updGroupConsumeCtrlConf(entity, strBuff, result)) {
- GroupConsumeCtrlEntity oldEntity =
- (GroupConsumeCtrlEntity) result.getRetData();
- GroupConsumeCtrlEntity curEntity =
+ GroupConsumeCtrlEntity newEntity =
consumeCtrlMapper.getGroupConsumeCtrlConfByRecKey(entity.getRecordKey());
- strBuff.append("[updGroupConsumeCtrlConf], ")
- .append(entity.getModifyUser())
- .append(" updated record from :").append(oldEntity.toString())
- .append(" to ").append(curEntity.toString());
+ strBuff.append("[updGroupConsumeCtrlConf], ").append(entity.getModifyUser())
+ .append(" updated record from :").append(curEntity)
+ .append(" to ").append(newEntity);
logger.info(strBuff.toString());
} else {
strBuff.append("[updGroupConsumeCtrlConf], ")
@@ -825,12 +792,12 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
@Override
public boolean isTopicNameInUsed(String topicName) {
- return consumeCtrlMapper.isTopicNameInUsed(topicName);
+ return consumeCtrlMapper.isTopicNameInUse(topicName);
}
@Override
public boolean hasGroupConsumeCtrlConf(String groupName) {
- return consumeCtrlMapper.hasGroupConsumeCtrlConf(groupName);
+ return consumeCtrlMapper.isGroupNameInUse(groupName);
}
@Override