You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/04/19 12:29:12 UTC
[incubator-inlong] branch TUBEMQ-570 updated: [INLONG-597] Adjust
WebTopicCtrlHandler class implementation
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
new 906ce46 [INLONG-597] Adjust WebTopicCtrlHandler class implementation
906ce46 is described below
commit 906ce469a423f681c50450f51c75cbfb28948b85
Author: gosonzhang <go...@tencent.com>
AuthorDate: Sat Apr 17 18:13:41 2021 +0800
[INLONG-597] Adjust WebTopicCtrlHandler class implementation
---
.../tubemq/server/common/fielddef/WebFieldDef.java | 7 +-
.../server/common/utils/WebParameterUtils.java | 102 ++---
.../server/master/metamanage/MetaDataManager.java | 388 +++++++++---------
.../metastore/BdbMetaStoreServiceImpl.java | 66 ++-
.../metamanage/metastore/MetaStoreService.java | 45 ++-
.../metastore/dao/entity/BaseEntity.java | 57 ++-
.../metastore/dao/entity/BrokerConfEntity.java | 18 +-
.../metastore/dao/entity/ClusterSettingEntity.java | 71 ++--
.../metastore/dao/entity/GroupBlackListEntity.java | 10 +-
.../dao/entity/GroupConsumeCtrlEntity.java | 59 +--
.../metastore/dao/entity/GroupResCtrlEntity.java | 79 ++--
.../metastore/dao/entity/TopicCtrlEntity.java | 37 +-
.../metastore/dao/entity/TopicDeployEntity.java | 28 +-
.../metastore/dao/mapper/BrokerConfigMapper.java | 2 +
.../dao/mapper/GroupConsumeCtrlMapper.java | 2 +
.../metastore/dao/mapper/GroupResCtrlMapper.java | 7 +-
.../metastore/dao/mapper/TopicCtrlMapper.java | 6 +
.../metastore/dao/mapper/TopicDeployMapper.java | 2 +
.../impl/bdbimpl/BdbBrokerConfigMapperImpl.java | 39 ++
.../bdbimpl/BdbGroupConsumeCtrlMapperImpl.java | 7 +
.../impl/bdbimpl/BdbGroupResCtrlMapperImpl.java | 25 +-
.../impl/bdbimpl/BdbTopicCtrlMapperImpl.java | 26 ++
.../impl/bdbimpl/BdbTopicDeployMapperImpl.java | 6 +
.../master/web/handler/WebBrokerConfHandler.java | 104 ++---
.../web/handler/WebGroupConsumeCtrlHandler.java | 79 ++--
.../master/web/handler/WebGroupResCtrlHandler.java | 42 +-
.../master/web/handler/WebMasterInfoHandler.java | 18 +-
.../master/web/handler/WebTopicCtrlHandler.java | 441 +++++++++++++++++++++
...ConfHandler.java => WebTopicDeployHandler.java} | 115 ++----
29 files changed, 1249 insertions(+), 639 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index dafd8b1..1b088e1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -219,7 +219,12 @@ public enum WebFieldDef {
ISRESERVEDDATA(77, "isReservedData", "isRsvDt",
WebFieldType.BOOLEAN, "Whether to keep topic data in the broker"),
WITHCTRLINFO(78, "ctrlData", "cD",
- WebFieldType.BOOLEAN, "With topic control data info.");
+ WebFieldType.BOOLEAN, "With topic control data info."),
+ WITHDEPLOYINFO(79, "withDeployInfo", "wDI",
+ WebFieldType.BOOLEAN, "With topic deploy info."),
+
+ TOPICCTRLSET(80, "topicCtrlJsonSet", "tCtrlSet", WebFieldType.JSONSET,
+ "The topic control info set that needs to be added or modified");
public final int id;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index 7f20a58..657fd69 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -38,7 +38,6 @@ import org.apache.tubemq.corebase.policies.FlowCtrlItem;
import org.apache.tubemq.corebase.policies.FlowCtrlRuleHandler;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corebase.utils.Tuple2;
-import org.apache.tubemq.corebase.utils.Tuple3;
import org.apache.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.TStatusConstants;
@@ -121,63 +120,50 @@ public class WebParameterUtils {
return strBuffer;
}
- /**
- * Parse the parameter value required for add, update, delete record
- *
- * @param req Http Servlet Request
- * @param isAdd if add commend
- * @param result process result of parameter value, include a
- * tuple3 object(dataVersionId, operator, opData) info
- * @return process result
- */
public static boolean getAUDBaseInfo(HttpServletRequest req,
- boolean isAdd,
- ProcessResult result) {
+ boolean isAdd, ProcessResult result) {
// check and get data version id
if (!WebParameterUtils.getLongParamValue(req, WebFieldDef.DATAVERSIONID,
- false, TServerConstants.DEFAULT_DATA_VERSION, result)) {
+ false, TBaseConstants.META_VALUE_UNDEFINED, result)) {
return result.isSuccess();
}
long dataVerId = (long) result.retData1;
// check and get createUser or modifyUser
- String operator = null;
- Date opDate = null;
+ String createUsr = "";
+ Date createDate = null;
if (isAdd) {
// check create user field
if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.CREATEUSER, true, null, result)) {
+ WebFieldDef.CREATEUSER, isAdd, null, result)) {
return result.isSuccess();
}
- operator = (String) result.retData1;
+ createUsr = (String) result.retData1;
// check and get create date
if (!WebParameterUtils.getDateParameter(req,
WebFieldDef.CREATEDATE, false, new Date(), result)) {
return result.isSuccess();
}
- opDate = (Date) result.retData1;
-
- } else {
- // check modify user field
- if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.MODIFYUSER, true, null, result)) {
- return result.isSuccess();
- }
- operator = (String) result.retData1;
- // check and get modify date
- if (!WebParameterUtils.getDateParameter(req,
- WebFieldDef.MODIFYDATE, false, new Date(), result)) {
- return result.isSuccess();
- }
- opDate = (Date) result.retData1;
+ createDate = (Date) result.retData1;
+ }
+ // check modify user field
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.MODIFYUSER, !isAdd, createUsr, result)) {
+ return result.isSuccess();
}
- result.setSuccResult(new Tuple3<Long, String, Date>(
- dataVerId, operator, opDate));
+ String modifyUser = (String) result.retData1;
+ // check and get modify date
+ if (!WebParameterUtils.getDateParameter(req,
+ WebFieldDef.MODIFYDATE, false, createDate, result)) {
+ return result.isSuccess();
+ }
+ Date modifyDate = (Date) result.retData1;
+ result.setSuccResult(new BaseEntity(dataVerId,
+ createUsr, createDate, modifyUser, modifyDate));
return result.isSuccess();
}
- public static boolean getAUDBaseInfo(Map<String, String> keyValueMap,
- boolean isAdd,
- ProcessResult result) {
+ public static boolean getAUDBaseInfo(Map<String, String> keyValueMap, boolean isAdd,
+ BaseEntity defOpInfoEntity, ProcessResult result) {
// check and get data version id
if (!WebParameterUtils.getLongParamValue(keyValueMap, WebFieldDef.DATAVERSIONID,
false, TBaseConstants.META_VALUE_UNDEFINED, result)) {
@@ -185,38 +171,36 @@ public class WebParameterUtils {
}
long dataVerId = (long) result.retData1;
// check and get createUser or modifyUser
- String operator = null;
- Date opDate = null;
+ String createUsr = null;
+ Date createDate = null;
if (isAdd) {
// check create user field
if (!WebParameterUtils.getStringParamValue(keyValueMap,
- WebFieldDef.CREATEUSER, false, null, result)) {
+ WebFieldDef.CREATEUSER, false, defOpInfoEntity.getCreateUser(), result)) {
return result.isSuccess();
}
- operator = (String) result.retData1;
+ createUsr = (String) result.retData1;
// check and get create date
if (!WebParameterUtils.getDateParameter(keyValueMap,
- WebFieldDef.CREATEDATE, false, null, result)) {
- return result.isSuccess();
- }
- opDate = (Date) result.retData1;
-
- } else {
- // check modify user field
- if (!WebParameterUtils.getStringParamValue(keyValueMap,
- WebFieldDef.MODIFYUSER, false, null, result)) {
- return result.isSuccess();
- }
- operator = (String) result.retData1;
- // check and get modify date
- if (!WebParameterUtils.getDateParameter(keyValueMap,
- WebFieldDef.MODIFYDATE, false, null, result)) {
+ WebFieldDef.CREATEDATE, false, defOpInfoEntity.getCreateDate(), result)) {
return result.isSuccess();
}
- opDate = (Date) result.retData1;
+ createDate = (Date) result.retData1;
+ }
+ // check modify user field
+ if (!WebParameterUtils.getStringParamValue(keyValueMap,
+ WebFieldDef.MODIFYUSER, false, defOpInfoEntity.getModifyUser(), result)) {
+ return result.isSuccess();
+ }
+ String modifyUser = (String) result.retData1;
+ // check and get modify date
+ if (!WebParameterUtils.getDateParameter(keyValueMap,
+ WebFieldDef.MODIFYDATE, false, defOpInfoEntity.getModifyDate(), result)) {
+ return result.isSuccess();
}
- result.setSuccResult(new Tuple3<Long, String, Date>(
- dataVerId, operator, opDate));
+ Date modifyDate = (Date) result.retData1;
+ result.setSuccResult(new BaseEntity(dataVerId,
+ createUsr, createDate, modifyUser, modifyDate));
return result.isSuccess();
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
index 9cdd0d3..89425a0 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
@@ -19,7 +19,6 @@ package org.apache.tubemq.server.master.metamanage;
import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -49,6 +48,7 @@ import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.bdbstore.MasterGroupStatus;
import org.apache.tubemq.server.master.metamanage.metastore.BdbMetaStoreServiceImpl;
import org.apache.tubemq.server.master.metamanage.metastore.MetaStoreService;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupBlackListEntity;
@@ -72,6 +72,8 @@ public class MetaDataManager implements Server {
private static final Logger logger =
LoggerFactory.getLogger(MetaDataManager.class);
+ private static final ClusterSettingEntity defClusterSetting =
+ new ClusterSettingEntity().fillDefaultValue();
private final MasterReplicationConfig replicationConfig;
private final ScheduledExecutorService scheduledExecutorService;
private final ConcurrentHashMap<Integer, String> brokersMap =
@@ -420,16 +422,18 @@ public class MetaDataManager implements Server {
* @param result the process result return
* @return true if success otherwise false
*/
- public BrokerProcessResult addBrokerConfig(long dataVerId, String createUser, Date createDate,
- int brokerId, String brokerIp, int brokerPort,
- int brokerTlsPort, int brokerWebPort, int regionId,
- int groupId, ManageStatus manageStatus,
- TopicPropGroup topicProps, StringBuilder sBuilder,
+ public BrokerProcessResult addBrokerConfig(BaseEntity opInfoEntity, int brokerId,
+ String brokerIp, int brokerPort,
+ int brokerTlsPort, int brokerWebPort,
+ int regionId, int groupId,
+ ManageStatus manageStatus,
+ TopicPropGroup topicProps,
+ StringBuilder sBuilder,
ProcessResult result) {
- BrokerConfEntity entity = new BrokerConfEntity(dataVerId, createUser, createDate);
+ BrokerConfEntity entity = new BrokerConfEntity(opInfoEntity);
entity.setBrokerIdAndIp(brokerId, brokerIp);
- entity.updModifyInfo(brokerPort, brokerTlsPort, brokerWebPort,
- regionId, groupId, manageStatus, topicProps);
+ entity.updModifyInfo(opInfoEntity.getDataVerId(), brokerPort, brokerTlsPort,
+ brokerWebPort, regionId, groupId, manageStatus, topicProps);
return addBrokerConfig(entity, sBuilder, result);
}
@@ -883,8 +887,8 @@ public class MetaDataManager implements Server {
// ////////////////////////////////////////////////////////////////////////////
- public List<TopicProcessResult> addTopicDeployInfo(long dataVerId, String createUsr,
- Date createDate, Set<Integer> brokerIdSet,
+ public List<TopicProcessResult> addTopicDeployInfo(BaseEntity opEntity,
+ Set<Integer> brokerIdSet,
Set<String> topicNameSet,
TopicPropGroup topicPropInfo,
StringBuilder sBuilder,
@@ -892,7 +896,8 @@ public class MetaDataManager implements Server {
TopicDeployEntity deployConf;
List<TopicProcessResult> retInfo = new ArrayList<>();
// add topic control info
- addIfAbsentTopicCtrlConf(topicNameSet, createUsr, sBuilder, result);
+ addIfAbsentTopicCtrlConf(topicNameSet,
+ opEntity.getCreateUser(), sBuilder, result);
result.clear();
// add topic deployment record
for (Integer brokerId : brokerIdSet) {
@@ -903,6 +908,7 @@ public class MetaDataManager implements Server {
retInfo.add(new TopicProcessResult(brokerId, "", result));
continue;
}
+
for (String topicName : topicNameSet) {
TopicDeployEntity deployInfo = getTopicConfInfo(brokerId, topicName);
if (deployInfo != null) {
@@ -911,10 +917,11 @@ public class MetaDataManager implements Server {
retInfo.add(new TopicProcessResult(brokerId, topicName, result));
continue;
}
- deployConf = new TopicDeployEntity(dataVerId, createUsr, createDate);
- deployConf.setTopicDeployInfo(brokerConf.getBrokerId(),
+ deployConf = new TopicDeployEntity(opEntity, brokerConf.getBrokerId(),
brokerConf.getBrokerIp(), brokerConf.getBrokerPort(), topicName);
- deployConf.updModifyInfo(TBaseConstants.META_VALUE_UNDEFINED,
+ deployConf.setTopicProps(brokerConf.getTopicProps());
+ deployConf.updModifyInfo(opEntity.getDataVerId(),
+ TBaseConstants.META_VALUE_UNDEFINED,
TBaseConstants.META_VALUE_UNDEFINED, null,
TopicStatus.STATUS_TOPIC_OK, topicPropInfo);
metaStoreService.addTopicConf(deployConf, sBuilder, result);
@@ -927,16 +934,16 @@ public class MetaDataManager implements Server {
public TopicProcessResult addTopicDeployInfo(TopicDeployEntity deployEntity,
StringBuilder sBuilder,
ProcessResult result) {
- // add topic control info
- addIfAbsentTopicCtrlConf(deployEntity.getTopicName(),
- TBaseConstants.META_VALUE_UNDEFINED,
- deployEntity.getCreateUser(), sBuilder, result);
- BrokerConfEntity brokerConf = getBrokerConfByBrokerId(deployEntity.getBrokerId());
+ // check broker configure exist
+ BrokerConfEntity brokerConf =
+ getBrokerConfByBrokerId(deployEntity.getBrokerId());
if (brokerConf == null) {
result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
DataOpErrCode.DERR_NOT_EXIST.getDescription());
return new TopicProcessResult(deployEntity.getBrokerId(), "", result);
}
+ // add topic control info
+ addIfAbsentTopicCtrlConf(deployEntity, sBuilder, result);
// add topic deployment record
TopicDeployEntity curDeployInfo =
metaStoreService.getTopicConfByeRecKey(deployEntity.getRecordKey());
@@ -957,15 +964,16 @@ public class MetaDataManager implements Server {
* @param result the process result return
* @return true if success otherwise false
*/
- public List<TopicProcessResult> modTopicConfig(long dataVerId, String modifyUser,
- Date modifyDate, Set<Integer> brokerIdSet,
+ public List<TopicProcessResult> modTopicConfig(BaseEntity opEntity,
+ Set<Integer> brokerIdSet,
Set<String> topicNameSet,
TopicPropGroup topicProps,
StringBuilder sBuilder,
ProcessResult result) {
List<TopicProcessResult> retInfo = new ArrayList<>();
// add topic control info
- addIfAbsentTopicCtrlConf(topicNameSet, modifyUser, sBuilder, result);
+ addIfAbsentTopicCtrlConf(topicNameSet,
+ opEntity.getModifyUser(), sBuilder, result);
result.clear();
// add topic deployment record
TopicDeployEntity curEntity;
@@ -1029,10 +1037,11 @@ public class MetaDataManager implements Server {
}
}
newEntity = curEntity.clone();
- newEntity.updBaseModifyInfo(dataVerId,
- null, null, modifyUser, modifyDate, null);
- if (!newEntity.updModifyInfo(TBaseConstants.META_VALUE_UNDEFINED,
- TBaseConstants.META_VALUE_UNDEFINED, null, null, topicProps)) {
+ newEntity.updBaseModifyInfo(opEntity);
+ if (!newEntity.updModifyInfo(opEntity.getDataVerId(),
+ TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED,
+ null, null, topicProps)) {
result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
sBuilder.append("Data not changed for brokerId=")
.append(curEntity.getBrokerId()).append(", topicName=")
@@ -1054,8 +1063,8 @@ public class MetaDataManager implements Server {
* @param result the process result return
* @return true if success otherwise false
*/
- public List<TopicProcessResult> modDelOrRmvTopicConf(long dataVerId, String modifyUser,
- Date modifyDate, Set<Integer> brokerIdSet,
+ public List<TopicProcessResult> modDelOrRmvTopicConf(BaseEntity opEntity,
+ Set<Integer> brokerIdSet,
Set<String> topicNameSet,
TopicStatus topicStatus,
StringBuilder sBuilder,
@@ -1098,10 +1107,11 @@ public class MetaDataManager implements Server {
continue;
}
newEntity = curEntity.clone();
- newEntity.updBaseModifyInfo(dataVerId,
- null, null, modifyUser, modifyDate, null);
- if (!newEntity.updModifyInfo(TBaseConstants.META_VALUE_UNDEFINED,
- TBaseConstants.META_VALUE_UNDEFINED, null, topicStatus, null)) {
+ newEntity.updBaseModifyInfo(opEntity);
+ if (!newEntity.updModifyInfo(opEntity.getDataVerId(),
+ TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED,
+ null, topicStatus, null)) {
result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
sBuilder.append("Data not changed for brokerId=")
.append(curEntity.getBrokerId()).append(", topicName=")
@@ -1123,8 +1133,8 @@ public class MetaDataManager implements Server {
* @param result the process result return
* @return true if success otherwise false
*/
- public List<TopicProcessResult> modRedoDelTopicConf(long dataVerId, String modifyUser,
- Date modifyDate, Set<Integer> brokerIdSet,
+ public List<TopicProcessResult> modRedoDelTopicConf(BaseEntity opEntity,
+ Set<Integer> brokerIdSet,
Set<String> topicNameSet,
StringBuilder sBuilder,
ProcessResult result) {
@@ -1173,9 +1183,9 @@ public class MetaDataManager implements Server {
continue;
}
newEntity = curEntity.clone();
- newEntity.updBaseModifyInfo(dataVerId,
- null, null, modifyUser, modifyDate, null);
- if (!newEntity.updModifyInfo(TBaseConstants.META_VALUE_UNDEFINED,
+ newEntity.updBaseModifyInfo(opEntity);
+ if (!newEntity.updModifyInfo(opEntity.getDataVerId(),
+ TBaseConstants.META_VALUE_UNDEFINED,
TBaseConstants.META_VALUE_UNDEFINED, null,
TopicStatus.STATUS_TOPIC_OK, null)) {
result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
@@ -1346,36 +1356,125 @@ public class MetaDataManager implements Server {
}
// /////////////////////////////////////////////////////////////////////////////////
+
/**
- * Add topic control configure info
+ * Add or Update topic control configure info
*
- * @param entity the topic control info entity will be add
- * @param strBuffer the print info string buffer
+ * @param sBuffer the print info string buffer
* @param result the process result return
* @return true if success otherwise false
*/
- public boolean confAddTopicCtrlConf(TopicCtrlEntity entity,
- StringBuilder strBuffer,
- ProcessResult result) {
- metaStoreService.addTopicCtrlConf(entity, strBuffer, result);
+ public List<TopicProcessResult> addOrUpdTopicCtrlConf(BaseEntity opInfoEntity,
+ Set<String> topicNameSet,
+ int topicNameId,
+ Boolean enableTopicAuth,
+ int maxMsgSizeInMB,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ TopicCtrlEntity curEntity;
+ TopicCtrlEntity newEntity;
+ List<TopicProcessResult> retInfoList = new ArrayList<>();
+ for (String topicName : topicNameSet) {
+ curEntity = metaStoreService.getTopicCtrlConf(topicName);
+ if (curEntity == null) {
+ newEntity = new TopicCtrlEntity(opInfoEntity, topicName);
+ newEntity.updModifyInfo(opInfoEntity.getDataVerId(),
+ topicNameId, maxMsgSizeInMB, enableTopicAuth);
+ metaStoreService.addTopicCtrlConf(newEntity, sBuffer, result);
+ } else {
+ newEntity = curEntity.clone();
+ newEntity.updBaseModifyInfo(opInfoEntity);
+ if (!newEntity.updModifyInfo(opInfoEntity.getDataVerId(),
+ topicNameId, maxMsgSizeInMB, enableTopicAuth)) {
+ result.setSuccResult(null);
+ retInfoList.add(new TopicProcessResult(0, topicName, result));
+ continue;
+ }
+ metaStoreService.updTopicCtrlConf(newEntity, sBuffer, result);
+ }
+ retInfoList.add(new TopicProcessResult(0, topicName, result));
+ }
+ return retInfoList;
+ }
+
+ /**
+ * Add or Update topic control configure info
+ *
+ * @param inEntity the topic control info entity to be added or updated
+ * @param sBuffer the print info string buffer
+ * @param result the process result return
+ * @return the process result wrapped together with the topic name
+ */
+ public TopicProcessResult addOrUpdTopicCtrlConf(TopicCtrlEntity inEntity,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+
+ TopicCtrlEntity curEntity =
+ metaStoreService.getTopicCtrlConf(inEntity.getTopicName());
+ if (curEntity == null) {
+ metaStoreService.addTopicCtrlConf(inEntity, sBuffer, result);
+ } else {
+ TopicCtrlEntity newEntity2 = curEntity.clone();
+ newEntity2.updBaseModifyInfo(inEntity);
+ if (!newEntity2.updModifyInfo(inEntity.getDataVerId(), inEntity.getTopicId(),
+ inEntity.getMaxMsgSizeInMB(), inEntity.isAuthCtrlEnable())) {
+ result.setSuccResult(null);
+ return new TopicProcessResult(0, inEntity.getTopicName(), result);
+ }
+ metaStoreService.updTopicCtrlConf(newEntity2, sBuffer, result);
+ }
+ return new TopicProcessResult(0, inEntity.getTopicName(), result);
+ }
+
+ /**
+ * Delete topic control configure
+ *
+ * @param operator operator
+ * @param topicName the topicName will be deleted
+ * @param sBuffer the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ public boolean delTopicCtrlConf(String operator,
+ String topicName,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ // check current status
+ if (!metaStoreService.checkStoreStatus(true, result)) {
+ return result.isSuccess();
+ }
+ if (metaStoreService.isTopicDeployed(topicName)) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+ sBuffer.append("TopicName ").append(topicName)
+ .append(" is in using, please delete the deploy info first!")
+ .toString());
+ return result.isSuccess();
+ }
+ if (metaStoreService.isTopicNameInUsed(topicName)) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+ sBuffer.append("TopicName ").append(topicName)
+ .append(" is in using, please delete the consume control info first!")
+ .toString());
+ return result.isSuccess();
+ }
+ metaStoreService.delTopicCtrlConf(operator, topicName, sBuffer, result);
return result.isSuccess();
}
+
/**
* Add if absent topic control configure info
*
- * @param topicName the topic name will be add
- * @param topicNameId the topic name id will be add
- * @param operator operator
- * @param strBuffer the print info string buffer
+ * @param deployEntity the topic deploy info whose topic control record is added if absent
+ * @param strBuffer the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
*/
- public void addIfAbsentTopicCtrlConf(String topicName,
- int topicNameId,
- String operator,
+ public void addIfAbsentTopicCtrlConf(TopicDeployEntity deployEntity,
StringBuilder strBuffer,
ProcessResult result) {
TopicCtrlEntity curEntity =
- metaStoreService.getTopicCtrlConf(topicName);
+ metaStoreService.getTopicCtrlConf(deployEntity.getTopicName());
if (curEntity != null) {
return;
}
@@ -1384,7 +1483,8 @@ public class MetaDataManager implements Server {
if (defSetting != null) {
maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
}
- curEntity = new TopicCtrlEntity(topicName, topicNameId, maxMsgSizeInMB, operator);
+ curEntity = new TopicCtrlEntity(deployEntity.getTopicName(),
+ deployEntity.getTopicId(), maxMsgSizeInMB, deployEntity.getCreateUser());
metaStoreService.addTopicCtrlConf(curEntity, strBuffer, result);
return;
}
@@ -1397,10 +1497,8 @@ public class MetaDataManager implements Server {
* @param operator operator
* @param strBuffer the print info string buffer
*/
- public void addIfAbsentTopicCtrlConf(Set<String> topicNameSet,
- String operator,
- StringBuilder strBuffer,
- ProcessResult result) {
+ public void addIfAbsentTopicCtrlConf(Set<String> topicNameSet, String operator,
+ StringBuilder strBuffer, ProcessResult result) {
TopicCtrlEntity curEntity;
int maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
ClusterSettingEntity defSetting = metaStoreService.getClusterConfig();
@@ -1408,7 +1506,6 @@ public class MetaDataManager implements Server {
maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
}
for (String topicName : topicNameSet) {
- result.clear();
curEntity = metaStoreService.getTopicCtrlConf(topicName);
if (curEntity != null) {
continue;
@@ -1419,67 +1516,6 @@ public class MetaDataManager implements Server {
}
}
- /**
- * Update topic control configure
- *
- * @param entity the topic control info entity will be update
- * @param strBuffer the print info string buffer
- * @param result the process result return
- * @return true if success otherwise false
- */
- public boolean confUpdTopicCtrlConf(TopicCtrlEntity entity,
- StringBuilder strBuffer,
- ProcessResult result) {
- if (metaStoreService.updTopicCtrlConf(entity, result)) {
- TopicCtrlEntity oldEntity =
- (TopicCtrlEntity) result.getRetData();
- TopicCtrlEntity curEntity =
- metaStoreService.getTopicCtrlConf(entity.getTopicName());
- strBuffer.append("[confUpdTopicCtrlConf], ")
- .append(entity.getModifyUser())
- .append(" updated record from :").append(oldEntity.toString())
- .append(" to ").append(curEntity.toString());
- logger.info(strBuffer.toString());
- } else {
- strBuffer.append("[confUpdTopicCtrlConf], ")
- .append("failure to update topic control record : ")
- .append(result.getErrInfo());
- logger.warn(strBuffer.toString());
- }
- strBuffer.delete(0, strBuffer.length());
- return result.isSuccess();
- }
-
- /**
- * Delete topic control configure
- *
- * @param operator operator
- * @param topicName the topicName will be deleted
- * @param strBuffer the print info string buffer
- * @param result the process result return
- * @return true if success otherwise false
- */
- public boolean confDelTopicCtrlConf(String operator,
- String topicName,
- StringBuilder strBuffer,
- ProcessResult result) {
- if (metaStoreService.delTopicCtrlConf(topicName, result)) {
- TopicCtrlEntity entity =
- (TopicCtrlEntity) result.getRetData();
- if (entity != null) {
- strBuffer.append("[confDelTopicCtrlConf], ").append(operator)
- .append(" deleted topic control record :").append(entity.toString());
- logger.info(strBuffer.toString());
- }
- } else {
- strBuffer.append("[confDelTopicCtrlConf], ")
- .append("failure to delete topic control record : ")
- .append(result.getErrInfo());
- logger.warn(strBuffer.toString());
- }
- strBuffer.delete(0, strBuffer.length());
- return result.isSuccess();
- }
public TopicCtrlEntity getTopicCtrlByTopicName(String topicName) {
return this.metaStoreService.getTopicCtrlConf(topicName);
@@ -1487,11 +1523,8 @@ public class MetaDataManager implements Server {
public int getTopicMaxMsgSizeInMB(String topicName) {
// get maxMsgSizeInMB info
- int maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
- ClusterSettingEntity clusterSettingEntity = getClusterDefSetting();
- if (clusterSettingEntity != null) {
- maxMsgSizeInMB = clusterSettingEntity.getMaxMsgSizeInMB();
- }
+ ClusterSettingEntity clusterSettingEntity = getClusterDefSetting(false);
+ int maxMsgSizeInMB = clusterSettingEntity.getMaxMsgSizeInMB();
TopicCtrlEntity topicCtrlEntity = getTopicCtrlByTopicName(topicName);
if (topicCtrlEntity != null) {
maxMsgSizeInMB = topicCtrlEntity.getMaxMsgSizeInMB();
@@ -1509,32 +1542,26 @@ public class MetaDataManager implements Server {
return metaStoreService.getTopicCtrlConf(qryEntity);
}
+ public Map<String, TopicCtrlEntity> getTopicCtrlConf(Set<String> topicNameSet,
+ TopicCtrlEntity qryEntity) {
+ return metaStoreService.getTopicCtrlConf(topicNameSet, qryEntity);
+ }
- // //////////////////////////////////////////////////////////////////////////////
- public boolean addClusterDefSetting(StringBuilder strBuffer, ProcessResult result) {
- if (metaStoreService.getClusterConfig() == null) {
- ClusterSettingEntity defConf =
- new ClusterSettingEntity(TServerConstants.DEFAULT_DATA_VERSION,
- "Stystem", new Date());
- defConf.fillDefaultValue();
- return metaStoreService.addClusterConfig(defConf, strBuffer, result);
- }
- result.setSuccResult(null);
- return result.isSuccess();
- }
- public boolean addClusterDefSetting(long dataVerId, String createUsr, Date createDate,
- int brokerPort, int brokerTlsPort, int brokerWebPort,
+ // //////////////////////////////////////////////////////////////////////////////
+
+ public boolean addClusterDefSetting(BaseEntity opEntity, int brokerPort,
+ int brokerTlsPort, int brokerWebPort,
int maxMsgSizeMB, int qryPriorityId,
Boolean flowCtrlEnable, int flowRuleCnt,
String flowCtrlInfo, TopicPropGroup topicProps,
StringBuilder strBuffer, ProcessResult result) {
ClusterSettingEntity newConf =
- new ClusterSettingEntity(dataVerId, createUsr, createDate);
+ new ClusterSettingEntity(opEntity);
newConf.fillDefaultValue();
- newConf.updModifyInfo(brokerPort, brokerTlsPort,
- brokerWebPort, maxMsgSizeMB, qryPriorityId,
+ newConf.updModifyInfo(opEntity.getDataVerId(), brokerPort,
+ brokerTlsPort, brokerWebPort, maxMsgSizeMB, qryPriorityId,
flowCtrlEnable, flowRuleCnt, flowCtrlInfo, topicProps);
return metaStoreService.addClusterConfig(newConf, strBuffer, result);
}
@@ -1544,8 +1571,8 @@ public class MetaDataManager implements Server {
*
* @return true if success otherwise false
*/
- public boolean modClusterDefSetting(long dataVerId, String modifyUser, Date modifyDate,
- int brokerPort, int brokerTlsPort, int brokerWebPort,
+ public boolean modClusterDefSetting(BaseEntity opEntity, int brokerPort,
+ int brokerTlsPort, int brokerWebPort,
int maxMsgSizeMB, int qryPriorityId,
Boolean flowCtrlEnable, int flowRuleCnt,
String flowCtrlInfo, TopicPropGroup topicProps,
@@ -1558,9 +1585,9 @@ public class MetaDataManager implements Server {
return result.isSuccess();
}
ClusterSettingEntity newConf = curConf.clone();
- newConf.setModifyInfo(dataVerId, modifyUser, modifyDate);
- if (newConf.updModifyInfo(brokerPort, brokerTlsPort,
- brokerWebPort, maxMsgSizeMB, qryPriorityId,
+ newConf.updBaseModifyInfo(opEntity);
+ if (newConf.updModifyInfo(opEntity.getDataVerId(), brokerPort,
+ brokerTlsPort, brokerWebPort, maxMsgSizeMB, qryPriorityId,
flowCtrlEnable, flowRuleCnt, flowCtrlInfo, topicProps)) {
metaStoreService.updClusterConfig(newConf, strBuffer, result);
} else {
@@ -1571,25 +1598,28 @@ public class MetaDataManager implements Server {
}
- public ClusterSettingEntity getClusterDefSetting() {
- return metaStoreService.getClusterConfig();
+ public ClusterSettingEntity getClusterDefSetting(boolean isMustConf) {
+ ClusterSettingEntity curClsSetting =
+ metaStoreService.getClusterConfig();
+ if (!isMustConf && curClsSetting == null) {
+ curClsSetting = defClusterSetting;
+ }
+ return curClsSetting;
}
// //////////////////////////////////////////////////////////////////////////////
- public GroupProcessResult addGroupResCtrlConf(long dataVerId, String createUser,
- Date createDate, String groupName,
+ public GroupProcessResult addGroupResCtrlConf(BaseEntity opEntity, String groupName,
Boolean consumeEnable, String disableRsn,
Boolean resCheckEnable, int allowedBClientRate,
int qryPriorityId, Boolean flowCtrlEnable,
int flowRuleCnt, String flowCtrlInfo,
StringBuilder sBuilder, ProcessResult result) {
GroupResCtrlEntity entity =
- new GroupResCtrlEntity(dataVerId, createUser, createDate);
- entity.setGroupName(groupName);
- entity.updModifyInfo(consumeEnable, disableRsn,
- resCheckEnable, allowedBClientRate, qryPriorityId,
- flowCtrlEnable, flowRuleCnt, flowCtrlInfo);
+ new GroupResCtrlEntity(opEntity, groupName);
+ entity.updModifyInfo(opEntity.getDataVerId(),
+ consumeEnable, disableRsn, resCheckEnable, allowedBClientRate,
+ qryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlInfo);
return addGroupResCtrlConf(entity, sBuilder, result);
}
@@ -1618,8 +1648,7 @@ public class MetaDataManager implements Server {
*
* @return true if success otherwise false
*/
- public GroupProcessResult updGroupResCtrlConf(long dataVerId, String modifyUser,
- Date modifyDate, String groupName,
+ public GroupProcessResult updGroupResCtrlConf(BaseEntity opEntity, String groupName,
Boolean consumeEnable, String disableRsn,
Boolean resCheckEnable, int allowedBClientRate,
int qryPriorityId, Boolean flowCtrlEnable,
@@ -1633,10 +1662,10 @@ public class MetaDataManager implements Server {
return new GroupProcessResult(groupName, null, result);
}
GroupResCtrlEntity newEntity = curEntity.clone();
- newEntity.updBaseModifyInfo(dataVerId,
- null, null, modifyUser, modifyDate, null);
- if (newEntity.updModifyInfo(consumeEnable, disableRsn, resCheckEnable,
- allowedBClientRate, qryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlInfo)) {
+ newEntity.updBaseModifyInfo(opEntity);
+ if (newEntity.updModifyInfo(opEntity.getDataVerId(), consumeEnable, disableRsn,
+ resCheckEnable, allowedBClientRate, qryPriorityId, flowCtrlEnable,
+ flowRuleCnt, flowCtrlInfo)) {
metaStoreService.updGroupResCtrlConf(newEntity, sBuilder, result);
} else {
result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
@@ -1671,33 +1700,31 @@ public class MetaDataManager implements Server {
return retInfo;
}
- public Map<String, GroupResCtrlEntity> confGetGroupResCtrlConf(
- GroupResCtrlEntity qryEntity) {
- return metaStoreService.getGroupResCtrlConf(qryEntity);
+ public Map<String, GroupResCtrlEntity> confGetGroupResCtrlConf(Set<String> groupSet,
+ GroupResCtrlEntity qryEntity) {
+ return metaStoreService.getGroupResCtrlConf(groupSet, qryEntity);
}
public GroupResCtrlEntity confGetGroupResCtrlConf(String groupName) {
return this.metaStoreService.getGroupResCtrlConf(groupName);
}
- public GroupProcessResult addGroupConsumeCtrlInfo(long dataVerId, String createUser,
- Date createDate, String groupName,
+ public GroupProcessResult addGroupConsumeCtrlInfo(BaseEntity opInfoEntity, String groupName,
String topicName, Boolean enableCsm,
String disableRsn, Boolean enableFilter,
String filterCondStr, StringBuilder sBuilder,
ProcessResult result) {
GroupConsumeCtrlEntity entity =
- new GroupConsumeCtrlEntity(dataVerId, createUser, createDate);
- entity.setGroupAndTopic(groupName, topicName);
- entity.updModifyInfo(enableCsm, disableRsn, enableFilter, filterCondStr);
+ new GroupConsumeCtrlEntity(opInfoEntity, groupName, topicName);
+ entity.updModifyInfo(opInfoEntity.getDataVerId(),
+ enableCsm, disableRsn, enableFilter, filterCondStr);
return addGroupConsumeCtrlInfo(entity, sBuilder, result);
}
public GroupProcessResult addGroupConsumeCtrlInfo(GroupConsumeCtrlEntity entity,
StringBuilder sBuilder,
ProcessResult result) {
- if (!addIfAbsentGroupResConf(entity.getGroupName(),
- entity.getCreateUser(), entity.getCreateDate(), sBuilder, result)) {
+ if (!addIfAbsentGroupResConf(entity, entity.getGroupName(), sBuilder, result)) {
return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
}
if (metaStoreService.getConsumeCtrlByGroupAndTopic(
@@ -1710,8 +1737,7 @@ public class MetaDataManager implements Server {
return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
}
- public GroupProcessResult modGroupConsumeCtrlInfo(long dataVerId, String modifyUser,
- Date modifyDate, String groupName,
+ public GroupProcessResult modGroupConsumeCtrlInfo(BaseEntity opEntity, String groupName,
String topicName, Boolean enableCsm,
String disableRsn, Boolean enableFilter,
String filterCondStr, StringBuilder sBuilder,
@@ -1724,9 +1750,9 @@ public class MetaDataManager implements Server {
return new GroupProcessResult(groupName, topicName, result);
}
GroupConsumeCtrlEntity newEntity = curEntity.clone();
- newEntity.updBaseModifyInfo(dataVerId,
- null, null, modifyUser, modifyDate, null);
- if (newEntity.updModifyInfo(enableCsm, disableRsn, enableFilter, filterCondStr)) {
+ newEntity.updBaseModifyInfo(opEntity);
+ if (newEntity.updModifyInfo(opEntity.getDataVerId(),
+ enableCsm, disableRsn, enableFilter, filterCondStr)) {
metaStoreService.updGroupConsumeCtrlConf(newEntity, sBuilder, result);
} else {
result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
@@ -1746,8 +1772,7 @@ public class MetaDataManager implements Server {
DataOpErrCode.DERR_NOT_EXIST.getDescription());
return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
}
- if (!addIfAbsentGroupResConf(entity.getGroupName(),
- entity.getModifyUser(), entity.getModifyDate(), sBuilder, result)) {
+ if (!addIfAbsentGroupResConf(entity, entity.getGroupName(), sBuilder, result)) {
return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
}
metaStoreService.updGroupConsumeCtrlConf(entity, sBuilder, result);
@@ -1784,7 +1809,7 @@ public class MetaDataManager implements Server {
return retInfo;
}
- private boolean addIfAbsentGroupResConf(String groupName, String operator, Date opDate,
+ private boolean addIfAbsentGroupResConf(BaseEntity opEntity, String groupName,
StringBuilder sBuilder, ProcessResult result) {
GroupResCtrlEntity resCtrlEntity =
this.metaStoreService.getGroupResCtrlConf(groupName);
@@ -1792,8 +1817,7 @@ public class MetaDataManager implements Server {
result.setSuccResult(null);
return true;
}
- resCtrlEntity = new GroupResCtrlEntity(
- TServerConstants.DEFAULT_DATA_VERSION, operator, opDate);
+ resCtrlEntity = new GroupResCtrlEntity(opEntity, groupName);
resCtrlEntity.setGroupName(groupName);
resCtrlEntity.fillDefaultValue();
return this.metaStoreService.addGroupResCtrlConf(resCtrlEntity, sBuilder, result);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
index 2aa4a57..7551649 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
@@ -524,6 +524,11 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
}
@Override
+ public boolean isTopicDeployed(String topicName) {
+ return topicDeployMapper.isTopicDeployed(topicName);
+ }
+
+ @Override
public TopicDeployEntity getTopicConfByeRecKey(String recordKey) {
return topicDeployMapper.getTopicConfByeRecKey(recordKey);
}
@@ -603,21 +608,58 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
}
@Override
- public boolean updTopicCtrlConf(TopicCtrlEntity entity, ProcessResult result) {
+ public boolean updTopicCtrlConf(TopicCtrlEntity entity,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check current status
if (!checkStoreStatus(true, result)) {
return result.isSuccess();
}
- return topicCtrlMapper.updTopicCtrlConf(entity, result);
+ if (topicCtrlMapper.updTopicCtrlConf(entity, result)) {
+ TopicCtrlEntity oldEntity =
+ (TopicCtrlEntity) result.getRetData();
+ TopicCtrlEntity curEntity =
+ topicCtrlMapper.getTopicCtrlConf(entity.getTopicName());
+ sBuffer.append("[updTopicCtrlConf], ")
+ .append(entity.getModifyUser())
+ .append(" updated record from :").append(oldEntity.toString())
+ .append(" to ").append(curEntity.toString());
+ logger.info(sBuffer.toString());
+ } else {
+ sBuffer.append("[updTopicCtrlConf], ")
+ .append("failure to update topic control record : ")
+ .append(result.getErrInfo());
+ logger.warn(sBuffer.toString());
+ }
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
}
@Override
- public boolean delTopicCtrlConf(String topicName, ProcessResult result) {
+ public boolean delTopicCtrlConf(String operator,
+ String topicName,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check current status
if (!checkStoreStatus(true, result)) {
return result.isSuccess();
}
- return topicCtrlMapper.delTopicCtrlConf(topicName, result);
+ if (topicCtrlMapper.delTopicCtrlConf(topicName, result)) {
+ TopicCtrlEntity entity =
+ (TopicCtrlEntity) result.getRetData();
+ if (entity != null) {
+ sBuffer.append("[delTopicCtrlConf], ").append(operator)
+ .append(" deleted topic control record :").append(entity.toString());
+ logger.info(sBuffer.toString());
+ }
+ } else {
+ sBuffer.append("[delTopicCtrlConf], ")
+ .append("failure to delete topic control record : ")
+ .append(result.getErrInfo());
+ logger.warn(sBuffer.toString());
+ }
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
}
@Override
@@ -630,6 +672,12 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
return topicCtrlMapper.getTopicCtrlConf(qryEntity);
}
+ @Override
+ public Map<String, TopicCtrlEntity> getTopicCtrlConf(Set<String> topicNameSet,
+ TopicCtrlEntity qryEntity) {
+ return topicCtrlMapper.getTopicCtrlConf(topicNameSet, qryEntity);
+ }
+
// group configure api
@Override
public boolean addGroupResCtrlConf(GroupResCtrlEntity entity,
@@ -718,8 +766,9 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
}
@Override
- public Map<String, GroupResCtrlEntity> getGroupResCtrlConf(GroupResCtrlEntity qryEntity) {
- return groupResCtrlMapper.getGroupResCtrlConf(qryEntity);
+ public Map<String, GroupResCtrlEntity> getGroupResCtrlConf(Set<String> groupSet,
+ GroupResCtrlEntity qryEntity) {
+ return groupResCtrlMapper.getGroupResCtrlConf(groupSet, qryEntity);
}
// group blacklist api
@@ -892,6 +941,11 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
}
@Override
+ public boolean isTopicNameInUsed(String topicName) {
+ return groupConsumeCtrlMapper.isTopicNameInUsed(topicName);
+ }
+
+ @Override
public GroupConsumeCtrlEntity getGroupConsumeCtrlConfByRecKey(String recordKey) {
return groupConsumeCtrlMapper.getGroupConsumeCtrlConfByRecKey(recordKey);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
index 362baf1..7025e8c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
@@ -149,6 +149,8 @@ public interface MetaStoreService extends KeepAlive, Server {
boolean hasConfiguredTopics(int brokerId);
+ boolean isTopicDeployed(String topicName);
+
TopicDeployEntity getTopicConfByeRecKey(String recordKey);
List<TopicDeployEntity> getTopicConf(TopicDeployEntity qryEntity);
@@ -173,18 +175,52 @@ public interface MetaStoreService extends KeepAlive, Server {
Map<String, TopicDeployEntity> getConfiguredTopicInfo(int brokerId);
// topic control api
+ /**
+ * Add topic control configure info
+ *
+ * @param entity the topic control info entity to be added
+ * @param sBuffer the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
boolean addTopicCtrlConf(TopicCtrlEntity entity,
StringBuilder sBuffer,
ProcessResult result);
- boolean updTopicCtrlConf(TopicCtrlEntity entity, ProcessResult result);
+ /**
+ * Update topic control configure
+ *
+ * @param entity the topic control info entity to be updated
+ * @param sBuffer the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean updTopicCtrlConf(TopicCtrlEntity entity,
+ StringBuilder sBuffer,
+ ProcessResult result);
- boolean delTopicCtrlConf(String topicName, ProcessResult result);
+ /**
+ * Delete topic control configure
+ *
+ * @param operator operator
+ * @param topicName the name of the topic whose control configure will be deleted
+ * @param sBuffer the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean delTopicCtrlConf(String operator,
+ String topicName,
+ StringBuilder sBuffer,
+ ProcessResult result);
TopicCtrlEntity getTopicCtrlConf(String topicName);
List<TopicCtrlEntity> getTopicCtrlConf(TopicCtrlEntity qryEntity);
+ Map<String, TopicCtrlEntity> getTopicCtrlConf(Set<String> topicNameSet,
+ TopicCtrlEntity qryEntity);
+
+
// group resource configure api
/**
* Add group resource control configure info
@@ -226,7 +262,8 @@ public interface MetaStoreService extends KeepAlive, Server {
GroupResCtrlEntity getGroupResCtrlConf(String groupName);
- Map<String, GroupResCtrlEntity> getGroupResCtrlConf(GroupResCtrlEntity qryEntity);
+ Map<String, GroupResCtrlEntity> getGroupResCtrlConf(Set<String> groupSet,
+ GroupResCtrlEntity qryEntity);
// group blacklist api
boolean addGroupBlackListConf(GroupBlackListEntity entity, ProcessResult result);
@@ -301,6 +338,8 @@ public interface MetaStoreService extends KeepAlive, Server {
StringBuilder strBuffer,
ProcessResult result);
+ boolean isTopicNameInUsed(String topicName);
+
GroupConsumeCtrlEntity getGroupConsumeCtrlConfByRecKey(String recordKey);
GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(String groupName, String topicName);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
index a7bbee4..5645174 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
@@ -54,6 +54,14 @@ public class BaseEntity implements Serializable, Cloneable {
createUser, createDate, createUser, createDate);
}
+ public BaseEntity(BaseEntity other) {
+ this.dataVersionId = other.dataVersionId;
+ this.createUser = other.createUser;
+ this.createDate = other.createDate;
+ this.modifyUser = other.modifyUser;
+ this.modifyDate = other.modifyDate;
+ }
+
public BaseEntity(long dataVersionId, String createUser, Date createDate) {
this(dataVersionId, createUser, createDate, createUser, createDate);
}
@@ -80,6 +88,37 @@ public class BaseEntity implements Serializable, Cloneable {
this.modifyDate = opDate;
}
+ public boolean updBaseModifyInfo(BaseEntity opInfoEntity) {
+ boolean changed = false;
+ if (TStringUtils.isNotBlank(opInfoEntity.getCreateUser())
+ && !Objects.equals(createUser, opInfoEntity.getCreateUser())) {
+ changed = true;
+ this.createUser = opInfoEntity.getCreateUser();
+ }
+ if (opInfoEntity.getCreateDate() != null
+ && !Objects.equals(createDate, opInfoEntity.getCreateDate())) {
+ changed = true;
+ this.createDate = opInfoEntity.getCreateDate();
+ }
+ if (TStringUtils.isNotBlank(opInfoEntity.getModifyUser())
+ && !Objects.equals(modifyUser, opInfoEntity.getModifyUser())) {
+ changed = true;
+ this.modifyUser = opInfoEntity.getModifyUser();
+ }
+ if (opInfoEntity.getModifyDate() != null
+ && !Objects.equals(modifyDate, opInfoEntity.getModifyDate())) {
+ changed = true;
+ this.modifyDate = opInfoEntity.getModifyDate();
+ }
+ if (TStringUtils.isNotBlank(opInfoEntity.getAttributes())
+ && !Objects.equals(attributes, opInfoEntity.getAttributes())) {
+ changed = true;
+ this.attributes = opInfoEntity.getAttributes();
+ }
+ return changed;
+ }
+
+
public boolean updBaseModifyInfo(long newDataVerId, String newCreateUser,
Date newCreateDate, String newModifyUser,
Date newModifyDate, String newAttributes) {
@@ -118,10 +157,6 @@ public class BaseEntity implements Serializable, Cloneable {
return changed;
}
- public void setDataVersionId() {
- setDataVersionId(System.currentTimeMillis());
- }
-
public void setDataVersionId(long dataVersionId) {
this.dataVersionId = dataVersionId;
}
@@ -151,7 +186,7 @@ public class BaseEntity implements Serializable, Cloneable {
return createDate;
}
- public long getDataVersionId() {
+ public long getDataVerId() {
return dataVersionId;
}
@@ -191,8 +226,8 @@ public class BaseEntity implements Serializable, Cloneable {
if (target == null) {
return true;
}
- if ((target.getDataVersionId() != TBaseConstants.META_VALUE_UNDEFINED
- && this.getDataVersionId() != target.getDataVersionId())
+ if ((target.getDataVerId() != TBaseConstants.META_VALUE_UNDEFINED
+ && this.getDataVerId() != target.getDataVerId())
|| (TStringUtils.isNotBlank(target.getCreateUser())
&& !target.getCreateUser().equals(createUser))
|| (TStringUtils.isNotBlank(target.getModifyUser())
@@ -257,7 +292,11 @@ public class BaseEntity implements Serializable, Cloneable {
}
@Override
- public Object clone() throws CloneNotSupportedException {
- return super.clone();
+ public Object clone() {
+ try {
+ return super.clone();
+ } catch (Throwable e) {
+ return null;
+ }
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
index 979333c..f79f07e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
@@ -60,8 +60,8 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
super();
}
- public BrokerConfEntity(long dataVerId, String createUser, Date createDate) {
- super(dataVerId, createUser, createDate);
+ public BrokerConfEntity(BaseEntity opInfoEntity) {
+ super(opInfoEntity);
}
public BrokerConfEntity(int brokerId, String brokerIp, int brokerPort,
@@ -113,7 +113,7 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
topicProps.isAcceptSubscribe(), getAttributes(), isConfDataUpdated,
isBrokerLoaded, getCreateUser(), getCreateDate(),
getModifyUser(), getModifyDate());
- bdbEntity.setDataVerId(getDataVersionId());
+ bdbEntity.setDataVerId(getDataVerId());
bdbEntity.setRegionId(regionId);
bdbEntity.setBrokerGroupId(groupId);
bdbEntity.setBrokerTLSPort(brokerTLSPort);
@@ -283,10 +283,16 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
*
* @return if changed
*/
- public boolean updModifyInfo(int brokerPort, int brokerTlsPort, int brokerWebPort,
- int regionId, int groupId, ManageStatus manageStatus,
- TopicPropGroup topicProps) {
+ public boolean updModifyInfo(long dataVerId, int brokerPort, int brokerTlsPort,
+ int brokerWebPort, int regionId, int groupId,
+ ManageStatus manageStatus, TopicPropGroup topicProps) {
boolean changed = false;
+ // check and set dataVerId info
+ if (dataVerId != TBaseConstants.META_VALUE_UNDEFINED
+ && this.getDataVerId() != dataVerId) {
+ changed = true;
+ this.setDataVersionId(dataVerId);
+ }
// check and set brokerPort info
if (brokerPort != TBaseConstants.META_VALUE_UNDEFINED
&& this.brokerPort != brokerPort) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
index 7beb57e..483320e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
@@ -17,7 +17,6 @@
package org.apache.tubemq.server.master.metamanage.metastore.dao.entity;
-import java.util.Date;
import java.util.Objects;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.utils.SettingValidUtils;
@@ -59,8 +58,8 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
super();
}
- public ClusterSettingEntity(long dataVerId, String createUser, Date createDate) {
- super(dataVerId, createUser, createDate);
+ public ClusterSettingEntity(BaseEntity opEntity) {
+ super(opEntity);
}
// Constructor by BdbClusterSettingEntity
@@ -89,7 +88,7 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
// build bdb object from current info
public BdbClusterSettingEntity buildBdbClsDefSettingEntity() {
BdbClusterSettingEntity bdbEntity =
- new BdbClusterSettingEntity(recordKey, getDataVersionId(), brokerPort,
+ new BdbClusterSettingEntity(recordKey, getDataVerId(), brokerPort,
brokerTLSPort, brokerWebPort, clsDefTopicProps.getNumTopicStores(),
clsDefTopicProps.getNumPartitions(), clsDefTopicProps.getUnflushThreshold(),
clsDefTopicProps.getUnflushInterval(), clsDefTopicProps.getUnflushDataHold(),
@@ -131,59 +130,65 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
*
* @return if changed
*/
- public boolean updModifyInfo(int newBrokerPort, int newBrokerTLSPort,
- int newBrokerWebPort, int newMaxMsgSizeMB,
- int newQryPriorityId, Boolean newFlowCtrlEnable,
- int flowRuleCnt, String newFlowCtrlRuleInfo,
- TopicPropGroup newDefTopicProps) {
+ public boolean updModifyInfo(long dataVerId, int brokerPort, int brokerTLSPort,
+ int brokerWebPort, int maxMsgSizeMB,
+ int qryPriorityId, Boolean flowCtrlEnable,
+ int flowRuleCnt, String flowCtrlRuleInfo,
+ TopicPropGroup defTopicProps) {
boolean changed = false;
- if (newBrokerPort != TBaseConstants.META_VALUE_UNDEFINED
- && this.brokerPort != newBrokerPort) {
+ // check and set dataVerId info
+ if (dataVerId != TBaseConstants.META_VALUE_UNDEFINED
+ && this.getDataVerId() != dataVerId) {
changed = true;
- this.brokerPort = newBrokerPort;
+ this.setDataVersionId(dataVerId);
}
- if (newBrokerTLSPort != TBaseConstants.META_VALUE_UNDEFINED
- && this.brokerTLSPort != newBrokerTLSPort) {
+ if (brokerPort != TBaseConstants.META_VALUE_UNDEFINED
+ && this.brokerPort != brokerPort) {
changed = true;
- this.brokerTLSPort = newBrokerTLSPort;
+ this.brokerPort = brokerPort;
}
- if (newBrokerWebPort != TBaseConstants.META_VALUE_UNDEFINED
- && this.brokerWebPort != newBrokerWebPort) {
+ if (brokerTLSPort != TBaseConstants.META_VALUE_UNDEFINED
+ && this.brokerTLSPort != brokerTLSPort) {
changed = true;
- this.brokerWebPort = newBrokerWebPort;
+ this.brokerTLSPort = brokerTLSPort;
+ }
+ if (brokerWebPort != TBaseConstants.META_VALUE_UNDEFINED
+ && this.brokerWebPort != brokerWebPort) {
+ changed = true;
+ this.brokerWebPort = brokerWebPort;
}
// check and set modified field
- if (newMaxMsgSizeMB != TBaseConstants.META_VALUE_UNDEFINED) {
+ if (maxMsgSizeMB != TBaseConstants.META_VALUE_UNDEFINED) {
int newMaxMsgSizeB =
- SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(newMaxMsgSizeMB);
+ SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeMB);
if (this.maxMsgSizeInB != newMaxMsgSizeB) {
changed = true;
this.maxMsgSizeInB = newMaxMsgSizeB;
- this.maxMsgSizeInMB = newMaxMsgSizeMB;
+ this.maxMsgSizeInMB = maxMsgSizeMB;
}
}
// check and set qry priority id
- if (newQryPriorityId != TBaseConstants.META_VALUE_UNDEFINED
- && this.qryPriorityId != newQryPriorityId) {
+ if (qryPriorityId != TBaseConstants.META_VALUE_UNDEFINED
+ && this.qryPriorityId != qryPriorityId) {
changed = true;
- this.qryPriorityId = newQryPriorityId;
+ this.qryPriorityId = qryPriorityId;
}
// check and set flowCtrl info
- if (newFlowCtrlEnable != null
- && this.gloFlowCtrlStatus.isEnable() != newFlowCtrlEnable) {
+ if (flowCtrlEnable != null
+ && this.gloFlowCtrlStatus.isEnable() != flowCtrlEnable) {
changed = true;
- setEnableFlowCtrl(newFlowCtrlEnable);
+ setEnableFlowCtrl(flowCtrlEnable);
}
- if (TStringUtils.isNotBlank(newFlowCtrlRuleInfo)
- && !newFlowCtrlRuleInfo.equals(gloFlowCtrlRuleInfo)) {
+ if (TStringUtils.isNotBlank(flowCtrlRuleInfo)
+ && !flowCtrlRuleInfo.equals(gloFlowCtrlRuleInfo)) {
changed = true;
- setGloFlowCtrlInfo(flowRuleCnt, newFlowCtrlRuleInfo);
+ setGloFlowCtrlInfo(flowRuleCnt, flowCtrlRuleInfo);
}
// check and set clsDefTopicProps info
- if (newDefTopicProps != null
- && !newDefTopicProps.isDataEquals(clsDefTopicProps)) {
+ if (defTopicProps != null
+ && !defTopicProps.isDataEquals(clsDefTopicProps)) {
changed = true;
- clsDefTopicProps = newDefTopicProps;
+ clsDefTopicProps = defTopicProps;
}
if (changed) {
updSerialId();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupBlackListEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupBlackListEntity.java
index b576cce..1ca7c11 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupBlackListEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupBlackListEntity.java
@@ -59,7 +59,7 @@ public class GroupBlackListEntity extends BaseEntity implements Cloneable {
BdbBlackGroupEntity bdbEntity =
new BdbBlackGroupEntity(topicName, groupName,
getAttributes(), getCreateUser(), getCreateDate());
- bdbEntity.setDataVerId(getDataVersionId());
+ bdbEntity.setDataVerId(getDataVerId());
bdbEntity.setReason(reason);
return bdbEntity;
}
@@ -166,11 +166,7 @@ public class GroupBlackListEntity extends BaseEntity implements Cloneable {
@Override
public GroupBlackListEntity clone() {
- try {
- GroupBlackListEntity copy = (GroupBlackListEntity) super.clone();
- return copy;
- } catch (CloneNotSupportedException e) {
- return null;
- }
+ GroupBlackListEntity copy = (GroupBlackListEntity) super.clone();
+ return copy;
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
index 6ad0eb8..b267530 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
@@ -20,6 +20,7 @@ package org.apache.tubemq.server.master.metamanage.metastore.dao.entity;
import java.util.Date;
import java.util.Objects;
+import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.utils.KeyBuilderUtils;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.statusdef.EnableStatus;
@@ -46,10 +47,11 @@ public class GroupConsumeCtrlEntity extends BaseEntity implements Cloneable {
super();
}
- public GroupConsumeCtrlEntity(long dataVerId,
- String createUser,
- Date createDate) {
- super(dataVerId, createUser, createDate);
+ public GroupConsumeCtrlEntity(BaseEntity opInfoEntity,
+ String groupName, String topicName) {
+ super(opInfoEntity);
+ this.groupName = groupName;
+ this.topicName = topicName;
}
public GroupConsumeCtrlEntity(String groupName, String topicName,
@@ -88,7 +90,7 @@ public class GroupConsumeCtrlEntity extends BaseEntity implements Cloneable {
new BdbGroupFilterCondEntity(topicName, groupName,
filterEnable.getCode(), filterCondStr,
getAttributes(), getCreateUser(), getCreateDate());
- bdbEntity.setDataVerId(getDataVersionId());
+ bdbEntity.setDataVerId(getDataVerId());
bdbEntity.setConsumeEnable(consumeEnable);
bdbEntity.setDisableConsumeReason(disableReason);
return bdbEntity;
@@ -169,32 +171,39 @@ public class GroupConsumeCtrlEntity extends BaseEntity implements Cloneable {
*
* @return if changed
*/
- public boolean updModifyInfo(Boolean newConsumeEnable, String newDisableRsn,
- Boolean newFilterEnable, String newFilterCondStr) {
+ public boolean updModifyInfo(long dataVerId, Boolean consumeEnable,
+ String disableRsn, Boolean filterEnable,
+ String filterCondStr) {
boolean changed = false;
+ // check and set dataVerId info
+ if (dataVerId != TBaseConstants.META_VALUE_UNDEFINED
+ && this.getDataVerId() != dataVerId) {
+ changed = true;
+ this.setDataVersionId(dataVerId);
+ }
// check and set consumeEnable info
- if (newConsumeEnable != null
- && this.consumeEnable.isEnable() != newConsumeEnable) {
+ if (consumeEnable != null
+ && this.consumeEnable.isEnable() != consumeEnable) {
changed = true;
- setConsumeEnable(newConsumeEnable);
+ setConsumeEnable(consumeEnable);
}
// check and set disableReason info
- if (newDisableRsn != null
- && !newDisableRsn.equals(disableReason)) {
+ if (disableRsn != null
+ && !disableRsn.equals(disableReason)) {
changed = true;
- disableReason = newDisableRsn;
+ disableReason = disableRsn;
}
// check and set consumeEnable info
- if (newFilterEnable != null
- && this.filterEnable.isEnable() != newFilterEnable) {
+ if (filterEnable != null
+ && this.filterEnable.isEnable() != filterEnable) {
changed = true;
- setFilterEnable(newFilterEnable);
+ setFilterEnable(filterEnable);
}
// check and set filterCondStr info
- if (TStringUtils.isNotBlank(newFilterCondStr)
- && !newFilterCondStr.equals(filterCondStr)) {
+ if (TStringUtils.isNotBlank(filterCondStr)
+ && !filterCondStr.equals(this.filterCondStr)) {
changed = true;
- filterCondStr = newFilterCondStr;
+ this.filterCondStr = filterCondStr;
}
if (changed) {
updSerialId();
@@ -304,13 +313,9 @@ public class GroupConsumeCtrlEntity extends BaseEntity implements Cloneable {
@Override
public GroupConsumeCtrlEntity clone() {
- try {
- GroupConsumeCtrlEntity copy = (GroupConsumeCtrlEntity) super.clone();
- copy.setConsumeEnable(getConsumeEnable().isEnable());
- copy.setFilterEnable(getFilterEnable().isEnable());
- return copy;
- } catch (CloneNotSupportedException e) {
- return null;
- }
+ GroupConsumeCtrlEntity copy = (GroupConsumeCtrlEntity) super.clone();
+ copy.setConsumeEnable(getConsumeEnable().isEnable());
+ copy.setFilterEnable(getFilterEnable().isEnable());
+ return copy;
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
index d051184..506100c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
@@ -50,10 +50,9 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
super();
}
- public GroupResCtrlEntity(long dataVerId,
- String createUser,
- Date createDate) {
- super(dataVerId, createUser, createDate);
+ public GroupResCtrlEntity(BaseEntity opEntity, String groupName) {
+ super(opEntity);
+ this.groupName = groupName;
}
public GroupResCtrlEntity(String groupName,
@@ -101,7 +100,7 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
//Constructor
int statusId = (this.flowCtrlStatus == EnableStatus.STATUS_ENABLE) ? 1 : 0;
BdbGroupFlowCtrlEntity bdbEntity =
- new BdbGroupFlowCtrlEntity(getDataVersionId(), this.groupName,
+ new BdbGroupFlowCtrlEntity(getDataVerId(), this.groupName,
this.flowCtrlInfo, statusId, this.ruleCnt, this.qryPriorityId,
getAttributes(), getCreateUser(), getCreateDate());
bdbEntity.setConsumeEnable(consumeEnable);
@@ -230,52 +229,58 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
*
* @return if changed
*/
- public boolean updModifyInfo(Boolean newEnableConsume, String newDisableRsn,
- Boolean newResChkEnable, int newAllowedB2CRate,
- int newQryPriorityId, Boolean newFlowCtrlEnable,
- int flowRuleCnt, String newFlowCtrlRuleInfo) {
+ public boolean updModifyInfo(long dataVerId, Boolean enableConsume, String disableRsn,
+ Boolean resChkEnable, int allowedB2CRate,
+ int qryPriorityId, Boolean flowCtrlEnable,
+ int flowRuleCnt, String flowCtrlRuleInfo) {
boolean changed = false;
+ // check and set dataVerId info
+ if (dataVerId != TBaseConstants.META_VALUE_UNDEFINED
+ && this.getDataVerId() != dataVerId) {
+ changed = true;
+ this.setDataVersionId(dataVerId);
+ }
// check and set resCheckStatus info
- if (newEnableConsume != null
- && this.consumeEnable.isEnable() != newEnableConsume) {
+ if (enableConsume != null
+ && this.consumeEnable.isEnable() != enableConsume) {
changed = true;
- setConsumeEnable(newEnableConsume);
+ setConsumeEnable(enableConsume);
}
// check and set disableReason info
- if (TStringUtils.isNotBlank(newDisableRsn)
- && !newDisableRsn.equals(disableReason)) {
+ if (TStringUtils.isNotBlank(disableRsn)
+ && !disableRsn.equals(disableReason)) {
changed = true;
- this.disableReason = newDisableRsn;
+ this.disableReason = disableRsn;
}
// check and set resCheckStatus info
- if (newResChkEnable != null
- && this.resCheckStatus.isEnable() != newResChkEnable) {
+ if (resChkEnable != null
+ && this.resCheckStatus.isEnable() != resChkEnable) {
changed = true;
- setResCheckStatus(newResChkEnable);
+ setResCheckStatus(resChkEnable);
}
// check and set allowed broker client rate
- if (newAllowedB2CRate != TBaseConstants.META_VALUE_UNDEFINED
- && this.allowedBrokerClientRate != newAllowedB2CRate) {
+ if (allowedB2CRate != TBaseConstants.META_VALUE_UNDEFINED
+ && this.allowedBrokerClientRate != allowedB2CRate) {
changed = true;
- this.allowedBrokerClientRate = newAllowedB2CRate;
+ this.allowedBrokerClientRate = allowedB2CRate;
}
// check and set qry priority id
- if (newQryPriorityId != TBaseConstants.META_VALUE_UNDEFINED
- && this.qryPriorityId != newQryPriorityId) {
+ if (qryPriorityId != TBaseConstants.META_VALUE_UNDEFINED
+ && this.qryPriorityId != qryPriorityId) {
changed = true;
- this.qryPriorityId = newQryPriorityId;
+ this.qryPriorityId = qryPriorityId;
}
// check and set flowCtrl info
- if (newFlowCtrlEnable != null
- && this.flowCtrlStatus.isEnable() != newFlowCtrlEnable) {
+ if (flowCtrlEnable != null
+ && this.flowCtrlStatus.isEnable() != flowCtrlEnable) {
changed = true;
- setFlowCtrlStatus(newFlowCtrlEnable);
+ setFlowCtrlStatus(flowCtrlEnable);
}
// check and set flowCtrlInfo info
- if (TStringUtils.isNotBlank(newFlowCtrlRuleInfo)
- && !newFlowCtrlRuleInfo.equals(flowCtrlInfo)) {
+ if (TStringUtils.isNotBlank(flowCtrlRuleInfo)
+ && !flowCtrlRuleInfo.equals(flowCtrlInfo)) {
changed = true;
- setFlowCtrlRule(flowRuleCnt, newFlowCtrlRuleInfo);
+ setFlowCtrlRule(flowRuleCnt, flowCtrlRuleInfo);
}
if (changed) {
updSerialId();
@@ -395,15 +400,11 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
@Override
public GroupResCtrlEntity clone() {
- try {
- GroupResCtrlEntity copy = (GroupResCtrlEntity) super.clone();
- copy.setConsumeEnable(getConsumeEnable().isEnable());
- copy.setFlowCtrlStatus(getFlowCtrlStatus());
- copy.setResCheckStatus(getResCheckStatus());
- return copy;
- } catch (CloneNotSupportedException e) {
- return null;
- }
+ GroupResCtrlEntity copy = (GroupResCtrlEntity) super.clone();
+ copy.setConsumeEnable(getConsumeEnable().isEnable());
+ copy.setFlowCtrlStatus(getFlowCtrlStatus());
+ copy.setResCheckStatus(getResCheckStatus());
+ return copy;
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
index c4b80e4..61e148b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
@@ -44,6 +44,7 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
super();
}
+
public TopicCtrlEntity(String topicName, int topicNameId,
int maxMsgSizeInMB, String createUser) {
super(createUser, new Date());
@@ -55,6 +56,11 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
this.maxMsgSizeInMB = maxMsgSizeInMB;
}
+ public TopicCtrlEntity(BaseEntity opEntity, String topicName) {
+ super(opEntity);
+ this.topicName = topicName;
+ }
+
public TopicCtrlEntity(String topicName, int topicNameId,
boolean enableAuth, int maxMsgSizeInB,
long dataVersionId, String createUser,
@@ -89,7 +95,7 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
new BdbTopicAuthControlEntity(topicName, isAuthCtrlEnable(),
getAttributes(), getCreateUser(), getCreateDate());
bdbEntity.setTopicId(topicNameId);
- bdbEntity.setDataVerId(getDataVersionId());
+ bdbEntity.setDataVerId(getDataVerId());
bdbEntity.setMaxMsgSize(maxMsgSizeInB);
return bdbEntity;
}
@@ -143,10 +149,15 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
*
* @return if changed
*/
- public boolean updModifyInfo(int topicNameId,
- int newMaxMsgSizeMB,
- EnableStatus authCtrlStatus) {
+ public boolean updModifyInfo(long dataVerId, int topicNameId,
+ int newMaxMsgSizeMB, Boolean enableTopicAuth) {
boolean changed = false;
+ // check and set dataVerId info
+ if (dataVerId != TBaseConstants.META_VALUE_UNDEFINED
+ && this.getDataVerId() != dataVerId) {
+ changed = true;
+ this.setDataVersionId(dataVerId);
+ }
// check and set topicNameId info
if (topicNameId != TBaseConstants.META_VALUE_UNDEFINED
&& this.topicNameId != topicNameId) {
@@ -164,13 +175,11 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
}
}
// check and set authCtrlStatus info
- if (authCtrlStatus != null
- && authCtrlStatus != EnableStatus.STATUS_UNDEFINE
- && this.authCtrlStatus != authCtrlStatus) {
+ if (enableTopicAuth != null
+ && this.authCtrlStatus.isEnable() != enableTopicAuth) {
+ setEnableAuthCtrl(enableTopicAuth);
changed = true;
- this.authCtrlStatus = authCtrlStatus;
}
-
if (changed) {
updSerialId();
}
@@ -274,12 +283,8 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
@Override
public TopicCtrlEntity clone() {
- try {
- TopicCtrlEntity copy = (TopicCtrlEntity) super.clone();
- copy.setAuthCtrlStatus(getAuthCtrlStatus());
- return copy;
- } catch (CloneNotSupportedException e) {
- return null;
- }
+ TopicCtrlEntity copy = (TopicCtrlEntity) super.clone();
+ copy.setAuthCtrlStatus(getAuthCtrlStatus());
+ return copy;
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
index e502854..8b3fd99 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
@@ -48,8 +48,15 @@ public class TopicDeployEntity extends BaseEntity implements Cloneable {
super();
}
- public TopicDeployEntity(long dataVerId, String createUser, Date createDate) {
- super(dataVerId, createUser, createDate);
+ public TopicDeployEntity(BaseEntity opInfoEntity, int brokerId,
+ String brokerIp, int brokerPort, String topicName) {
+ super(opInfoEntity);
+ this.brokerId = brokerId;
+ this.brokerIp = brokerIp;
+ this.brokerPort = brokerPort;
+ this.topicName = topicName;
+ this.recordKey = KeyBuilderUtils.buildTopicConfRecKey(brokerId, topicName);
+ this.brokerAddress = KeyBuilderUtils.buildAddressInfo(brokerIp, brokerPort);
}
public TopicDeployEntity(String topicName, int topicId, int brokerId,
@@ -93,7 +100,7 @@ public class TopicDeployEntity extends BaseEntity implements Cloneable {
topicProps.isAcceptSubscribe(), topicProps.getNumTopicStores(),
getAttributes(), getCreateUser(), getCreateDate(),
getModifyUser(), getModifyDate());
- bdbEntity.setDataVerId(getDataVersionId());
+ bdbEntity.setDataVerId(getDataVerId());
bdbEntity.setTopicId(topicNameId);
bdbEntity.setNumTopicStores(topicProps.getNumTopicStores());
bdbEntity.setMemCacheMsgSizeInMB(topicProps.getMemCacheMsgSizeInMB());
@@ -204,9 +211,16 @@ public class TopicDeployEntity extends BaseEntity implements Cloneable {
*
* @return if changed
*/
- public boolean updModifyInfo(int topicNameId, int brokerPort, String brokerIp,
- TopicStatus deployStatus, TopicPropGroup topicProps) {
+ public boolean updModifyInfo(long dataVerId, int topicNameId, int brokerPort,
+ String brokerIp, TopicStatus deployStatus,
+ TopicPropGroup topicProps) {
boolean changed = false;
+ // check and set dataVerId info
+ if (dataVerId != TBaseConstants.META_VALUE_UNDEFINED
+ && this.getDataVerId() != dataVerId) {
+ changed = true;
+ this.setDataVersionId(dataVerId);
+ }
// check and set topicNameId info
if (topicNameId != TBaseConstants.META_VALUE_UNDEFINED
&& this.topicNameId != topicNameId) {
@@ -352,8 +366,8 @@ public class TopicDeployEntity extends BaseEntity implements Cloneable {
public TopicDeployEntity clone() {
try {
TopicDeployEntity copy = (TopicDeployEntity) super.clone();
- if (copy.getTopicProps() != null) {
- copy.setTopicProps(getTopicProps().clone());
+ if (copy.topicProps != null) {
+ copy.topicProps = getTopicProps().clone();
}
copy.setDeployStatus(getDeployStatus());
return copy;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/BrokerConfigMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/BrokerConfigMapper.java
index f52fe1b..578f22b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/BrokerConfigMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/BrokerConfigMapper.java
@@ -42,4 +42,6 @@ public interface BrokerConfigMapper extends AbstractMapper {
BrokerConfEntity getBrokerConfByBrokerId(int brokerId);
BrokerConfEntity getBrokerConfByBrokerIp(String brokerIp);
+
+ Map<Integer, Set<Integer>> getBrokerIdByRegionId(Set<Integer> regionIdSet);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java
index d269f48..59d1edb 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java
@@ -38,6 +38,8 @@ public interface GroupConsumeCtrlMapper extends AbstractMapper {
String topicName,
ProcessResult result);
+ boolean isTopicNameInUsed(String topicName);
+
GroupConsumeCtrlEntity getGroupConsumeCtrlConfByRecKey(String recordKey);
List<GroupConsumeCtrlEntity> getConsumeCtrlByTopicName(String topicName);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupResCtrlMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupResCtrlMapper.java
index 51fc665..387e536 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupResCtrlMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupResCtrlMapper.java
@@ -18,6 +18,8 @@
package org.apache.tubemq.server.master.metamanage.metastore.dao.mapper;
import java.util.Map;
+import java.util.Set;
+
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
@@ -33,7 +35,6 @@ public interface GroupResCtrlMapper extends AbstractMapper {
GroupResCtrlEntity getGroupResCtrlConf(String groupName);
- Map<String, GroupResCtrlEntity> getGroupResCtrlConf(GroupResCtrlEntity qryEntity);
-
-
+ Map<String, GroupResCtrlEntity> getGroupResCtrlConf(Set<String> groupSet,
+ GroupResCtrlEntity qryEntity);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicCtrlMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicCtrlMapper.java
index 0cb67e0..e8c78cc 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicCtrlMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicCtrlMapper.java
@@ -18,6 +18,9 @@
package org.apache.tubemq.server.master.metamanage.metastore.dao.mapper;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
@@ -36,5 +39,8 @@ public interface TopicCtrlMapper extends AbstractMapper {
List<TopicCtrlEntity> getTopicCtrlConf(TopicCtrlEntity qryEntity);
+ Map<String, TopicCtrlEntity> getTopicCtrlConf(Set<String> topicNameSet,
+ TopicCtrlEntity qryEntity);
+
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
index b4a1233..5e28b05 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
@@ -39,6 +39,8 @@ public interface TopicDeployMapper extends AbstractMapper {
boolean hasConfiguredTopics(int brokerId);
+ boolean isTopicDeployed(String topicName);
+
List<TopicDeployEntity> getTopicConf(TopicDeployEntity qryEntity);
TopicDeployEntity getTopicConf(int brokerId, String topicName);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
index d6f51d1..d2e9166 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
import org.apache.tubemq.server.common.exception.LoadMetaException;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
@@ -52,6 +53,8 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
new ConcurrentHashMap<>();
private ConcurrentHashMap<String/* brokerIP */, Integer/* brokerId */> brokerIpIndexCache =
new ConcurrentHashMap<>();
+ private ConcurrentHashMap<Integer/* regionId */, ConcurrentHashSet<Integer>> regionIndexCache =
+ new ConcurrentHashMap<>();
public BdbBrokerConfigMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
brokerConfStore = new EntityStore(repEnv,
@@ -261,6 +264,26 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
return brokerConfCache.get(brokerId);
}
+ @Override
+ public Map<Integer, Set<Integer>> getBrokerIdByRegionId(Set<Integer> regionIdSet) {
+ Set<Integer> qryKey = new HashSet<>();
+ Map<Integer, Set<Integer>> retInfo = new HashMap<>();
+ if (regionIdSet == null || regionIdSet.isEmpty()) {
+ qryKey.addAll(regionIndexCache.keySet());
+ } else {
+ qryKey.addAll(regionIdSet);
+ }
+ for (Integer regionId : qryKey) {
+ ConcurrentHashSet<Integer> brokerIdSet =
+ regionIndexCache.get(regionId);
+ if (brokerIdSet == null || brokerIdSet.isEmpty()) {
+ continue;
+ }
+ retInfo.put(regionId, new HashSet<>(brokerIdSet));
+ }
+ return retInfo;
+ }
+
/**
* Put cluster setting info into bdb store
*
@@ -304,6 +327,12 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
return;
}
brokerIpIndexCache.remove(curEntity.getBrokerIp());
+ ConcurrentHashSet<Integer> brokerIdSet =
+ regionIndexCache.get(curEntity.getRegionId());
+ if (brokerIdSet == null) {
+ return;
+ }
+ brokerIdSet.remove(brokerId);
}
private void addOrUpdCacheRecord(BrokerConfEntity entity) {
@@ -313,10 +342,20 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
if (brokerId == null || brokerId != entity.getBrokerId()) {
brokerIpIndexCache.put(entity.getBrokerIp(), entity.getBrokerId());
}
+ ConcurrentHashSet<Integer> brokerIdSet = regionIndexCache.get(entity.getRegionId());
+ if (brokerIdSet == null) {
+ ConcurrentHashSet<Integer> tmpBrokerIdSet = new ConcurrentHashSet<>();
+ brokerIdSet = regionIndexCache.putIfAbsent(entity.getRegionId(), tmpBrokerIdSet);
+ if (brokerIdSet == null) {
+ brokerIdSet = tmpBrokerIdSet;
+ }
+ }
+ brokerIdSet.add(entity.getBrokerId());
}
private void clearCacheData() {
brokerIpIndexCache.clear();
+ regionIndexCache.clear();
brokerConfCache.clear();
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
index 08b665e..7147a1a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
@@ -172,6 +172,13 @@ public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
}
@Override
+ public boolean isTopicNameInUsed(String topicName) {
+ ConcurrentHashSet<String> consumeCtrlSet =
+ grpConsumeCtrlTopicCache.get(topicName);
+ return (consumeCtrlSet != null && !consumeCtrlSet.isEmpty());
+ }
+
+ @Override
public boolean delGroupConsumeCtrlConf(String groupName,
String topicName,
ProcessResult result) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java
index 3c397d8..327c3c3 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java
@@ -24,7 +24,9 @@ import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.server.common.exception.LoadMetaException;
@@ -163,18 +165,23 @@ public class BdbGroupResCtrlMapperImpl implements GroupResCtrlMapper {
}
@Override
- public Map<String, GroupResCtrlEntity> getGroupResCtrlConf(GroupResCtrlEntity qryEntity) {
+ public Map<String, GroupResCtrlEntity> getGroupResCtrlConf(Set<String> groupSet,
+ GroupResCtrlEntity qryEntity) {
+ GroupResCtrlEntity entity;
+ Set<String> qryKeySet = new HashSet<>();
Map<String, GroupResCtrlEntity> retMap = new HashMap<>();
- if (qryEntity == null) {
- for (GroupResCtrlEntity entity : groupBaseCtrlCache.values()) {
- retMap.put(entity.getGroupName(), entity);
- }
+ if (groupSet == null || groupSet.isEmpty()) {
+ qryKeySet.addAll(groupBaseCtrlCache.keySet());
} else {
- for (GroupResCtrlEntity entity : groupBaseCtrlCache.values()) {
- if (entity.isMatched(qryEntity)) {
- retMap.put(entity.getGroupName(), entity);
- }
+ qryKeySet.addAll(groupSet);
+ }
+ for (String group : qryKeySet) {
+ entity = groupBaseCtrlCache.get(group);
+ if (entity == null
+ || (qryEntity != null && !entity.isMatched(qryEntity))) {
+ continue;
}
+ retMap.put(entity.getGroupName(), entity);
}
return retMap;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
index adc3bb6..ca94af5 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
@@ -24,7 +24,11 @@ import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.server.common.exception.LoadMetaException;
@@ -178,6 +182,28 @@ public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
return retEntitys;
}
+ @Override
+ public Map<String, TopicCtrlEntity> getTopicCtrlConf(Set<String> topicNameSet,
+ TopicCtrlEntity qryEntity) {
+ Set<String> qryKeySet = new HashSet<>();
+ Map<String, TopicCtrlEntity> retEntityMap = new HashMap<>();
+ if (topicNameSet == null || topicNameSet.isEmpty()) {
+ qryKeySet.addAll(topicCtrlCache.keySet());
+ } else {
+ qryKeySet.addAll(topicNameSet);
+ }
+ for (String topicName : qryKeySet) {
+ TopicCtrlEntity entity = topicCtrlCache.get(topicName);
+ if (entity == null
+ || (qryEntity != null
+ && !entity.isMatched(qryEntity))) {
+ continue;
+ }
+ retEntityMap.put(topicName, entity);
+ }
+ return retEntityMap;
+ }
+
/**
* Put topic control configure info into bdb store
*
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
index 5f6c6c8..c38ea16 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
@@ -223,6 +223,12 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
}
@Override
+ public boolean isTopicDeployed(String topicName) {
+ ConcurrentHashSet<String> deploySet = topicNameCacheIndex.get(topicName);
+ return (deploySet != null && !deploySet.isEmpty());
+ }
+
+ @Override
public Map<String, List<TopicDeployEntity>> getTopicConfMap(Set<String> topicNameSet,
Set<Integer> brokerIdSet,
TopicDeployEntity qryEntity) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
index 4779674..f2714a9 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
@@ -19,7 +19,6 @@ package org.apache.tubemq.server.master.web.handler;
import static java.lang.Math.abs;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -28,7 +27,6 @@ import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.utils.AddressUtils;
import org.apache.tubemq.corebase.utils.Tuple2;
-import org.apache.tubemq.corebase.utils.Tuple3;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.common.statusdef.ManageStatus;
@@ -37,6 +35,7 @@ import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
@@ -199,8 +198,8 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
}
Boolean withTopic = (Boolean) result.retData1;
// fill query entity fields
- qryEntity.updModifyInfo(brokerPort, brokerTlsPort, brokerWebPort,
- regionId, groupId, null, brokerProps);
+ qryEntity.updModifyInfo(qryEntity.getDataVerId(), brokerPort, brokerTlsPort,
+ brokerWebPort, regionId, groupId, null, brokerProps);
Map<Integer, BrokerConfEntity> qryResult =
metaDataManager.getBrokerConfInfo(brokerIds, brokerIpSet, qryEntity);
// build query result
@@ -244,18 +243,10 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
+ BaseEntity opInfoEntity = (BaseEntity) result.getRetData();
// check and get cluster default setting info
ClusterSettingEntity defClusterSetting =
- metaDataManager.getClusterDefSetting();
- if (defClusterSetting == null) {
- if (!metaDataManager.addClusterDefSetting(sBuilder, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- defClusterSetting = metaDataManager.getClusterDefSetting();
- }
+ metaDataManager.getClusterDefSetting(false);
// get brokerIp and brokerId field
if (!getBrokerIpAndIdParamValue(req, sBuilder, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
@@ -312,8 +303,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
// add record and process result
List<BrokerProcessResult> retInfo = new ArrayList<>();
BrokerProcessResult processResult =
- metaDataManager.addBrokerConfig(opTupleInfo.getF0(),
- opTupleInfo.getF1(), opTupleInfo.getF2(), brokerIdAndIpTuple.getF0(),
+ metaDataManager.addBrokerConfig(opInfoEntity, brokerIdAndIpTuple.getF0(),
brokerIdAndIpTuple.getF1(), brokerPort, brokerTlsPort, brokerWebPort,
regionId, groupId, manageStatus, brokerProps, sBuilder, result);
retInfo.add(processResult);
@@ -341,12 +331,10 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
+ BaseEntity defOpInfoEntity = (BaseEntity) result.getRetData();
// check and get brokerJsonSet info
- if (!getBrokerJsonSetInfo(req, true, true,
- opTupleInfo.getF0(), opTupleInfo.getF1(), opTupleInfo.getF2(),
- null, sBuilder, result)) {
+ if (!getBrokerJsonSetInfo(req, true,
+ defOpInfoEntity, null, sBuilder, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
@@ -385,8 +373,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
+ BaseEntity opInfoEntity = (BaseEntity) result.getRetData();
// check and get brokerId field
if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.COMPSBROKERID, true, result)) {
@@ -452,9 +439,8 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
continue;
}
newEntity = curEntity.clone();
- newEntity.updBaseModifyInfo(opTupleInfo.getF0(), null, null,
- opTupleInfo.getF1(), opTupleInfo.getF2(), null);
- if (!newEntity.updModifyInfo(brokerPort, brokerTlsPort,
+ newEntity.updBaseModifyInfo(opInfoEntity);
+ if (!newEntity.updModifyInfo(opInfoEntity.getDataVerId(), brokerPort, brokerTlsPort,
brokerWebPort, regionId, groupId, null, brokerProps)) {
result.setFailResult(DataOpErrCode.DERR_SUCCESS_UNCHANGED.getCode(),
DataOpErrCode.DERR_SUCCESS_UNCHANGED.getDescription());
@@ -487,8 +473,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
+ BaseEntity opInfoEntity = (BaseEntity) result.getRetData();
// get isReservedData info
if (!WebParameterUtils.getBooleanParamValue(req,
WebFieldDef.ISRESERVEDDATA, false, false, result)) {
@@ -582,12 +567,12 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
Map<String, TopicDeployEntity> brokerTopicConfMap =
metaDataManager.getBrokerTopicConfEntitySet(entry.getBrokerId());
if (brokerTopicConfMap != null) {
- metaDataManager.delBrokerTopicConfig(opTupleInfo.getF1(),
+ metaDataManager.delBrokerTopicConfig(opInfoEntity.getModifyUser(),
entry.getBrokerId(), sBuilder, result);
}
}
- metaDataManager.confDelBrokerConfig(
- opTupleInfo.getF1(), entry.getBrokerId(), sBuilder, result);
+ metaDataManager.confDelBrokerConfig(opInfoEntity.getModifyUser(),
+ entry.getBrokerId(), sBuilder, result);
retInfo.add(new BrokerProcessResult(
entry.getBrokerId(), entry.getBrokerIp(), result));
}
@@ -673,50 +658,29 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
return sBuilder;
}
- private boolean getBrokerJsonSetInfo(HttpServletRequest req, boolean required,
- boolean isCreate, long dataVerId,
- String operator, Date operateDate,
+ private boolean getBrokerJsonSetInfo(HttpServletRequest req, boolean isAddOp,
+ BaseEntity defOpInfoEntity,
List<Map<String, String>> defValue,
- StringBuilder sBuilder,
- ProcessResult result) {
+ StringBuilder sBuilder, ProcessResult result) {
if (!WebParameterUtils.getJsonArrayParamValue(req,
- WebFieldDef.BROKERJSONSET, required, defValue, result)) {
+ WebFieldDef.BROKERJSONSET, true, defValue, result)) {
return result.success;
}
List<Map<String, String>> brokerJsonArray =
(List<Map<String, String>>) result.retData1;
// check and get cluster default setting info
ClusterSettingEntity defClusterSetting =
- metaDataManager.getClusterDefSetting();
- if (defClusterSetting == null) {
- if (!metaDataManager.addClusterDefSetting(sBuilder, result)) {
- return result.isSuccess();
- }
- defClusterSetting = metaDataManager.getClusterDefSetting();
- }
+ metaDataManager.getClusterDefSetting(false);
// check and get broker configure
HashMap<Integer, BrokerConfEntity> addedRecordMap = new HashMap<>();
for (int j = 0; j < brokerJsonArray.size(); j++) {
Map<String, String> brokerObject = brokerJsonArray.get(j);
// check and get operation info
- long itemDataVerId = dataVerId;
- String itemCreator = operator;
- Date itemCreateDate = operateDate;
- if (!WebParameterUtils.getAUDBaseInfo(
- brokerObject, true, result)) {
+ if (!WebParameterUtils.getAUDBaseInfo(brokerObject,
+ isAddOp, defOpInfoEntity, result)) {
return result.isSuccess();
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
- if (opTupleInfo.getF0() != TBaseConstants.META_VALUE_UNDEFINED) {
- itemDataVerId = opTupleInfo.getF0();
- }
- if (opTupleInfo.getF1() != null) {
- itemCreator = opTupleInfo.getF1();
- }
- if (opTupleInfo.getF2() != null) {
- itemCreateDate = opTupleInfo.getF2();
- }
+ BaseEntity itemOpEntity = (BaseEntity) result.getRetData();
// get brokerIp and brokerId field
if (!getBrokerIpAndIdParamValue(brokerObject, sBuilder, result)) {
return result.isSuccess();
@@ -764,22 +728,20 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
// manageStatusId
ManageStatus manageStatus = ManageStatus.STATUS_MANAGE_APPLY;
BrokerConfEntity entity =
- new BrokerConfEntity(itemDataVerId, itemCreator, itemCreateDate);
+ new BrokerConfEntity(itemOpEntity);
entity.setBrokerIdAndIp(brokerIdAndIpTuple.getF0(), brokerIdAndIpTuple.getF1());
- entity.updModifyInfo(brokerPort, brokerTlsPort, brokerWebPort,
- regionId, groupId, manageStatus, brokerProps);
+ entity.updModifyInfo(itemOpEntity.getDataVerId(), brokerPort, brokerTlsPort,
+ brokerWebPort, regionId, groupId, manageStatus, brokerProps);
addedRecordMap.put(entity.getBrokerId(), entity);
}
// check result
if (addedRecordMap.isEmpty()) {
- if (isCreate) {
- result.setFailResult(sBuilder
- .append("Not found record in ")
- .append(WebFieldDef.BROKERJSONSET.name)
- .append(" parameter!").toString());
- sBuilder.delete(0, sBuilder.length());
- return result.isSuccess();
- }
+ result.setFailResult(sBuilder
+ .append("Not found record in ")
+ .append(WebFieldDef.BROKERJSONSET.name)
+ .append(" parameter!").toString());
+ sBuilder.delete(0, sBuilder.length());
+ return result.isSuccess();
}
result.setSuccResult(addedRecordMap);
return result.isSuccess();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
index dad52df..03cdb62 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
@@ -30,13 +30,13 @@ import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.KeyBuilderUtils;
import org.apache.tubemq.corebase.utils.TStringUtils;
-import org.apache.tubemq.corebase.utils.Tuple3;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
@@ -154,7 +154,8 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
return sBuilder;
}
Set<String> filterCondSet = (Set<String>) result.retData1;
- qryEntity.updModifyInfo(consumeEnable, null, filterEnable, null);
+ qryEntity.updModifyInfo(qryEntity.getDataVerId(),
+ consumeEnable, null, filterEnable, null);
Map<String, List<GroupConsumeCtrlEntity>> qryResultSet =
metaDataManager.getGroupConsumeCtrlConf(groupSet, topicNameSet);
// build return result
@@ -200,8 +201,7 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
+ BaseEntity opInfoEntity = (BaseEntity) result.getRetData();
// check and get topicName field
if (!WebParameterUtils.getAndValidTopicNameInfo(req,
metaDataManager, true, null, result)) {
@@ -249,8 +249,7 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
for (String groupName : groupNameSet) {
for (String topicName : topicNameSet) {
csmProcessResult =
- metaDataManager.addGroupConsumeCtrlInfo(opTupleInfo.getF0(),
- opTupleInfo.getF1(), opTupleInfo.getF2(), groupName,
+ metaDataManager.addGroupConsumeCtrlInfo(opInfoEntity, groupName,
topicName, consumeEnable, disableRsn,
filterEnable, filterCondStr, sBuilder, result);
retInfo.add(csmProcessResult);
@@ -280,12 +279,10 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
+ BaseEntity defOpEntity = (BaseEntity) result.getRetData();
// check and get groupCsmJsonSet data
- if (!getGroupConsumeJsonSetInfo(req, true, true,
- opTupleInfo.getF0(), opTupleInfo.getF1(), opTupleInfo.getF2(),
- null, sBuilder, result)) {
+ if (!getGroupConsumeJsonSetInfo(req, true,
+ defOpEntity, null, sBuilder, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
@@ -328,8 +325,7 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
// check and get topicName field
if (!WebParameterUtils.getAndValidTopicNameInfo(req,
metaDataManager, true, null, result)) {
@@ -377,8 +373,7 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
for (String groupName : groupNameSet) {
for (String topicName : topicNameSet) {
csmProcessResult =
- metaDataManager.modGroupConsumeCtrlInfo(opTupleInfo.getF0(),
- opTupleInfo.getF1(), opTupleInfo.getF2(), groupName,
+ metaDataManager.modGroupConsumeCtrlInfo(opEntity, groupName,
topicName, consumeEnable, disableRsn,
filterEnable, filterCondStr, sBuilder, result);
retInfo.add(csmProcessResult);
@@ -408,11 +403,10 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
// check and get groupCsmJsonSet data
- if (!getGroupConsumeJsonSetInfo(req, true, false, opTupleInfo.getF0(),
- opTupleInfo.getF1(), opTupleInfo.getF2(), null, sBuilder, result)) {
+ if (!getGroupConsumeJsonSetInfo(req, true,
+ opEntity, null, sBuilder, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
@@ -455,8 +449,7 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
// check and get topicName field
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSTOPICNAME, true, null, result)) {
@@ -473,7 +466,7 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
Set<String> groupNameSet = (Set<String>) result.retData1;
// execute delete operation
List<GroupProcessResult> retInfo =
- metaDataManager.delGroupConsumeCtrlConf(opTupleInfo.getF1(),
+ metaDataManager.delGroupConsumeCtrlConf(opEntity.getModifyUser(),
groupNameSet, topicNameSet, sBuilder, result);
buildRetInfo(retInfo, sBuilder);
return sBuilder;
@@ -598,14 +591,13 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
- private boolean getGroupConsumeJsonSetInfo(HttpServletRequest req, boolean required,
- boolean isCreate, long dataVerId,
- String operator, Date operateDate,
+ private boolean getGroupConsumeJsonSetInfo(HttpServletRequest req, boolean isAddOp,
+ BaseEntity defOpEntity,
List<Map<String, String>> defValue,
StringBuilder sBuilder,
ProcessResult result) {
if (!WebParameterUtils.getJsonArrayParamValue(req,
- WebFieldDef.GROUPCSMJSONSET, required, defValue, result)) {
+ WebFieldDef.GROUPCSMJSONSET, true, defValue, result)) {
return result.success;
}
List<Map<String, String>> filterJsonArray =
@@ -636,7 +628,7 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
}
// get consumeEnable info
if (!WebParameterUtils.getBooleanParamValue(groupObject,
- WebFieldDef.CONSUMEENABLE, false, (isCreate ? true : null), result)) {
+ WebFieldDef.CONSUMEENABLE, false, (isAddOp ? true : null), result)) {
return result.isSuccess();
}
Boolean consumeEnable = (Boolean) result.retData1;
@@ -648,23 +640,23 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
String disableRsn = (String) result.retData1;
// get filterEnable info
if (!WebParameterUtils.getBooleanParamValue(groupObject,
- WebFieldDef.FILTERENABLE, false, (isCreate ? false : null), result)) {
+ WebFieldDef.FILTERENABLE, false, (isAddOp ? false : null), result)) {
return result.isSuccess();
}
Boolean filterEnable = (Boolean) result.retData1;
// get filterConds info
if (!WebParameterUtils.getFilterCondString(groupObject,
- false, isCreate, result)) {
+ false, isAddOp, result)) {
return result.isSuccess();
}
String filterCondStr = (String) result.retData1;
// record object
- if (isCreate) {
+ if (isAddOp) {
// add new record
GroupConsumeCtrlEntity entity =
- new GroupConsumeCtrlEntity(dataVerId, operator, operateDate);
- entity.setGroupAndTopic(groupName, topicName);
- entity.updModifyInfo(consumeEnable, disableRsn, filterEnable, filterCondStr);
+ new GroupConsumeCtrlEntity(defOpEntity, groupName, topicName);
+ entity.updModifyInfo(defOpEntity.getDataVerId(),
+ consumeEnable, disableRsn, filterEnable, filterCondStr);
result.setSuccResult(entity);
csmProcessMap.put(entity.getRecordKey(),
new GroupProcessResult(groupName, topicName, result));
@@ -680,10 +672,9 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
continue;
}
GroupConsumeCtrlEntity newEntity = curEntity.clone();
- newEntity.updBaseModifyInfo(dataVerId, null,
- null, operator, operateDate, null);
- if (newEntity.updModifyInfo(consumeEnable,
- disableRsn, filterEnable, filterCondStr)) {
+ newEntity.updBaseModifyInfo(defOpEntity);
+ if (newEntity.updModifyInfo(defOpEntity.getDataVerId(),
+ consumeEnable, disableRsn, filterEnable, filterCondStr)) {
result.setSuccResult(newEntity);
} else {
result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
@@ -695,14 +686,12 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
}
// check result
if (csmProcessMap.isEmpty()) {
- if (isCreate) {
- result.setFailResult(sBuilder
- .append("Not found record in ")
- .append(WebFieldDef.GROUPCSMJSONSET.name)
- .append(" parameter!").toString());
- sBuilder.delete(0, sBuilder.length());
- return result.isSuccess();
- }
+ result.setFailResult(sBuilder
+ .append("Not found record in ")
+ .append(WebFieldDef.GROUPCSMJSONSET.name)
+ .append(" parameter!").toString());
+ sBuilder.delete(0, sBuilder.length());
+ return result.isSuccess();
}
result.setSuccResult(csmProcessMap);
return result.isSuccess();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
index f29ff0e..e8d5265 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
@@ -18,18 +18,17 @@
package org.apache.tubemq.server.master.web.handler;
import java.util.ArrayList;
-import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.corebase.utils.Tuple3;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.TMaster;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
@@ -125,21 +124,16 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
return sBuilder;
}
Boolean flowCtrlEnable = (Boolean) result.retData1;
- entity.updModifyInfo(consumeEnable, null, resCheckEnable,
- TBaseConstants.META_VALUE_UNDEFINED, inQryPriorityId, flowCtrlEnable,
- TBaseConstants.META_VALUE_UNDEFINED, null);
+ entity.updModifyInfo(entity.getDataVerId(), consumeEnable, null,
+ resCheckEnable, TBaseConstants.META_VALUE_UNDEFINED, inQryPriorityId,
+ flowCtrlEnable, TBaseConstants.META_VALUE_UNDEFINED, null);
Map<String, GroupResCtrlEntity> groupResCtrlEntityMap =
- metaDataManager.confGetGroupResCtrlConf(entity);
- // Fill in the key to be queried
- if (inGroupSet.isEmpty()) {
- inGroupSet.addAll(groupResCtrlEntityMap.keySet());
- }
+ metaDataManager.confGetGroupResCtrlConf(inGroupSet, entity);
// build return result
int totalCnt = 0;
WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
- for (String groupName : inGroupSet) {
- entity = groupResCtrlEntityMap.get(groupName);
- if (entity == null) {
+ for (GroupResCtrlEntity resCtrlEntity : groupResCtrlEntityMap.values()) {
+ if (resCtrlEntity == null) {
continue;
}
if (totalCnt++ > 0) {
@@ -171,8 +165,7 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
// get group list
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSGROUPNAME, true, null, result)) {
@@ -236,8 +229,7 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
GroupProcessResult retItem;
List<GroupProcessResult> retInfo = new ArrayList<>();
for (String groupName : batchGroupNames) {
- retItem = metaDataManager.addGroupResCtrlConf(opTupleInfo.getF0(),
- opTupleInfo.getF1(), opTupleInfo.getF2(), groupName, consumeEnable,
+ retItem = metaDataManager.addGroupResCtrlConf(opEntity, groupName, consumeEnable,
disableRsn, resCheckEnable, allowedBClientRate, qryPriorityId,
flowCtrlEnable, flowRuleCnt, flowCtrlInfo, sBuilder, result);
retInfo.add(retItem);
@@ -265,8 +257,7 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
// get group list
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSGROUPNAME, true, null, result)) {
@@ -329,10 +320,10 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
GroupProcessResult retItem;
List<GroupProcessResult> retInfo = new ArrayList<>();
for (String groupName : batchGroupNames) {
- retItem = metaDataManager.updGroupResCtrlConf(opTupleInfo.getF0(),
- opTupleInfo.getF1(), opTupleInfo.getF2(), groupName, consumeEnable,
- disableRsn, resCheckEnable, allowedBClientRate, qryPriorityId,
- flowCtrlEnable, flowRuleCnt, flowCtrlInfo, sBuilder, result);
+ retItem = metaDataManager.updGroupResCtrlConf(opEntity, groupName,
+ consumeEnable, disableRsn, resCheckEnable, allowedBClientRate,
+ qryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlInfo,
+ sBuilder, result);
retInfo.add(retItem);
}
return buildRetInfo(retInfo, sBuilder);
@@ -358,8 +349,7 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
// get group list
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSGROUPNAME, true, null, result)) {
@@ -369,7 +359,7 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
Set<String> batchGroupNames = (Set<String>) result.retData1;
// delete group resource record
List<GroupProcessResult> retInfo =
- metaDataManager.delGroupResCtrlConf(opTupleInfo.getF1(),
+ metaDataManager.delGroupResCtrlConf(opEntity.getModifyUser(),
batchGroupNames, sBuilder, result);
return buildRetInfo(retInfo, sBuilder);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index 532a05f..769ad2f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -17,7 +17,6 @@
package org.apache.tubemq.server.master.web.handler;
-import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -27,13 +26,13 @@ import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.cluster.BrokerInfo;
import org.apache.tubemq.corebase.cluster.TopicInfo;
import org.apache.tubemq.corebase.utils.Tuple2;
-import org.apache.tubemq.corebase.utils.Tuple3;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
@@ -150,7 +149,7 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
public StringBuilder adminQueryClusterDefSetting(HttpServletRequest req) {
StringBuilder sBuilder = new StringBuilder(512);
ClusterSettingEntity defClusterSetting =
- metaDataManager.getClusterDefSetting();
+ metaDataManager.getClusterDefSetting(true);
WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
if (defClusterSetting != null) {
defClusterSetting.toWebJsonStr(sBuilder, true, true);
@@ -179,8 +178,7 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
// check max message size
if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.MAXMSGSIZEINMB, false,
@@ -245,10 +243,9 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
String flowCtrlInfo = (String) result.retData1;
// add or modify record
ClusterSettingEntity newConf = null;
- ClusterSettingEntity curConf = metaDataManager.getClusterDefSetting();
+ ClusterSettingEntity curConf = metaDataManager.getClusterDefSetting(true);
if (curConf == null) {
- if (!metaDataManager.addClusterDefSetting(opTupleInfo.getF0(),
- opTupleInfo.getF1(), opTupleInfo.getF2(), inBrokerPort,
+ if (!metaDataManager.addClusterDefSetting(opEntity, inBrokerPort,
inBrokerTlsPort, inBrokerWebPort, inMaxMsgSizeMB,
inQryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlInfo,
defTopicProps, sBuilder, result)) {
@@ -256,8 +253,7 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
return sBuilder;
}
} else {
- if (!metaDataManager.modClusterDefSetting(opTupleInfo.getF0(),
- opTupleInfo.getF1(), opTupleInfo.getF2(), inBrokerPort,
+ if (!metaDataManager.modClusterDefSetting(opEntity, inBrokerPort,
inBrokerTlsPort, inBrokerWebPort, inMaxMsgSizeMB,
inQryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlInfo,
defTopicProps, sBuilder, result)) {
@@ -265,7 +261,7 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
return sBuilder;
}
}
- curConf = metaDataManager.getClusterDefSetting();
+ curConf = metaDataManager.getClusterDefSetting(true);
if (curConf == null) {
WebParameterUtils.buildFailResultWithBlankData(
DataOpErrCode.DERR_UPD_NOT_EXIST.getCode(),
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
new file mode 100644
index 0000000..2daa6ee
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.web.handler;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.tubemq.server.master.TMaster;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
+
+
+
+
+public class WebTopicCtrlHandler extends AbstractWebHandler {
+
+ /**
+ * Creates the topic control web handler, handing the master to the parent handler
+ *
+ * @param master tube master, passed unchanged to the AbstractWebHandler constructor
+ */
+ public WebTopicCtrlHandler(TMaster master) {
+ super(master);
+ }
+
+
+
+ @Override
+ public void registerWebApiMethod() {
+ // register query method: web API name first, then the name of the handler method in this class
+ registerQueryWebMethod("admin_query_topic_control_info",
+ "adminQueryTopicCtrlInfo");
+
+ // register modify methods (add, batch add, modify, batch modify, delete), same argument order
+ registerModifyWebMethod("admin_add_topic_control_info",
+ "adminAddTopicCtrlInfo");
+ registerModifyWebMethod("admin_batch_add_topic_control_info",
+ "adminBatchAddTopicCtrlInfo");
+ registerModifyWebMethod("admin_modify_topic_control_info",
+ "adminModTopicCtrlInfo");
+ registerModifyWebMethod("admin_batch_modify_topic_control_info",
+ "adminBatchModTopicCtrlInfo");
+ registerModifyWebMethod("admin_delete_topic_control_info",
+ "adminDeleteTopicCtrlInfo");
+ }
+
+ /**
+ * Query topic control records matching the topic names and operation info in the request
+ *
+ * @param req the HTTP request the query conditions are read from
+ * @return the buffer holding the query result, or the failure response if a parameter check fails
+ */
+ public StringBuilder adminQueryTopicCtrlInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ TopicCtrlEntity qryEntity = new TopicCtrlEntity();
+ // get queried operation info, for createUser, modifyUser, dataVersionId; stored into qryEntity
+ if (!WebParameterUtils.getQueriedOperateInfo(req, qryEntity, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // check and get topicName field
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ // query matched records, using the topic name set and qryEntity as the filter
+ Map<String, TopicCtrlEntity> topicCtrlMap =
+ metaDataManager.getTopicCtrlConf(topicNameSet, qryEntity);
+ // build query result: null entries are skipped, the rest are comma-separated; totalCnt counts them
+ int totalCnt = 0;
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+ for (TopicCtrlEntity entity : topicCtrlMap.values()) {
+ if (entity == null) {
+ continue;
+ }
+ if (totalCnt++ > 0) {
+ sBuffer.append(",");
+ }
+ entity.toWebJsonStr(sBuffer, true, true);
+ }
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+ return sBuffer;
+ }
+
+ /**
+ * Add topic control records for the topic names given in the request
+ *
+ * @param req the HTTP request the authorization token, operation info and record fields are read from
+ * @return the buffer holding the per-topic process results, or the failure response if a check fails
+ */
+ public StringBuilder adminAddTopicCtrlInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuilder = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ // check and get operation info; the result is carried as a BaseEntity
+ if (!WebParameterUtils.getAUDBaseInfo(req, true, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // check and get topicName field
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, true, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ // get topicNameId info; only read when exactly one topic name is given, else left undefined
+ int topicNameId = TBaseConstants.META_VALUE_UNDEFINED;
+ if (topicNameSet.size() == 1) {
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.TOPICNAMEID,
+ false, TBaseConstants.META_VALUE_UNDEFINED, 0, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ topicNameId = (int) result.getRetData();
+ }
+ // get authCtrlStatus info
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.AUTHCTRLENABLE, false, false, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Boolean enableTopicAuth = (Boolean) result.retData1;
+ // get cluster default max message size; NOTE(review): dereferenced without a null check - confirm
+ ClusterSettingEntity defClusterSetting =
+ metaDataManager.getClusterDefSetting(false);
+ int maxMsgSizeMB = defClusterSetting.getMaxMsgSizeInMB();
+ // check max message size; the cluster value is passed in, presumably as the fallback when absent
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.MAXMSGSIZEINMB, false, maxMsgSizeMB,
+ TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+ TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ maxMsgSizeMB = (int) result.getRetData();
+ // add records for every topic name in the set
+ List<TopicProcessResult> retInfo =
+ metaDataManager.addOrUpdTopicCtrlConf(opEntity, topicNameSet,
+ topicNameId, enableTopicAuth, maxMsgSizeMB, sBuilder, result);
+ return buildRetInfo(retInfo, sBuilder);
+ }
+
+ /**
+ * Add new topic control records in batch
+ *
+ * @param req the HTTP request the authorization token, operation info and record set are read from
+ * @return the buffer holding the per-record process results, or the failure response if a check fails
+ */
+ public StringBuilder adminBatchAddTopicCtrlInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuilder = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ // check and get operation info; handed on as defOpEntity to the record parser
+ if (!WebParameterUtils.getAUDBaseInfo(req, true, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ BaseEntity defOpEntity = (BaseEntity) result.getRetData();
+ // check and get add record map (second argument true marks the add operation)
+ if (!getTopicCtrlJsonSetInfo(req, true,
+ defOpEntity, null, sBuilder, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Map<String, TopicCtrlEntity> addRecordMap =
+ (Map<String, TopicCtrlEntity>) result.getRetData();
+ List<TopicProcessResult> retInfo = new ArrayList<>();
+ for (TopicCtrlEntity topicCtrlInfo : addRecordMap.values()) {
+ retInfo.add(metaDataManager.addOrUpdTopicCtrlConf(topicCtrlInfo, sBuilder, result));
+ }
+ return buildRetInfo(retInfo, sBuilder);
+ }
+
+ /**
+ * Modify topic control records for the topic names given in the request
+ *
+ * @param req the HTTP request the authorization token, operation info and record fields are read from
+ * @return the buffer holding the per-topic process results, or the failure response if a check fails
+ */
+ // #lizard forgives
+ public StringBuilder adminModTopicCtrlInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuilder = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ // check and get operation info; the result is carried as a BaseEntity
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ BaseEntity opInfoEntity = (BaseEntity) result.getRetData();
+ // check and get topicName info
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, true, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ // get topicNameId info; only read when exactly one topic name is given, else left undefined
+ int topicNameId = TBaseConstants.META_VALUE_UNDEFINED;
+ if (topicNameSet.size() == 1) {
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.TOPICNAMEID,
+ false, TBaseConstants.META_VALUE_UNDEFINED, 0, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ topicNameId = (int) result.getRetData();
+ }
+ // get authCtrlStatus info; NOTE(review): false is passed as on add - may reset it on modify, confirm
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.AUTHCTRLENABLE, false, false, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Boolean enableTopicAuth = (Boolean) result.retData1;
+ // get cluster default max message size; NOTE(review): dereferenced without a null check - confirm
+ ClusterSettingEntity defClusterSetting =
+ metaDataManager.getClusterDefSetting(false);
+ int maxMsgSizeMB = defClusterSetting.getMaxMsgSizeInMB();
+ // check max message size; NOTE(review): cluster value passed as on add - may overwrite, confirm
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.MAXMSGSIZEINMB, false, maxMsgSizeMB,
+ TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+ TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ maxMsgSizeMB = (int) result.getRetData();
+ // modify records for every topic name in the set (same manager call as the add path)
+ List<TopicProcessResult> retInfo =
+ metaDataManager.addOrUpdTopicCtrlConf(opInfoEntity, topicNameSet,
+ topicNameId, enableTopicAuth, maxMsgSizeMB, sBuilder, result);
+ return buildRetInfo(retInfo, sBuilder);
+ }
+
+ /**
+ * Modify topic control records in batch
+ *
+ * @param req the HTTP request the authorization token, operation info and record set are read from
+ * @return the buffer holding the per-record process results, or the failure response if a check fails
+ */
+ public StringBuilder adminBatchModTopicCtrlInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuilder = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ // check and get operation info; handed on as defOpEntity to the record parser
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ BaseEntity defOpEntity = (BaseEntity) result.getRetData();
+ // check and get modify record map (second argument false marks the modify operation)
+ if (!getTopicCtrlJsonSetInfo(req, false,
+ defOpEntity, null, sBuilder, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Map<String, TopicCtrlEntity> modRecordMap =
+ (Map<String, TopicCtrlEntity>) result.getRetData();
+ List<TopicProcessResult> retInfo = new ArrayList<>();
+ for (TopicCtrlEntity topicCtrlInfo : modRecordMap.values()) {
+ retInfo.add(metaDataManager.addOrUpdTopicCtrlConf(topicCtrlInfo, sBuilder, result));
+ }
+ return buildRetInfo(retInfo, sBuilder);
+ }
+
+    /**
+     * Delete the topic control records of the topics named in the request.
+     *
+     * The request must pass the admin authorization check and carry valid
+     * operation info and a topic name set; the first failed check is reported
+     * as a failure response. Otherwise one delete is issued per topic name and
+     * the outcome of each delete is included in the response.
+     *
+     * @param req the HTTP request carrying the parameters
+     * @return the buffer holding either the failure message or the per-topic
+     *         processing results
+     */
+    public StringBuilder adminDeleteTopicCtrlInfo(HttpServletRequest req) {
+        final ProcessResult procResult = new ProcessResult();
+        final StringBuilder strBuff = new StringBuilder(512);
+        // authorization first, then the common operation info
+        if (WebParameterUtils.validReqAuthorizeInfo(req,
+                WebFieldDef.ADMINAUTHTOKEN, true, master, procResult)
+                && WebParameterUtils.getAUDBaseInfo(req, false, procResult)) {
+            BaseEntity operEntity = (BaseEntity) procResult.getRetData();
+            // topic names to delete
+            if (WebParameterUtils.getStringParamValue(req,
+                    WebFieldDef.COMPSTOPICNAME, true, null, procResult)) {
+                Set<String> targetTopics = (Set<String>) procResult.retData1;
+                List<TopicProcessResult> outcomes = new ArrayList<>();
+                for (String targetTopic : targetTopics) {
+                    // the delete reports its outcome through procResult
+                    metaDataManager.delTopicCtrlConf(
+                            operEntity.getModifyUser(), targetTopic, strBuff, procResult);
+                    TopicProcessResult itemOutcome = new TopicProcessResult(
+                            TBaseConstants.META_VALUE_UNDEFINED, targetTopic, procResult);
+                    outcomes.add(itemOutcome);
+                }
+                return buildRetInfo(outcomes, strBuff);
+            }
+        }
+        // any failed check ends up here with its error text in procResult
+        WebParameterUtils.buildFailResult(strBuff, procResult.errInfo);
+        return strBuff;
+    }
+
+    /**
+     * Parse the JSON array parameter {@code WebFieldDef.TOPICCTRLSET} into
+     * topic control entities, keyed by topic name.
+     *
+     * Every array item is validated on its own: operation info, topic name,
+     * max message size, topic name id and the auth-control flag are all read
+     * from the item. Parsing stops at the first invalid field and the failure
+     * is left in {@code result}. On success {@code result} carries a
+     * {@code Map<String, TopicCtrlEntity>}; a later item with the same topic
+     * name replaces an earlier one.
+     *
+     * @param req          the HTTP request carrying the JSON array parameter
+     * @param isAddOp      passed through to the operation info parser
+     * @param defOpEntity  operation info used as default for each item
+     * @param defValue     default value handed to the JSON array parser
+     * @param sBuilder     scratch buffer; emptied again if used for an error text
+     * @param result       receives the parsed map or the failure information
+     * @return whether all items were parsed and at least one record was found
+     */
+    private boolean getTopicCtrlJsonSetInfo(HttpServletRequest req, boolean isAddOp,
+                                            BaseEntity defOpEntity,
+                                            List<Map<String, String>> defValue,
+                                            StringBuilder sBuilder,
+                                            ProcessResult result) {
+        if (!WebParameterUtils.getJsonArrayParamValue(req,
+                WebFieldDef.TOPICCTRLSET, true, defValue, result)) {
+            return result.success;
+        }
+        List<Map<String, String>> ctrlJsonArray =
+                (List<Map<String, String>>) result.retData1;
+        // get default max message size
+        ClusterSettingEntity defClusterSetting =
+                metaDataManager.getClusterDefSetting(false);
+        int defMaxMsgSizeMB = defClusterSetting.getMaxMsgSizeInMB();
+        // check and get topic control configure
+        TopicCtrlEntity itemConf;
+        Map<String, TopicCtrlEntity> addRecordMap = new HashMap<>();
+        for (Map<String, String> confMap : ctrlJsonArray) {
+            // check and get operation info
+            if (!WebParameterUtils.getAUDBaseInfo(confMap, isAddOp, defOpEntity, result)) {
+                return result.isSuccess();
+            }
+            BaseEntity itemOpEntity = (BaseEntity) result.getRetData();
+            // get topicName configure info
+            if (!WebParameterUtils.getStringParamValue(confMap,
+                    WebFieldDef.TOPICNAME, true, "", result)) {
+                return result.success;
+            }
+            String topicName = (String) result.retData1;
+            // check max message size
+            if (!WebParameterUtils.getIntParamValue(confMap,
+                    WebFieldDef.MAXMSGSIZEINMB, false, defMaxMsgSizeMB,
+                    TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+                    TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB, result)) {
+                return result.isSuccess();
+            }
+            int itemMaxMsgSizeMB = (int) result.getRetData();
+            // get topicNameId field from the item itself: reading it from the
+            // request would give every record the same request-level value
+            // NOTE(review): assumes a Map-based overload of this
+            // getIntParamValue variant exists - confirm in WebParameterUtils
+            if (!WebParameterUtils.getIntParamValue(confMap, WebFieldDef.TOPICNAMEID,
+                    false, TBaseConstants.META_VALUE_UNDEFINED, 0, result)) {
+                return result.isSuccess();
+            }
+            int itemTopicNameId = (int) result.getRetData();
+            // get authCtrlStatus info, again per item rather than per request
+            // NOTE(review): assumes a Map-based overload of
+            // getBooleanParamValue exists - confirm in WebParameterUtils
+            if (!WebParameterUtils.getBooleanParamValue(confMap,
+                    WebFieldDef.AUTHCTRLENABLE, false, false, result)) {
+                return result.isSuccess();
+            }
+            Boolean enableTopicAuth = (Boolean) result.retData1;
+            itemConf = new TopicCtrlEntity(itemOpEntity, topicName);
+            itemConf.updModifyInfo(itemOpEntity.getDataVerId(),
+                    itemTopicNameId, itemMaxMsgSizeMB, enableTopicAuth);
+            addRecordMap.put(itemConf.getTopicName(), itemConf);
+        }
+        // check result
+        if (addRecordMap.isEmpty()) {
+            result.setFailResult(sBuilder
+                    .append("Not found record info in ")
+                    .append(WebFieldDef.TOPICCTRLSET.name)
+                    .append(" parameter!").toString());
+            // the buffer was only borrowed to compose the message
+            sBuilder.delete(0, sBuilder.length());
+            return result.isSuccess();
+        }
+        result.setSuccResult(addRecordMap);
+        return result.isSuccess();
+    }
+
+    /**
+     * Render the per-topic processing results as the data section of a
+     * success response.
+     *
+     * Null entries are skipped; every other entry becomes one comma-separated
+     * object with the fields topicName, success, errCode and errInfo. The
+     * number of rendered entries is passed to the response terminator.
+     *
+     * @param retInfo   the per-topic results to render
+     * @param sBuilder  the buffer the response is written to
+     * @return the same buffer that was passed in
+     */
+    private StringBuilder buildRetInfo(List<TopicProcessResult> retInfo,
+                                       StringBuilder sBuilder) {
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+        int written = 0;
+        for (TopicProcessResult item : retInfo) {
+            if (item != null) {
+                // separator goes before every object except the first
+                if (written > 0) {
+                    sBuilder.append(",");
+                }
+                written++;
+                sBuilder.append("{\"topicName\":\"").append(item.getTopicName()).append("\"");
+                sBuilder.append(",\"success\":").append(item.isSuccess());
+                sBuilder.append(",\"errCode\":").append(item.getErrCode());
+                sBuilder.append(",\"errInfo\":\"").append(item.getErrInfo()).append("\"}");
+            }
+        }
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, written);
+        return sBuilder;
+    }
+
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
similarity index 88%
rename from tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicConfHandler.java
rename to tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index a71fb12..ce2cfa6 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -18,7 +18,6 @@
package org.apache.tubemq.server.master.web.handler;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -29,7 +28,6 @@ import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.cluster.BrokerInfo;
import org.apache.tubemq.corebase.cluster.TopicInfo;
import org.apache.tubemq.corebase.utils.Tuple2;
-import org.apache.tubemq.corebase.utils.Tuple3;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.common.statusdef.ManageStatus;
import org.apache.tubemq.server.common.statusdef.TopicStatus;
@@ -37,6 +35,7 @@ import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
@@ -50,17 +49,17 @@ import org.slf4j.LoggerFactory;
-public class WebTopicConfHandler extends AbstractWebHandler {
+public class WebTopicDeployHandler extends AbstractWebHandler {
private static final Logger logger =
- LoggerFactory.getLogger(WebTopicConfHandler.class);
+ LoggerFactory.getLogger(WebTopicDeployHandler.class);
/**
* Constructor
*
* @param master tube master
*/
- public WebTopicConfHandler(TMaster master) {
+ public WebTopicDeployHandler(TMaster master) {
super(master);
}
@@ -142,8 +141,9 @@ public class WebTopicConfHandler extends AbstractWebHandler {
return sBuffer;
}
TopicStatus topicStatus = (TopicStatus) result.getRetData();
- qryEntity.updModifyInfo(TBaseConstants.META_VALUE_UNDEFINED,
- TBaseConstants.META_VALUE_UNDEFINED, null, topicStatus, topicProps);
+ qryEntity.updModifyInfo(qryEntity.getDataVerId(),
+ TBaseConstants.META_VALUE_UNDEFINED,
+ brokerPort, null, topicStatus, topicProps);
Map<String, List<TopicDeployEntity>> topicDeployInfoMap =
metaDataManager.getTopicConfEntityMap(topicNameSet, brokerIdSet, qryEntity);
// build query result
@@ -457,8 +457,7 @@ public class WebTopicConfHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
// check and get topicName info
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSTOPICNAME, true, null, result)) {
@@ -473,19 +472,9 @@ public class WebTopicConfHandler extends AbstractWebHandler {
return sBuilder;
}
Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
- // check and get cluster default setting info
- ClusterSettingEntity defClusterSetting =
- metaDataManager.getClusterDefSetting();
- if (defClusterSetting == null) {
- if (!metaDataManager.addClusterDefSetting(sBuilder, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- defClusterSetting = metaDataManager.getClusterDefSetting();
- }
// get and valid TopicPropGroup info
if (!WebParameterUtils.getTopicPropInfo(req,
- defClusterSetting.getClsDefTopicProps(), result)) {
+ null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
@@ -503,8 +492,7 @@ public class WebTopicConfHandler extends AbstractWebHandler {
int inMaxMsgSizeMB = (int) result.getRetData();
*/
List<TopicProcessResult> retInfo =
- metaDataManager.addTopicDeployInfo(opTupleInfo.getF0(),
- opTupleInfo.getF1(), opTupleInfo.getF2(), brokerIdSet,
+ metaDataManager.addTopicDeployInfo(opEntity, brokerIdSet,
topicNameSet, topicPropInfo, sBuilder, result);
return buildRetInfo(retInfo, sBuilder);
}
@@ -529,11 +517,10 @@ public class WebTopicConfHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
+ BaseEntity defOpEntity = (BaseEntity) result.getRetData();
// check and get add record map
- if (!getTopicDeployJsonSetInfo(req, true, true, opTupleInfo.getF0(),
- opTupleInfo.getF1(), opTupleInfo.getF2(), null, sBuilder, result)) {
+ if (!getTopicDeployJsonSetInfo(req, true,
+ defOpEntity, null, sBuilder, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
@@ -568,8 +555,7 @@ public class WebTopicConfHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
// check and get topicName info
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSTOPICNAME, true, null, result)) {
@@ -592,8 +578,7 @@ public class WebTopicConfHandler extends AbstractWebHandler {
TopicPropGroup topicProps = (TopicPropGroup) result.getRetData();
// modify records
List<TopicProcessResult> retInfo =
- metaDataManager.modTopicConfig(opTupleInfo.getF0(),
- opTupleInfo.getF1(), opTupleInfo.getF2(), brokerIdSet,
+ metaDataManager.modTopicConfig(opEntity, brokerIdSet,
topicNameSet, topicProps, sBuilder, result);
return buildRetInfo(retInfo, sBuilder);
}
@@ -642,8 +627,7 @@ public class WebTopicConfHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
// check and get topicName info
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSTOPICNAME, true, null, result)) {
@@ -660,59 +644,37 @@ public class WebTopicConfHandler extends AbstractWebHandler {
Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
// modify records
List<TopicProcessResult> retInfo =
- metaDataManager.modRedoDelTopicConf(opTupleInfo.getF0(),
- opTupleInfo.getF1(), opTupleInfo.getF2(), brokerIdSet,
+ metaDataManager.modRedoDelTopicConf(opEntity, brokerIdSet,
topicNameSet, sBuffer, result);
return buildRetInfo(retInfo, sBuffer);
}
- private boolean getTopicDeployJsonSetInfo(HttpServletRequest req, boolean required,
- boolean isCreate, long dataVerId,
- String operator, Date operateDate,
+ private boolean getTopicDeployJsonSetInfo(HttpServletRequest req, boolean isAdd,
+ BaseEntity defOpEntity,
List<Map<String, String>> defValue,
StringBuilder sBuilder,
ProcessResult result) {
if (!WebParameterUtils.getJsonArrayParamValue(req,
- WebFieldDef.TOPICJSONSET, required, defValue, result)) {
+ WebFieldDef.TOPICJSONSET, true, defValue, result)) {
return result.success;
}
List<Map<String, String>> deployJsonArray =
(List<Map<String, String>>) result.retData1;
// check and get cluster default setting info
ClusterSettingEntity defClusterSetting =
- metaDataManager.getClusterDefSetting();
- if (defClusterSetting == null) {
- if (!metaDataManager.addClusterDefSetting(sBuilder, result)) {
- return result.isSuccess();
- }
- defClusterSetting = metaDataManager.getClusterDefSetting();
- }
+ metaDataManager.getClusterDefSetting(false);
TopicDeployEntity itemConf;
Map<String, TopicDeployEntity> addRecordMap = new HashMap<>();
// check and get topic deployment configure
- HashMap<String, TopicDeployEntity> addedRecordMap = new HashMap<>();
for (int j = 0; j < deployJsonArray.size(); j++) {
Map<String, String> confMap = deployJsonArray.get(j);
// check and get operation info
- long itemDataVerId = dataVerId;
- String itemCreator = operator;
- Date itemCreateDate = operateDate;
- if (!WebParameterUtils.getAUDBaseInfo(confMap, true, result)) {
+ if (!WebParameterUtils.getAUDBaseInfo(confMap, isAdd, defOpEntity, result)) {
return result.isSuccess();
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
- if (opTupleInfo.getF0() != TBaseConstants.META_VALUE_UNDEFINED) {
- itemDataVerId = opTupleInfo.getF0();
- }
- if (opTupleInfo.getF1() != null) {
- itemCreator = opTupleInfo.getF1();
- }
- if (opTupleInfo.getF2() != null) {
- itemCreateDate = opTupleInfo.getF2();
- }
+ BaseEntity itemOpEntity = (BaseEntity) result.getRetData();
// get topicName configure info
if (!WebParameterUtils.getStringParamValue(confMap,
WebFieldDef.TOPICNAME, true, "", result)) {
@@ -749,26 +711,23 @@ public class WebTopicConfHandler extends AbstractWebHandler {
if (topicCtrlEntity != null) {
topicNameId = topicCtrlEntity.getTopicId();
}
- itemConf = new TopicDeployEntity(itemDataVerId, itemCreator, itemCreateDate);
- itemConf.setTopicDeployInfo(brokerConf.getBrokerId(),
+ itemConf = new TopicDeployEntity(itemOpEntity, brokerConf.getBrokerId(),
brokerConf.getBrokerIp(), brokerConf.getBrokerPort(), topicName);
- itemConf.updModifyInfo(topicNameId,
+ itemConf.updModifyInfo(itemOpEntity.getDataVerId(), topicNameId,
TBaseConstants.META_VALUE_UNDEFINED, null,
TopicStatus.STATUS_TOPIC_OK, topicPropInfo);
addRecordMap.put(itemConf.getRecordKey(), itemConf);
}
// check result
- if (addedRecordMap.isEmpty()) {
- if (isCreate) {
- result.setFailResult(sBuilder
- .append("Not found record in ")
- .append(WebFieldDef.TOPICJSONSET.name)
- .append(" parameter!").toString());
- sBuilder.delete(0, sBuilder.length());
- return result.isSuccess();
- }
+ if (addRecordMap.isEmpty()) {
+ result.setFailResult(sBuilder
+ .append("Not found record in ")
+ .append(WebFieldDef.TOPICJSONSET.name)
+ .append(" parameter!").toString());
+ sBuilder.delete(0, sBuilder.length());
+ return result.isSuccess();
}
- result.setSuccResult(addedRecordMap);
+ result.setSuccResult(addRecordMap);
return result.isSuccess();
}
@@ -802,7 +761,7 @@ public class WebTopicConfHandler extends AbstractWebHandler {
sBuilder.append(",");
}
sBuilder.append("{\"brokerId\":").append(entry.getBrokerId())
- .append("{\"topicName\":\"").append(entry.getTopicName()).append("\"")
+ .append(",\"topicName\":\"").append(entry.getTopicName()).append("\"")
.append(",\"success\":").append(entry.isSuccess())
.append(",\"errCode\":").append(entry.getErrCode())
.append(",\"errInfo\":\"").append(entry.getErrInfo()).append("\"}");
@@ -835,8 +794,7 @@ public class WebTopicConfHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Tuple3<Long, String, Date> opTupleInfo =
- (Tuple3<Long, String, Date>) result.getRetData();
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
// check and get topicName info
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSTOPICNAME, true, null, result)) {
@@ -853,8 +811,7 @@ public class WebTopicConfHandler extends AbstractWebHandler {
Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
// modify records
List<TopicProcessResult> retInfo =
- metaDataManager.modDelOrRmvTopicConf(opTupleInfo.getF0(),
- opTupleInfo.getF1(), opTupleInfo.getF2(), brokerIdSet,
+ metaDataManager.modDelOrRmvTopicConf(opEntity, brokerIdSet,
topicNameSet, topicStatus, sBuffer, result);
return buildRetInfo(retInfo, sBuffer);
}