You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/04/15 03:26:21 UTC
[incubator-inlong] branch TUBEMQ-570 updated: [INLONG-595]Add
WebBrokerConfHandler class implementation
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
new 87fbc9e [INLONG-595]Add WebBrokerConfHandler class implementation
87fbc9e is described below
commit 87fbc9e70df2f64a9441cc40a9acc0c5e5c12d4f
Author: gosonzhang <go...@tencent.com>
AuthorDate: Sun Apr 11 20:55:29 2021 +0800
[INLONG-595]Add WebBrokerConfHandler class implementation
---
.../tubemq/server/common/TServerConstants.java | 5 +-
.../tubemq/server/common/fielddef/WebFieldDef.java | 15 +-
.../server/common/utils/WebParameterUtils.java | 299 ++++++-
.../server/master/metamanage/MetaDataManager.java | 154 ++--
.../metastore/BdbMetaStoreServiceImpl.java | 115 ++-
.../metamanage/metastore/MetaStoreService.java | 59 +-
.../metastore/dao/entity/BaseEntity.java | 6 +-
.../metastore/dao/entity/BrokerConfEntity.java | 77 +-
.../metastore/dao/entity/ClusterSettingEntity.java | 10 +-
.../dao/entity/GroupConsumeCtrlEntity.java | 10 +-
.../metastore/dao/entity/GroupResCtrlEntity.java | 11 +-
.../metastore/dao/entity/TopicCtrlEntity.java | 10 +-
.../dao/entity/TopicDeployConfEntity.java | 10 +-
.../metastore/dao/mapper/BrokerConfigMapper.java | 6 +
.../dao/mapper/TopicDeployConfigMapper.java | 3 +
.../impl/bdbimpl/BdbBrokerConfigMapperImpl.java | 45 +
.../bdbimpl/BdbTopicDeployConfigMapperImpl.java | 16 +
.../master/web/handler/BrokerProcessResult.java | 55 ++
.../master/web/handler/WebBrokerConfHandler.java | 904 +++++++++++++++++++++
.../web/handler/WebGroupConsumeCtrlHandler.java | 2 +-
.../master/web/handler/WebGroupResCtrlHandler.java | 2 +-
.../master/web/handler/WebMasterInfoHandler.java | 4 +-
22 files changed, 1695 insertions(+), 123 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
index e06a988..21c6a25 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
@@ -64,8 +64,11 @@ public final class TServerConstants {
public static final int GROUP_BROKER_CLIENT_RATE_MIN = 0;
+ public static final int BROKER_REGION_ID_MIN = 0;
+ public static final int BROKER_REGION_ID_DEF = 0;
-
+ public static final int BROKER_GROUP_ID_MIN = 0;
+ public static final int BROKER_GROUP_ID_DEF = 0;
public static final int CFG_MODAUTHTOKEN_MAX_LENGTH = 128;
public static final int CFG_ROWLOCK_DEFAULT_DURATION = 30000;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index 9d44a14..1ecca31 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -207,7 +207,20 @@ public enum WebFieldDef {
CONSUMEENABLE(72, "consumeEnable", "csmEn",
WebFieldType.BOOLEAN, "Consume enable status"),
GROUPCSMJSONSET(73, "groupCsmJsonSet", "csmJsonSet",
- WebFieldType.JSONSET, "The batch group consume configure json array");
+ WebFieldType.JSONSET, "The batch group consume configure json array"),
+ WITHTOPIC(74, "withTopic", "wTopic",
+ WebFieldType.BOOLEAN, "With topic info."),
+
+ ISINCLUDE(75, "isInclude", "isInclude",
+ WebFieldType.BOOLEAN, "If include or un-include topic required"),
+ COMPBROKERIP(76, "brokerIp", "bIp", WebFieldType.COMPSTRING,
+ "Broker ip", TBaseConstants.META_MAX_BROKER_IP_LENGTH,
+ RegexDef.TMP_IPV4ADDRESS),
+ ISRESERVEDDATA(77, "isReservedData", "isRsvDt",
+ WebFieldType.BOOLEAN, "Whether to keep topic data in the broker");
+
+
+
public final int id;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index a875a35..7f20a58 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -43,6 +43,7 @@ import org.apache.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.TStatusConstants;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.tubemq.server.common.statusdef.TopicStatus;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
@@ -174,6 +175,51 @@ public class WebParameterUtils {
return result.isSuccess();
}
+    /**
+     * Parse the common add/update base fields from a key-value map.
+     *
+     * Reads the data version id, then either the create user/date
+     * (isAdd is true) or the modify user/date (isAdd is false). Every
+     * field is requested as not required.
+     *
+     * @param keyValueMap  parameter key-value map
+     * @param isAdd        true to read the create fields, false to read the modify fields
+     * @param result       process result; on success it carries a Tuple3 of
+     *                     (data version id, operator, operation date)
+     * @return             true if every field was parsed successfully, otherwise false
+     */
+    public static boolean getAUDBaseInfo(Map<String, String> keyValueMap,
+                                         boolean isAdd,
+                                         ProcessResult result) {
+        // the add and update paths differ only in which fields are consulted
+        final WebFieldDef userField =
+                isAdd ? WebFieldDef.CREATEUSER : WebFieldDef.MODIFYUSER;
+        final WebFieldDef dateField =
+                isAdd ? WebFieldDef.CREATEDATE : WebFieldDef.MODIFYDATE;
+        // data version id
+        if (!WebParameterUtils.getLongParamValue(keyValueMap, WebFieldDef.DATAVERSIONID,
+                false, TBaseConstants.META_VALUE_UNDEFINED, result)) {
+            return result.isSuccess();
+        }
+        final long versionId = (long) result.retData1;
+        // operator name
+        if (!WebParameterUtils.getStringParamValue(keyValueMap,
+                userField, false, null, result)) {
+            return result.isSuccess();
+        }
+        final String opUser = (String) result.retData1;
+        // operation date
+        if (!WebParameterUtils.getDateParameter(keyValueMap,
+                dateField, false, null, result)) {
+            return result.isSuccess();
+        }
+        final Date opTime = (Date) result.retData1;
+        result.setSuccResult(new Tuple3<Long, String, Date>(
+                versionId, opUser, opTime));
+        return result.isSuccess();
+    }
+
/**
* Parse the parameter value required for query
*
@@ -247,12 +293,26 @@ public class WebParameterUtils {
* Decode the deletePolicy parameter value from an object value
* the value must be of the form {method},{digits}[s|m|h]
*
- * @param req Http Servlet Request
+ * @param keyValueMap key-value map
* @param required a boolean value indicating whether the parameter is required
* @param defValue a default value returned if failed to parse value from the given object
* @param result process result of parameter value
* @return the process result
*/
+    public static boolean getDeletePolicyParameter(Map<String, String> keyValueMap,
+                                                   boolean required, String defValue,
+                                                   ProcessResult result) {
+        // read the raw string first; stop here if the lookup itself failed
+        final boolean fetched = WebParameterUtils.getStringParamValue(keyValueMap,
+                WebFieldDef.DELETEPOLICY, required, defValue, result);
+        if (!fetched) {
+            return result.isSuccess();
+        }
+        final String policyText = (String) result.retData1;
+        // a blank value is passed through untouched; only real values are validated
+        return TStringUtils.isBlank(policyText)
+                ? result.isSuccess()
+                : validDeletePolicyValue(policyText, result);
+    }
+
public static boolean getDeletePolicyParameter(HttpServletRequest req,
boolean required, String defValue,
ProcessResult result) {
@@ -264,6 +324,10 @@ public class WebParameterUtils {
if (TStringUtils.isBlank(delPolicy)) {
return result.isSuccess();
}
+ return validDeletePolicyValue(delPolicy, result);
+ }
+
+ private static boolean validDeletePolicyValue(String delPolicy, ProcessResult result) {
// check value format
String[] tmpStrs = delPolicy.split(",");
if (tmpStrs.length != 2) {
@@ -339,6 +403,27 @@ public class WebParameterUtils {
}
+    /**
+     * Parse the topic status id parameter from a request and convert it
+     * to a TopicStatus value.
+     *
+     * @param req         Http Servlet Request
+     * @param isRequired  whether the parameter is required
+     * @param defVal      status whose code is used as the default value; it is
+     *                    dereferenced unconditionally, so it must not be null
+     * @param result      process result; on success it carries the TopicStatus
+     * @return            true if the value was parsed successfully, otherwise false
+     */
+    public static boolean getTopicStatusParamValue(HttpServletRequest req,
+                                                   boolean isRequired,
+                                                   TopicStatus defVal,
+                                                   ProcessResult result) {
+        // get topicStatusId field
+        if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.TOPICSTATUSID,
+                isRequired, defVal.getCode(), TopicStatus.STATUS_TOPIC_OK.getCode(), result)) {
+            return result.isSuccess();
+        }
+        int paramValue = (int) result.getRetData();
+        try {
+            TopicStatus topicStatus = TopicStatus.valueOf(paramValue);
+            result.setSuccResult(topicStatus);
+        } catch (Throwable e) {
+            // report the field's name attribute, as the other parsers in this
+            // class do, instead of the enum constant identifier from name()
+            result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
+                    new StringBuilder(512).append("The value of field ")
+                            .append(WebFieldDef.TOPICSTATUSID.name)
+                            .append(" invalid:").append(e.getMessage()).toString());
+        }
+        return result.isSuccess();
+    }
/**
* Parse the parameter value for TopicPropGroup class
@@ -476,6 +561,141 @@ public class WebParameterUtils {
}
/**
+ * Parse the parameter value for TopicPropGroup class
+ *
+ * @param keyValueMap parameter key-value map
+ * @param defVal default value
+ * @param result process result of parameter value
+ * @return process result
+ */
+    public static boolean getTopicPropInfo(Map<String, String> keyValueMap,
+                                           TopicPropGroup defVal,
+                                           ProcessResult result) {
+        // Each field below follows one pattern: parse the optional value and,
+        // when it is absent (META_VALUE_UNDEFINED or null), fall back to the
+        // matching value of defVal if a default group was supplied. On the
+        // first parse failure the method returns with the failure in result.
+        TopicPropGroup newConf = new TopicPropGroup();
+        // get numTopicStores parameter value
+        if (!WebParameterUtils.getIntParamValue(keyValueMap, WebFieldDef.NUMTOPICSTORES,
+                false, TBaseConstants.META_VALUE_UNDEFINED,
+                TServerConstants.TOPIC_STOREBLOCK_NUM_MIN, result)) {
+            return result.isSuccess();
+        }
+        int numTopicStores = (int) result.retData1;
+        if (numTopicStores == TBaseConstants.META_VALUE_UNDEFINED && defVal != null) {
+            numTopicStores = defVal.getNumTopicStores();
+        }
+        newConf.setNumTopicStores(numTopicStores);
+        // get numPartitions parameter value
+        if (!WebParameterUtils.getIntParamValue(keyValueMap, WebFieldDef.NUMPARTITIONS,
+                false, TBaseConstants.META_VALUE_UNDEFINED,
+                TServerConstants.TOPIC_PARTITION_NUM_MIN, result)) {
+            return result.isSuccess();
+        }
+        int numPartitions = (int) result.retData1;
+        if (numPartitions == TBaseConstants.META_VALUE_UNDEFINED && defVal != null) {
+            numPartitions = defVal.getNumPartitions();
+        }
+        newConf.setNumPartitions(numPartitions);
+        // get unflushThreshold parameter value
+        if (!WebParameterUtils.getIntParamValue(keyValueMap, WebFieldDef.UNFLUSHTHRESHOLD,
+                false, TBaseConstants.META_VALUE_UNDEFINED,
+                TServerConstants.TOPIC_DSK_UNFLUSHTHRESHOLD_MIN, result)) {
+            return result.isSuccess();
+        }
+        int unflushThreshold = (int) result.retData1;
+        if (unflushThreshold == TBaseConstants.META_VALUE_UNDEFINED && defVal != null) {
+            unflushThreshold = defVal.getUnflushThreshold();
+        }
+        newConf.setUnflushThreshold(unflushThreshold);
+        // get unflushInterval parameter value
+        if (!WebParameterUtils.getIntParamValue(keyValueMap, WebFieldDef.UNFLUSHINTERVAL,
+                false, TBaseConstants.META_VALUE_UNDEFINED,
+                TServerConstants.TOPIC_DSK_UNFLUSHINTERVAL_MIN, result)) {
+            return result.isSuccess();
+        }
+        int unflushInterval = (int) result.retData1;
+        if (unflushInterval == TBaseConstants.META_VALUE_UNDEFINED && defVal != null) {
+            unflushInterval = defVal.getUnflushInterval();
+        }
+        newConf.setUnflushInterval(unflushInterval);
+        // get unflushDataHold parameter value; this must read its own field,
+        // otherwise the flush interval is copied into the data-hold setting
+        if (!WebParameterUtils.getIntParamValue(keyValueMap, WebFieldDef.UNFLUSHDATAHOLD,
+                false, TBaseConstants.META_VALUE_UNDEFINED,
+                TServerConstants.TOPIC_DSK_UNFLUSHDATAHOLD_MIN, result)) {
+            return result.isSuccess();
+        }
+        int unflushDataHold = (int) result.retData1;
+        if (unflushDataHold == TBaseConstants.META_VALUE_UNDEFINED && defVal != null) {
+            unflushDataHold = defVal.getUnflushDataHold();
+        }
+        newConf.setUnflushDataHold(unflushDataHold);
+        // get memCacheMsgSizeInMB parameter value
+        if (!WebParameterUtils.getIntParamValue(keyValueMap, WebFieldDef.MCACHESIZEINMB,
+                false, TBaseConstants.META_VALUE_UNDEFINED,
+                TServerConstants.TOPIC_CACHESIZE_MB_MIN,
+                TServerConstants.TOPIC_CACHESIZE_MB_MAX, result)) {
+            return result.isSuccess();
+        }
+        int cacheMsgSizeInMB = (int) result.retData1;
+        if (cacheMsgSizeInMB == TBaseConstants.META_VALUE_UNDEFINED && defVal != null) {
+            cacheMsgSizeInMB = defVal.getMemCacheMsgSizeInMB();
+        }
+        newConf.setMemCacheMsgSizeInMB(cacheMsgSizeInMB);
+        // get memCacheFlushIntvl parameter value
+        if (!WebParameterUtils.getIntParamValue(keyValueMap, WebFieldDef.UNFMCACHEINTERVAL,
+                false, TBaseConstants.META_VALUE_UNDEFINED,
+                TServerConstants.TOPIC_CACHEINTVL_MIN, result)) {
+            return result.isSuccess();
+        }
+        int cacheFlushIntvl = (int) result.retData1;
+        if (cacheFlushIntvl == TBaseConstants.META_VALUE_UNDEFINED && defVal != null) {
+            cacheFlushIntvl = defVal.getMemCacheFlushIntvl();
+        }
+        newConf.setMemCacheFlushIntvl(cacheFlushIntvl);
+        // get memCacheMsgCntInK parameter value
+        if (!WebParameterUtils.getIntParamValue(keyValueMap, WebFieldDef.UNFMCACHECNTINK,
+                false, TBaseConstants.META_VALUE_UNDEFINED,
+                TServerConstants.TOPIC_CACHECNT_INK_MIN, result)) {
+            return result.isSuccess();
+        }
+        int cacheMsgCntInK = (int) result.retData1;
+        if (cacheMsgCntInK == TBaseConstants.META_VALUE_UNDEFINED && defVal != null) {
+            cacheMsgCntInK = defVal.getMemCacheMsgCntInK();
+        }
+        newConf.setMemCacheMsgCntInK(cacheMsgCntInK);
+        // get deletePolicy parameter value
+        if (!WebParameterUtils.getDeletePolicyParameter(keyValueMap,
+                false, null, result)) {
+            return result.isSuccess();
+        }
+        String deletePolicy = (String) result.retData1;
+        if (deletePolicy == null && defVal != null) {
+            deletePolicy = defVal.getDeletePolicy();
+        }
+        newConf.setDeletePolicy(deletePolicy);
+        // get acceptPublish parameter value
+        if (!WebParameterUtils.getBooleanParamValue(keyValueMap,
+                WebFieldDef.ACCEPTPUBLISH, false, null, result)) {
+            return result.isSuccess();
+        }
+        Boolean acceptPublish = (Boolean) result.retData1;
+        if (acceptPublish == null && defVal != null) {
+            acceptPublish = defVal.getAcceptPublish();
+        }
+        newConf.setAcceptPublish(acceptPublish);
+        // get acceptSubscribe parameter value
+        if (!WebParameterUtils.getBooleanParamValue(keyValueMap,
+                WebFieldDef.ACCEPTSUBSCRIBE, false, null, result)) {
+            return result.isSuccess();
+        }
+        Boolean acceptSubscribe = (Boolean) result.retData1;
+        if (acceptSubscribe == null && defVal != null) {
+            acceptSubscribe = defVal.getAcceptSubscribe();
+        }
+        newConf.setAcceptSubscribe(acceptSubscribe);
+        // hand the assembled group back through the result holder
+        result.setSuccResult(newConf);
+        return result.isSuccess();
+    }
+
+ /**
* Parse the parameter value from an object value to a long value
*
* @param paramName the parameter name
@@ -697,6 +917,31 @@ public class WebParameterUtils {
return result.success;
}
+    /**
+     * Parse a parameter from the key-value map as a long value.
+     *
+     * @param keyValueMap  parameter key-value map
+     * @param fieldDef     the parameter field definition
+     * @param required     whether the parameter is required
+     * @param defValue     value stored in result when the parameter has no value
+     * @param result       process result; on success it carries the long value
+     * @return             true if parsing succeeded, otherwise false
+     */
+    public static boolean getLongParamValue(Map<String, String> keyValueMap,
+                                            WebFieldDef fieldDef,
+                                            boolean required,
+                                            long defValue,
+                                            ProcessResult result) {
+        // fetch the raw text with a null default so absence can be detected below
+        if (!getStringParamValue(keyValueMap, fieldDef, required, null, result)) {
+            return result.success;
+        }
+        final String rawText = (String) result.retData1;
+        if (rawText != null) {
+            try {
+                result.setSuccResult(Long.parseLong(rawText));
+            } catch (Throwable e) {
+                result.setFailResult(new StringBuilder(512)
+                        .append("Parameter ").append(fieldDef.name)
+                        .append(" parse error: ").append(e.getMessage()).toString());
+            }
+        } else {
+            result.setSuccResult(defValue);
+        }
+        return result.success;
+    }
+
/**
* Parse the parameter value from an object value to a integer value
*
@@ -782,6 +1027,17 @@ public class WebParameterUtils {
true, minValue, true, maxValue, result);
}
+    /**
+     * Parse a parameter from the key-value map as an int value, handing a
+     * default, a minimum and a maximum value to the shared parsing overload.
+     *
+     * @param keyValueMap  parameter key-value map
+     * @param fieldDef     the parameter field definition
+     * @param required     whether the parameter is required
+     * @param defValue     the default value
+     * @param minValue     the minimum value
+     * @param maxValue     the maximum value
+     * @param result       process result of parameter value
+     * @return             the boolean returned by the shared parsing overload
+     */
+    public static boolean getIntParamValue(Map<String, String> keyValueMap,
+                                           WebFieldDef fieldDef,
+                                           boolean required,
+                                           int defValue,
+                                           int minValue,
+                                           int maxValue,
+                                           ProcessResult result) {
+        // NOTE(review): the three literal flags presumably mark the default,
+        // minimum and maximum as present -- confirm against the private overload
+        return getIntParamValue(keyValueMap, fieldDef, required,
+                true, defValue,
+                true, minValue,
+                true, maxValue,
+                result);
+    }
+
private static boolean getIntParamValue(HttpServletRequest req,
WebFieldDef fieldDef,
boolean required,
@@ -1415,6 +1671,32 @@ public class WebParameterUtils {
return result.success;
}
+    /**
+     * Parse a parameter from the key-value map as a Date, using the
+     * TBaseConstants.META_TMP_DATE_VALUE pattern.
+     *
+     * @param keyValueMap  parameter key-value map
+     * @param fieldDef     the parameter field definition
+     * @param required     whether the parameter is required
+     * @param defValue     value stored in result when the parameter has no value
+     * @param result       process result; on success it carries the Date
+     * @return             true if parsing succeeded, otherwise false
+     */
+    public static boolean getDateParameter(Map<String, String> keyValueMap,
+                                           WebFieldDef fieldDef,
+                                           boolean required,
+                                           Date defValue,
+                                           ProcessResult result) {
+        final boolean fetched =
+                getStringParamValue(keyValueMap, fieldDef, required, null, result);
+        if (!fetched) {
+            return result.success;
+        }
+        final String rawText = (String) result.retData1;
+        // nothing supplied: hand back the caller's default
+        if (rawText == null) {
+            result.setSuccResult(defValue);
+            return result.success;
+        }
+        try {
+            // a fresh formatter per call; SimpleDateFormat is not thread-safe
+            final DateFormat parser =
+                    new SimpleDateFormat(TBaseConstants.META_TMP_DATE_VALUE);
+            result.setSuccResult(parser.parse(rawText));
+        } catch (Throwable e) {
+            result.setFailResult(new StringBuilder(512)
+                    .append("Parameter ").append(fieldDef.name)
+                    .append(" parse error: ").append(e.getMessage()).toString());
+        }
+        return result.success;
+    }
+
/**
* Valid execution authorization info
* @param req Http Servlet Request
@@ -1548,6 +1830,21 @@ public class WebParameterUtils {
return true;
}
+    /**
+     * Check whether a broker counts as being in the offlining process.
+     *
+     * The answer is true only when the broker has a run-sync status record,
+     * that record reports the broker as registered, the given manage status is
+     * STATUS_MANAGE_OFFLINE, and the recorded run status is not
+     * STATUS_SERVICE_UNDEFINED.
+     *
+     * @param brokerId      the broker id to look up
+     * @param manageStatus  the manage status to test
+     * @param metaManager   the meta data manager supplying the run-sync status
+     * @return              true if all of the above conditions hold, otherwise false
+     */
+    public static boolean checkBrokerInOfflining(int brokerId,
+                                                 int manageStatus,
+                                                 MetaDataManager metaManager) {
+        final BrokerSyncStatusInfo syncInfo =
+                metaManager.getBrokerRunSyncStatusInfo(brokerId);
+        // no status record, or the broker is not registered
+        if (syncInfo == null || !syncInfo.isBrokerRegister()) {
+            return false;
+        }
+        return manageStatus == TStatusConstants.STATUS_MANAGE_OFFLINE
+                && syncInfo.getBrokerRunStatus() != TStatusConstants.STATUS_SERVICE_UNDEFINED;
+    }
+
/**
* Parse the parameter value from an object value to ip address of string value
*
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
index 370c01e..d3e79dc 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
@@ -59,6 +59,7 @@ import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDepl
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
import org.apache.tubemq.server.master.nodemanage.nodebroker.TargetValidResult;
+import org.apache.tubemq.server.master.web.handler.BrokerProcessResult;
import org.apache.tubemq.server.master.web.handler.GroupProcessResult;
import org.apache.tubemq.server.master.web.model.ClusterGroupVO;
import org.slf4j.Logger;
@@ -414,29 +415,34 @@ public class MetaDataManager implements Server {
/**
* Add broker configure information
*
- * @param entity the broker configure entity will be add
- * @param strBuffer the print information string buffer
+ * @param sBuilder the print information string buffer
* @param result the process result return
* @return true if success otherwise false
*/
- public boolean confAddBrokerConfig(BrokerConfEntity entity,
- StringBuilder strBuffer,
- ProcessResult result) {
- if (metaStoreService.addBrokerConf(entity, result)) {
- updateBrokerMaps(entity);
- strBuffer.append("[confAddBrokerConfig], ")
- .append(entity.getCreateUser())
- .append(" added broker configure record :")
- .append(entity.toString());
- logger.info(strBuffer.toString());
+ public BrokerProcessResult addBrokerConfig(long dataVerId, String createUser, Date createDate,
+ int brokerId, String brokerIp, int brokerPort,
+ int brokerTlsPort, int brokerWebPort, int regionId,
+ int groupId, ManageStatus manageStatus,
+ TopicPropGroup topicProps, StringBuilder sBuilder,
+ ProcessResult result) {
+ BrokerConfEntity entity = new BrokerConfEntity(dataVerId, createUser, createDate); // start from the version/creator/date triple
+ entity.setBrokerIdAndIp(brokerId, brokerIp); // identity of the broker being added
+ entity.updModifyInfo(brokerPort, brokerTlsPort, brokerWebPort,
+ regionId, groupId, manageStatus, topicProps); // remaining settings are filled through the modify-info setter
+ return addBrokerConfig(entity, sBuilder, result); // delegate to the entity-based overload
+ }
+
+ public BrokerProcessResult addBrokerConfig(BrokerConfEntity entity,
+ StringBuilder sBuilder,
+ ProcessResult result) {
+ if (metaStoreService.getBrokerConfByBrokerId(entity.getBrokerId()) != null
+ || metaStoreService.getBrokerConfByBrokerIp(entity.getBrokerIp()) != null) {
+ result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+ DataOpErrCode.DERR_EXISTED.getDescription());
} else {
- strBuffer.append("[confAddBrokerConfig], ")
- .append("failure to add broker configure record : ")
- .append(result.getErrInfo());
- logger.warn(strBuffer.toString());
+ metaStoreService.addBrokerConf(entity, sBuilder, result);
}
- strBuffer.delete(0, strBuffer.length());
- return result.isSuccess();
+ return new BrokerProcessResult(entity.getBrokerId(), entity.getBrokerIp(), result);
}
/**
@@ -447,27 +453,27 @@ public class MetaDataManager implements Server {
* @param result the process result return
* @return true if success otherwise false
*/
- public boolean confModBrokerConfig(BrokerConfEntity entity,
- StringBuilder strBuffer,
- ProcessResult result) {
- if (metaStoreService.updBrokerConf(entity, result)) {
- BrokerConfEntity oldEntity =
- (BrokerConfEntity) result.getRetData();
- BrokerConfEntity curEntity =
- metaStoreService.getBrokerConfByBrokerId(entity.getBrokerId());
- strBuffer.append("[confModBrokerConfig], ")
- .append(entity.getModifyUser())
- .append(" updated record from :")
- .append(oldEntity.toString())
- .append(" to ").append(curEntity.toString());
- logger.info(strBuffer.toString());
- } else {
- strBuffer.append("[confModBrokerConfig], ")
- .append("failure to update broker configure record : ")
- .append(result.getErrInfo());
- logger.warn(strBuffer.toString());
- }
- strBuffer.delete(0, strBuffer.length());
+ public boolean modBrokerConfig(BrokerConfEntity entity,
+ StringBuilder strBuffer,
+ ProcessResult result) {
+ metaStoreService.updBrokerConf(entity, strBuffer, result); // return value ignored; the outcome is read back from result below
+ return result.isSuccess();
+ }
+
+ /**
+ * Delete the topic configure information of one broker by delegating
+ * to the meta store service.
+ *
+ * @param operator the operator performing the deletion
+ * @param brokerId id of the broker whose topic configure records are deleted
+ * @param strBuffer string buffer handed to the meta store service for its log output
+ * @param result the process result; carries the outcome of the delete call
+ * @return true if result reports success, otherwise false
+ */
+ public boolean delBrokerTopicConfig(String operator,
+ int brokerId,
+ StringBuilder strBuffer,
+ ProcessResult result) {
+ metaStoreService.delTopicConfByBrokerId(operator, brokerId, strBuffer, result); // return value ignored; the outcome is read back from result below
+ return result.isSuccess();
+ }
@@ -516,22 +522,10 @@ public class MetaDataManager implements Server {
return result.isSuccess();
}
}
- if (metaStoreService.delBrokerConf(brokerId, result)) {
+ if (metaStoreService.delBrokerConf(operator, brokerId, strBuffer, result)) {
this.brokerRunSyncManageMap.remove(brokerId);
delBrokerRunData(brokerId);
- BrokerConfEntity entity = (BrokerConfEntity) result.getRetData();
- if (entity != null) {
- strBuffer.append("[confDelBrokerConfig], ").append(operator)
- .append(" deleted broker configure record :").append(entity.toString());
- logger.info(strBuffer.toString());
- }
- } else {
- strBuffer.append("[confDelBrokerConfig], ")
- .append("failure to delete broker configure record : ")
- .append(result.getErrInfo());
- logger.warn(strBuffer.toString());
}
- strBuffer.delete(0, strBuffer.length());
return result.isSuccess();
}
@@ -547,6 +541,18 @@ public class MetaDataManager implements Server {
}
/**
+ * Get broker configure information by delegating to the meta store service.
+ *
+ * @param brokerIdSet broker id set passed through to the meta store query
+ * @param brokerIpSet broker ip set passed through to the meta store query
+ * @param qryEntity query entity passed through to the meta store query
+ *                  (presumably the filter conditions -- confirm against the mapper)
+ * @return broker configure information (the map is presumably keyed by broker id -- confirm)
+ */
+ public Map<Integer, BrokerConfEntity> getBrokerConfInfo(Set<Integer> brokerIdSet,
+ Set<String> brokerIpSet,
+ BrokerConfEntity qryEntity) {
+ return metaStoreService.getBrokerConfInfo(brokerIdSet, brokerIpSet, qryEntity);
+ }
+
+ /**
* Manual reload broker config info
*
* @param entity
@@ -784,7 +790,7 @@ public class MetaDataManager implements Server {
if (isChanged) {
if (!curEntity.isConfDataUpdated()) {
curEntity.setConfDataUpdated();
- confModBrokerConfig(curEntity, strBuffer, result);
+ modBrokerConfig(curEntity, strBuffer, result);
}
if (curEntity.getManageStatus().isApplied()) {
BrokerSyncStatusInfo brokerSyncStatusInfo =
@@ -810,7 +816,7 @@ public class MetaDataManager implements Server {
} else {
if (curEntity.isConfDataUpdated()) {
curEntity.setBrokerLoaded();
- confModBrokerConfig(curEntity, strBuffer, result);
+ modBrokerConfig(curEntity, strBuffer, result);
}
if (curEntity.getManageStatus().isApplied()) {
BrokerSyncStatusInfo brokerSyncStatusInfo =
@@ -1238,6 +1244,18 @@ public class MetaDataManager implements Server {
// //////////////////////////////////////////////////////////////////////////////
+    /**
+     * Add the default cluster setting when no cluster setting is stored yet.
+     *
+     * If a setting already exists nothing is written and the call reports
+     * success with a null result payload.
+     *
+     * @param strBuffer  string buffer handed to the meta store service
+     * @param result     the process result return
+     * @return           true if a setting already existed or the default one
+     *                   was added successfully, otherwise false
+     */
+    public boolean addClusterDefSetting(StringBuilder strBuffer, ProcessResult result) {
+        // an existing setting is left untouched
+        if (metaStoreService.getClusterConfig() != null) {
+            result.setSuccResult(null);
+            return result.isSuccess();
+        }
+        // creator name recorded on the generated record; the original literal
+        // was misspelled "Stystem"
+        ClusterSettingEntity defConf =
+                new ClusterSettingEntity(TServerConstants.DEFAULT_DATA_VERSION,
+                        "System", new Date());
+        defConf.fillDefaultValue();
+        return metaStoreService.addClusterConfig(defConf, strBuffer, result);
+    }
+
public boolean addClusterDefSetting(long dataVerId, String createUsr, Date createDate,
int brokerPort, int brokerTlsPort, int brokerWebPort,
int maxMsgSizeMB, int qryPriorityId,
@@ -1284,34 +1302,6 @@ public class MetaDataManager implements Server {
return result.isSuccess();
}
- /**
- * Delete cluster default setting
- *
- * @param operator operator
- * @param strBuffer the print info string buffer
- * @param result the process result return
- * @return true if success
- */
- public boolean confDelClusterDefSetting(String operator,
- StringBuilder strBuffer,
- ProcessResult result) {
- if (metaStoreService.delClusterConfig(result)) {
- ClusterSettingEntity entity =
- (ClusterSettingEntity) result.getRetData();
- if (entity != null) {
- strBuffer.append("[confDelClusterDefSetting], ").append(operator)
- .append(" deleted cluster setting record :").append(entity.toString());
- logger.info(strBuffer.toString());
- }
- } else {
- strBuffer.append("[confDelClusterDefSetting], ")
- .append("failure to delete cluster setting record : ")
- .append(result.getErrInfo());
- logger.warn(strBuffer.toString());
- }
- strBuffer.delete(0, strBuffer.length());
- return result.isSuccess();
- }
public ClusterSettingEntity getClusterDefSetting() {
return metaStoreService.getClusterConfig();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
index 0f8f902..0878e30 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
@@ -287,39 +287,106 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
}
@Override
- public boolean delClusterConfig(ProcessResult result) {
+ public boolean delClusterConfig(String operator,
+ StringBuilder strBuffer,
+ ProcessResult result) {
if (!checkStoreStatus(true, result)) {
return false;
}
- return clusterConfigMapper.delClusterConfig(result);
+ if (clusterConfigMapper.delClusterConfig(result)) {
+ ClusterSettingEntity entity =
+ (ClusterSettingEntity) result.getRetData();
+ if (entity != null) {
+ strBuffer.append("[delClusterConfig], ").append(operator)
+ .append(" deleted cluster setting record :").append(entity.toString());
+ logger.info(strBuffer.toString());
+ }
+ } else {
+ strBuffer.append("[delClusterConfig], ")
+ .append("failure to delete cluster setting record : ")
+ .append(result.getErrInfo());
+ logger.warn(strBuffer.toString());
+ }
+ strBuffer.delete(0, strBuffer.length());
+ return result.isSuccess();
}
// broker configure api
@Override
- public boolean addBrokerConf(BrokerConfEntity memEntity, ProcessResult result) {
+ public boolean addBrokerConf(BrokerConfEntity entity,
+ StringBuilder strBuffer,
+ ProcessResult result) {
// check current status
if (!checkStoreStatus(true, result)) {
return result.isSuccess();
}
- return brokerConfigMapper.addBrokerConf(memEntity, result);
+ if (brokerConfigMapper.addBrokerConf(entity, result)) {
+ strBuffer.append("[addBrokerConf], ")
+ .append(entity.getCreateUser())
+ .append(" added broker configure record :")
+ .append(entity.toString());
+ logger.info(strBuffer.toString());
+ } else {
+ strBuffer.append("[addBrokerConf], ")
+ .append("failure to add broker configure record : ")
+ .append(result.getErrInfo());
+ logger.warn(strBuffer.toString());
+ }
+ strBuffer.delete(0, strBuffer.length());
+ return result.isSuccess();
}
@Override
- public boolean updBrokerConf(BrokerConfEntity memEntity, ProcessResult result) {
+ public boolean updBrokerConf(BrokerConfEntity entity,
+ StringBuilder strBuffer,
+ ProcessResult result) {
// check current status
if (!checkStoreStatus(true, result)) {
return result.isSuccess();
}
- return brokerConfigMapper.updBrokerConf(memEntity, result);
+ if (brokerConfigMapper.updBrokerConf(entity, result)) {
+ BrokerConfEntity oldEntity =
+ (BrokerConfEntity) result.getRetData();
+ BrokerConfEntity curEntity =
+ brokerConfigMapper.getBrokerConfByBrokerId(entity.getBrokerId());
+ strBuffer.append("[updBrokerConf], ")
+ .append(entity.getModifyUser())
+ .append(" updated broker configure record from :")
+ .append(oldEntity.toString())
+ .append(" to ").append(curEntity.toString());
+ logger.info(strBuffer.toString());
+ } else {
+ strBuffer.append("[updBrokerConf], ")
+ .append("failure to update broker configure record : ")
+ .append(result.getErrInfo());
+ logger.warn(strBuffer.toString());
+ }
+ strBuffer.delete(0, strBuffer.length());
+ return result.isSuccess();
}
@Override
- public boolean delBrokerConf(int brokerId, ProcessResult result) {
+ public boolean delBrokerConf(String operator, int brokerId,
+ StringBuilder strBuffer, ProcessResult result) {
// check current status
if (!checkStoreStatus(true, result)) {
return result.isSuccess();
}
- return brokerConfigMapper.delBrokerConf(brokerId, result);
+ if (brokerConfigMapper.delBrokerConf(brokerId, result)) {
+ BrokerConfEntity entity = (BrokerConfEntity) result.getRetData();
+ if (entity != null) {
+ strBuffer.append("[delBrokerConf], ").append(operator)
+ .append(" deleted broker configure record :").append(entity.toString());
+ logger.info(strBuffer.toString());
+ }
+ } else {
+ strBuffer.append("[delBrokerConf], ")
+ .append("failure to delete broker configure record : ")
+ .append(result.getErrInfo());
+ logger.warn(strBuffer.toString());
+ }
+ strBuffer.delete(0, strBuffer.length());
+ return result.isSuccess();
}
@Override
@@ -328,6 +395,13 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
}
@Override
+ public Map<Integer, BrokerConfEntity> getBrokerConfInfo(Set<Integer> brokerIdSet,
+ Set<String> brokerIpSet,
+ BrokerConfEntity qryEntity) {
+ return brokerConfigMapper.getBrokerConfInfo(brokerIdSet, brokerIpSet, qryEntity); // pure delegation; no store-status check is made on this read path
+ }
+
+ @Override
public BrokerConfEntity getBrokerConfByBrokerId(int brokerId) {
return brokerConfigMapper.getBrokerConfByBrokerId(brokerId);
}
@@ -366,6 +440,31 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
}
@Override
+    public boolean delTopicConfByBrokerId(String operator,
+                                          int brokerId,
+                                          StringBuilder strBuffer,
+                                          ProcessResult result) {
+        // Deletes the topic deploy records of one broker through the mapper and
+        // logs the outcome. strBuffer is used as scratch space for the log line
+        // and is emptied again before returning.
+        // check current status
+        if (!checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        if (topicDeployConfigMapper.delTopicConfByBrokerId(brokerId, result)) {
+            strBuffer.append("[delTopicConfByBrokerId], ")
+                    .append(operator)
+                    .append(" deleted topic deploy record :")
+                    .append(brokerId);
+            logger.info(strBuffer.toString());
+        } else {
+            // keep the broker id and the error text apart; without a separator
+            // the two run together in the log line
+            strBuffer.append("[delTopicConfByBrokerId], ")
+                    .append("failure to delete topic deploy record : ")
+                    .append(brokerId).append(", ").append(result.getErrInfo());
+            logger.warn(strBuffer.toString());
+        }
+        strBuffer.delete(0, strBuffer.length());
+        return result.isSuccess();
+    }
+
+ @Override
public boolean hasConfiguredTopics(int brokerId) {
return topicDeployConfigMapper.hasConfiguredTopics(brokerId);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
index 0fcb666..b765543 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
@@ -67,17 +67,63 @@ public interface MetaStoreService extends KeepAlive, Server {
ClusterSettingEntity getClusterConfig();
- boolean delClusterConfig(ProcessResult result);
+ /**
+ * Delete cluster default setting
+ *
+ * @param operator operator
+ * @param strBuffer the print info string buffer
+ * @param result the process result return
+ * @return true if success
+ */
+ boolean delClusterConfig(String operator,
+ StringBuilder strBuffer,
+ ProcessResult result);
// broker configure api
- boolean addBrokerConf(BrokerConfEntity memEntity, ProcessResult result);
+ /**
+ * Add broker configure information
+ *
+ * @param entity the broker configure entity to be added
+ * @param strBuffer the print information string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean addBrokerConf(BrokerConfEntity entity,
+ StringBuilder strBuffer,
+ ProcessResult result);
- boolean updBrokerConf(BrokerConfEntity memEntity, ProcessResult result);
+ /**
+ * Modify broker configure information
+ *
+ * @param entity the broker configure entity to be updated
+ * @param strBuffer the print information string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean updBrokerConf(BrokerConfEntity entity,
+ StringBuilder strBuffer,
+ ProcessResult result);
- boolean delBrokerConf(int brokerId, ProcessResult result);
+ /**
+ * Delete broker configure information
+ *
+ * @param operator operator
+ * @param brokerId the broker id of the configure record to be deleted
+ * @param strBuffer the print information string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ boolean delBrokerConf(String operator,
+ int brokerId,
+ StringBuilder strBuffer,
+ ProcessResult result);
Map<Integer, BrokerConfEntity> getBrokerConfInfo(BrokerConfEntity qryEntity);
+ Map<Integer, BrokerConfEntity> getBrokerConfInfo(Set<Integer> brokerIdSet,
+ Set<String> brokerIpSet,
+ BrokerConfEntity qryEntity);
+
BrokerConfEntity getBrokerConfByBrokerId(int brokerId);
BrokerConfEntity getBrokerConfByBrokerIp(String brokerIp);
@@ -89,6 +135,11 @@ public interface MetaStoreService extends KeepAlive, Server {
boolean delTopicConf(String recordKey, ProcessResult result);
+ boolean delTopicConfByBrokerId(String operator,
+ int brokerId,
+ StringBuilder strBuffer,
+ ProcessResult result);
+
boolean hasConfiguredTopics(int brokerId);
TopicDeployConfEntity getTopicConfByeRecKey(String recordKey);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
index f24e16c..a7bbee4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
@@ -18,11 +18,10 @@
package org.apache.tubemq.server.master.metamanage.metastore.dao.entity;
import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import java.io.Serializable;
import java.util.Date;
import java.util.Objects;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.TServerConstants;
@@ -178,7 +177,8 @@ public class BaseEntity implements Serializable, Cloneable {
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this);
+ Gson gson = new GsonBuilder().create();
+ return toJsonString(gson);
}
/**
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
index ed08c58..979333c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
@@ -60,6 +60,9 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
super();
}
+ public BrokerConfEntity(long dataVerId, String createUser, Date createDate) {
+ super(dataVerId, createUser, createDate);
+ }
public BrokerConfEntity(int brokerId, String brokerIp, int brokerPort,
int brokerTLSPort, int brokerWebPort, ManageStatus manageStatus,
@@ -132,6 +135,11 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
this.brokerId = brokerId;
}
+ public void setBrokerIdAndIp(int brokerId, String brokerIp) {
+ this.brokerId = brokerId;
+ this.brokerIp = brokerIp;
+ }
+
public ManageStatus getManageStatus() {
return manageStatus;
}
@@ -271,6 +279,65 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
}
/**
+ * update subclass field values
+ *
+ * @return if changed
+ */
+ public boolean updModifyInfo(int brokerPort, int brokerTlsPort, int brokerWebPort,
+ int regionId, int groupId, ManageStatus manageStatus,
+ TopicPropGroup topicProps) {
+ boolean changed = false;
+ // check and set brokerPort info
+ if (brokerPort != TBaseConstants.META_VALUE_UNDEFINED
+ && this.brokerPort != brokerPort) {
+ changed = true;
+ this.brokerPort = brokerPort;
+ }
+ // check and set brokerTLSPort info
+ if (brokerTlsPort != TBaseConstants.META_VALUE_UNDEFINED
+ && this.brokerTLSPort != brokerTlsPort) {
+ changed = true;
+ this.brokerTLSPort = brokerTlsPort;
+ }
+ // check and set brokerWebPort info
+ if (brokerWebPort != TBaseConstants.META_VALUE_UNDEFINED
+ && this.brokerWebPort != brokerWebPort) {
+ changed = true;
+ this.brokerWebPort = brokerWebPort;
+ }
+ // check and set regionId info
+ if (regionId != TBaseConstants.META_VALUE_UNDEFINED
+ && this.regionId != regionId) {
+ changed = true;
+ this.regionId = regionId;
+ }
+ // check and set groupId info
+ if (groupId != TBaseConstants.META_VALUE_UNDEFINED
+ && this.groupId != groupId) {
+ changed = true;
+ this.groupId = groupId;
+ }
+ // check and set manageStatus info
+ if (manageStatus != null
+ && manageStatus != ManageStatus.STATUS_MANAGE_UNDEFINED
+ && this.manageStatus != manageStatus) {
+ changed = true;
+ this.manageStatus = manageStatus;
+ }
+ // check and set topicProps info
+ if (topicProps != null
+ && !topicProps.isDataEquals(this.topicProps)) {
+ changed = true;
+ this.topicProps = topicProps;
+ }
+ if (changed) {
+ updSerialId();
+ buildStrInfo();
+ }
+ return changed;
+ }
+
+ /**
* Check whether the specified query item value matches
* Allowed query items:
* brokerId, brokerIp, brokerPort, brokerTLSPort, regionId, groupId
@@ -310,10 +377,12 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
*
* @param sBuilder build container
* @param isLongName if return field key is long name
+ * @param fullFormat if return full format json
* @return
*/
- @Override
- public StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean isLongName) {
+ public StringBuilder toWebJsonStr(StringBuilder sBuilder,
+ boolean isLongName,
+ boolean fullFormat) {
String manageSts =
WebParameterUtils.getBrokerManageStatusStr(getManageStatus().getCode());
if (isLongName) {
@@ -341,7 +410,9 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
}
topicProps.toWebJsonStr(sBuilder, isLongName);
super.toWebJsonStr(sBuilder, isLongName);
- sBuilder.append("}");
+ if (fullFormat) {
+ sBuilder.append("}");
+ }
return sBuilder;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
index 9f9c275..d861511 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
@@ -254,10 +254,12 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
*
* @param sBuilder build container
* @param isLongName if return field key is long name
+ * @param fullFormat if return full format json
* @return
*/
- @Override
- public StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean isLongName) {
+ public StringBuilder toWebJsonStr(StringBuilder sBuilder,
+ boolean isLongName,
+ boolean fullFormat) {
int tmpMsgSizeInMB = maxMsgSizeInB;
if (maxMsgSizeInB != TBaseConstants.META_VALUE_UNDEFINED) {
tmpMsgSizeInMB /= TBaseConstants.META_MB_UNIT_SIZE;
@@ -283,7 +285,9 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
}
clsDefTopicProps.toWebJsonStr(sBuilder, isLongName);
super.toWebJsonStr(sBuilder, isLongName);
- sBuilder.append("}");
+ if (fullFormat) {
+ sBuilder.append("}");
+ }
return sBuilder;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
index a81bd3e..6ad0eb8 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
@@ -233,10 +233,12 @@ public class GroupConsumeCtrlEntity extends BaseEntity implements Cloneable {
*
* @param sBuilder build container
* @param isLongName if return field key is long name
+ * @param fullFormat if return full format json
* @return
*/
- @Override
- public StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean isLongName) {
+ public StringBuilder toWebJsonStr(StringBuilder sBuilder,
+ boolean isLongName,
+ boolean fullFormat) {
String tmpFilterConds = filterCondStr;
if (tmpFilterConds.length() <= 2) {
tmpFilterConds = "";
@@ -257,7 +259,9 @@ public class GroupConsumeCtrlEntity extends BaseEntity implements Cloneable {
.append(",\"fltRls\":\"").append(tmpFilterConds).append("\"");
}
super.toWebJsonStr(sBuilder, isLongName);
- sBuilder.append("}");
+ if (fullFormat) {
+ sBuilder.append("}");
+ }
return sBuilder;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
index 31cfbb5..d051184 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
@@ -283,7 +283,6 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
return changed;
}
-
/**
* Check whether the specified query item value matches
* Allowed query items:
@@ -320,10 +319,12 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
*
* @param sBuilder build container
* @param isLongName if return field key is long name
+ * @param fullFormat if return full format json
* @return
*/
- @Override
- public StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean isLongName) {
+ public StringBuilder toWebJsonStr(StringBuilder sBuilder,
+ boolean isLongName,
+ boolean fullFormat) {
if (isLongName) {
sBuilder.append("{\"groupName\":\"").append(groupName).append("\"")
.append(",\"consumeEnable\":").append(consumeEnable.isEnable())
@@ -346,7 +347,9 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
.append(",\"fCtrlInfo\":").append(flowCtrlInfo);
}
super.toWebJsonStr(sBuilder, isLongName);
- sBuilder.append("}");
+ if (fullFormat) {
+ sBuilder.append("}");
+ }
return sBuilder;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
index 96ec5d1..0bd8f9e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
@@ -163,10 +163,12 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
*
* @param sBuilder build container
* @param isLongName if return field key is long name
+ * @param fullFormat if return full format json
* @return
*/
- @Override
- public StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean isLongName) {
+ public StringBuilder toWebJsonStr(StringBuilder sBuilder,
+ boolean isLongName,
+ boolean fullFormat) {
int tmpMsgSizeInMB = maxMsgSizeInB;
if (maxMsgSizeInB != TBaseConstants.META_VALUE_UNDEFINED) {
tmpMsgSizeInMB /= TBaseConstants.META_MB_UNIT_SIZE;
@@ -183,7 +185,9 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
.append(",\"mxMsgInMB\":").append(tmpMsgSizeInMB);
}
super.toWebJsonStr(sBuilder, isLongName);
- sBuilder.append("}");
+ if (fullFormat) {
+ sBuilder.append("}");
+ }
return sBuilder;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployConfEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployConfEntity.java
index 179a427..2802d35 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployConfEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployConfEntity.java
@@ -211,10 +211,12 @@ public class TopicDeployConfEntity extends BaseEntity implements Cloneable {
*
* @param sBuilder build container
* @param isLongName if return field key is long name
+ * @param fullFormat if return full format json
* @return
*/
- @Override
- public StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean isLongName) {
+ public StringBuilder toWebJsonStr(StringBuilder sBuilder,
+ boolean isLongName,
+ boolean fullFormat) {
if (isLongName) {
sBuilder.append("{\"topicName\":\"").append(topicName).append("\"")
.append(",\"brokerId\":").append(brokerId)
@@ -232,7 +234,9 @@ public class TopicDeployConfEntity extends BaseEntity implements Cloneable {
}
topicProps.toWebJsonStr(sBuilder, isLongName);
super.toWebJsonStr(sBuilder, isLongName);
- sBuilder.append("}");
+ if (fullFormat) {
+ sBuilder.append("}");
+ }
return sBuilder;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/BrokerConfigMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/BrokerConfigMapper.java
index 38173cb..f52fe1b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/BrokerConfigMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/BrokerConfigMapper.java
@@ -18,6 +18,8 @@
package org.apache.tubemq.server.master.metamanage.metastore.dao.mapper;
import java.util.Map;
+import java.util.Set;
+
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
@@ -33,6 +35,10 @@ public interface BrokerConfigMapper extends AbstractMapper {
Map<Integer, BrokerConfEntity> getBrokerConfInfo(BrokerConfEntity qryEntity);
+ Map<Integer, BrokerConfEntity> getBrokerConfInfo(Set<Integer> brokerIdSet,
+ Set<String> brokerIpSet,
+ BrokerConfEntity qryEntity);
+
BrokerConfEntity getBrokerConfByBrokerId(int brokerId);
BrokerConfEntity getBrokerConfByBrokerIp(String brokerIp);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployConfigMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployConfigMapper.java
index 1d33dc4..832ff2f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployConfigMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployConfigMapper.java
@@ -34,6 +34,9 @@ public interface TopicDeployConfigMapper extends AbstractMapper {
boolean delTopicConf(String recordKey, ProcessResult result);
+ boolean delTopicConfByBrokerId(Integer brokerId, ProcessResult result);
+
+
boolean hasConfiguredTopics(int brokerId);
List<TopicDeployConfEntity> getTopicConf(TopicDeployConfEntity qryEntity);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
index 1cd519f..d6f51d1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
@@ -24,7 +24,9 @@ import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.server.common.exception.LoadMetaException;
@@ -199,6 +201,49 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
* @return result, only read
*/
@Override
+ public Map<Integer, BrokerConfEntity> getBrokerConfInfo(Set<Integer> brokerIdSet,
+ Set<String> brokerIpSet,
+ BrokerConfEntity qryEntity) {
+ Set<Integer> qryBrokerKey = null;
+ Map<Integer, BrokerConfEntity> retMap = new HashMap<>();
+ if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
+ qryBrokerKey = new HashSet<>(brokerIdSet);
+ }
+ if (brokerIpSet != null && !brokerIpSet.isEmpty()) {
+ if (qryBrokerKey == null) {
+ qryBrokerKey = new HashSet<>();
+ }
+ for (String brokerIp : brokerIpSet) {
+ Integer brokerId = brokerIpIndexCache.get(brokerIp);
+ if (brokerId != null) {
+ qryBrokerKey.add(brokerId);
+ }
+ }
+ }
+ if (qryBrokerKey == null) {
+ qryBrokerKey = new HashSet<>(brokerConfCache.keySet());
+ }
+ if (qryBrokerKey.isEmpty()) {
+ return retMap;
+ }
+ for (Integer brokerId : qryBrokerKey) {
+ BrokerConfEntity entity = brokerConfCache.get(brokerId);
+ if (entity == null
+ || (qryEntity != null && !qryEntity.isMatched(entity))) {
+ continue;
+ }
+ retMap.put(entity.getBrokerId(), entity);
+ }
+ return retMap;
+ }
+
+
+
+ /**
+ * get the broker configure info of the specified broker id from the cache
+ * @return result, only read
+ */
+ @Override
public BrokerConfEntity getBrokerConfByBrokerId(int brokerId) {
return brokerConfCache.get(brokerId);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployConfigMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployConfigMapperImpl.java
index 46965ed..5b9dcae 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployConfigMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployConfigMapperImpl.java
@@ -172,6 +172,22 @@ public class BdbTopicDeployConfigMapperImpl implements TopicDeployConfigMapper {
}
@Override
+ public boolean delTopicConfByBrokerId(Integer brokerId, ProcessResult result) {
+ ConcurrentHashSet<String> recordKeySet =
+ brokerIdCacheIndex.get(brokerId);
+ if (recordKeySet == null) {
+ result.setSuccResult(null);
+ return result.isSuccess();
+ }
+ for (String recordKey : recordKeySet) {
+ delTopicConfigFromBdb(recordKey);
+ delCacheRecord(recordKey);
+ }
+ result.setSuccResult(null);
+ return result.isSuccess();
+ }
+
+ @Override
public boolean hasConfiguredTopics(int brokerId) {
ConcurrentHashSet<String> keySet =
brokerIdCacheIndex.get(brokerId);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/BrokerProcessResult.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/BrokerProcessResult.java
new file mode 100644
index 0000000..6eb14c5
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/BrokerProcessResult.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.web.handler;
+
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+
+
+public class BrokerProcessResult extends ProcessResult {
+ private int brokerId = TBaseConstants.META_VALUE_UNDEFINED;
+ private String brokerIp = "";
+
+ public BrokerProcessResult() {
+
+ }
+
+ public BrokerProcessResult(int brokerId,
+ String brokerIp,
+ ProcessResult result) {
+ super(result);
+ this.brokerId = brokerId;
+ this.brokerIp = brokerIp;
+ }
+
+ public int getBrokerId() {
+ return brokerId;
+ }
+
+ public void setBrokerId(int brokerId) {
+ this.brokerId = brokerId;
+ }
+
+ public String getBrokerIp() {
+ return brokerIp;
+ }
+
+ public void setBrokerIp(String brokerIp) {
+ this.brokerIp = brokerIp;
+ }
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
new file mode 100644
index 0000000..f11e3b9
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
@@ -0,0 +1,904 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.web.handler;
+
+import static java.lang.Math.abs;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.AddressUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.corebase.utils.Tuple3;
+import org.apache.tubemq.server.common.TServerConstants;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.tubemq.server.common.statusdef.ManageStatus;
+import org.apache.tubemq.server.common.statusdef.TopicStatus;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.tubemq.server.master.TMaster;
+import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployConfEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * <p>
+ * The class to handle the default config of broker, including:
+ * - Add config
+ * - Update config
+ * - Delete config
+ * And manage the broker status.
+ * <p>
+ * Please note that one IP could only host one broker, and brokerId must be unique
+ */
+public class WebBrokerConfHandler extends AbstractWebHandler {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(WebBrokerConfHandler.class);
+
+ /**
+ * Constructor
+ *
+ * @param master tube master
+ */
+ public WebBrokerConfHandler(TMaster master) {
+ super(master);
+ }
+
+ @Override
+ public void registerWebApiMethod() {
+ // register query method
+ registerQueryWebMethod("admin_query_broker_run_status",
+ "adminQueryBrokerRunStatusInfo");
+ registerQueryWebMethod("admin_query_broker_configure",
+ "adminQueryBrokerDefConfEntityInfo");
+ // register modify method
+ registerModifyWebMethod("admin_add_broker_configure",
+ "adminAddBrokerDefConfEntityInfo");
+ registerModifyWebMethod("admin_bath_add_broker_configure",
+ "adminBatchAddBrokerDefConfEntityInfo");
+ registerModifyWebMethod("admin_online_broker_configure",
+ "adminOnlineBrokerConf");
+ registerModifyWebMethod("admin_update_broker_configure",
+ "adminUpdateBrokerConf");
+ registerModifyWebMethod("admin_reload_broker_configure",
+ "adminReloadBrokerConf");
+ registerModifyWebMethod("admin_set_broker_read_or_write",
+ "adminSetReadOrWriteBrokerConf");
+ registerModifyWebMethod("admin_release_broker_autoforbidden_status",
+ "adminRelBrokerAutoForbiddenStatus");
+ registerModifyWebMethod("admin_offline_broker_configure",
+ "adminOfflineBrokerConf");
+ registerModifyWebMethod("admin_delete_broker_configure",
+ "adminDeleteBrokerConfEntityInfo");
+ }
+
+ /**
+ * Query broker config
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminQueryBrokerDefConfEntityInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuilder = new StringBuilder(512);
+ BrokerConfEntity qryEntity = new BrokerConfEntity();
+ // get queried operation info, for createUser, modifyUser, dataVersionId
+ if (!WebParameterUtils.getQueriedOperateInfo(req, qryEntity, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ // check and get brokerId field
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, false, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<Integer> brokerIds = (Set<Integer>) result.retData1;
+ // get brokerIp info
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPBROKERIP, false, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<String> brokerIpSet = (Set<String>) result.retData1;
+ // get brokerPort field
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERPORT,
+ false, TBaseConstants.META_VALUE_UNDEFINED, 1, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ int brokerPort = (int) result.getRetData();
+ // get brokerTlsPort field
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERTLSPORT,
+ false, TBaseConstants.META_VALUE_UNDEFINED, 1, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ int brokerTlsPort = (int) result.getRetData();
+ // get brokerWebPort field
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERWEBPORT,
+ false, TBaseConstants.META_VALUE_UNDEFINED, 1, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ int brokerWebPort = (int) result.getRetData();
+ // get regionId field
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.REGIONID,
+ false, TBaseConstants.META_VALUE_UNDEFINED,
+ TServerConstants.BROKER_REGION_ID_MIN, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ int regionId = (int) result.getRetData();
+ // get groupId field
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.GROUPID,
+ false, TBaseConstants.META_VALUE_UNDEFINED,
+ TServerConstants.BROKER_GROUP_ID_MIN, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ int groupId = (int) result.getRetData();
+ // get and validate TopicPropGroup info
+ if (!WebParameterUtils.getTopicPropInfo(req,
+ null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ TopicPropGroup brokerProps = (TopicPropGroup) result.getRetData();
+ // get and validate TopicStatusId info
+ if (!WebParameterUtils.getTopicStatusParamValue(req,
+ false, TopicStatus.STATUS_TOPIC_UNDEFINED, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ TopicStatus topicStatus = (TopicStatus) result.getRetData();
+ // get topic info
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ // get isInclude info
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.ISINCLUDE, false, true, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Boolean isInclude = (Boolean) result.retData1;
+ // get withTopic info
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.WITHTOPIC, false, false, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Boolean withTopic = (Boolean) result.retData1;
+ // fill query entity fields
+ qryEntity.updModifyInfo(brokerPort, brokerTlsPort, brokerWebPort,
+ regionId, groupId, null, brokerProps);
+ Map<Integer, BrokerConfEntity> qryResult =
+ metaDataManager.getBrokerConfInfo(brokerIds, brokerIpSet, qryEntity);
+ // build query result
+ int totalCnt = 0;
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+ for (BrokerConfEntity entity : qryResult.values()) {
+ Map<String, TopicDeployConfEntity> topicConfEntityMap =
+ metaDataManager.getBrokerTopicConfEntitySet(entity.getBrokerId());
+ if (!isValidRecord(topicNameSet, isInclude, topicStatus, topicConfEntityMap)) {
+ continue;
+ }
+ if (totalCnt++ > 0) {
+ sBuilder.append(",");
+ }
+ entity.toWebJsonStr(sBuilder, true, false);
+ sBuilder = addTopicInfo(withTopic, sBuilder, topicConfEntityMap);
+ sBuilder.append("}");
+ }
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, totalCnt);
+ return sBuilder;
+ }
+
+
+ /**
+  * Add the configuration record of a single broker.
+  *
+  * Port fields missing from the request fall back to the cluster default
+  * setting, region and group ids fall back to the TServerConstants defaults,
+  * and the record is submitted with the STATUS_MANAGE_APPLY manage status.
+  *
+  * @param req  the HTTP request carrying the operation info and broker fields
+  * @return the builder holding either the failure text or the add result
+  */
+ public StringBuilder adminAddBrokerDefConfEntityInfo(HttpServletRequest req) {
+     ProcessResult procResult = new ProcessResult();
+     StringBuilder strBuff = new StringBuilder(512);
+     // validate the operation authorize info
+     if (!WebParameterUtils.validReqAuthorizeInfo(req,
+             WebFieldDef.ADMINAUTHTOKEN, true, master, procResult)) {
+         WebParameterUtils.buildFailResult(strBuff, procResult.errInfo);
+         return strBuff;
+     }
+     // read the operation base info (data version, operator, operate date)
+     if (!WebParameterUtils.getAUDBaseInfo(req, true, procResult)) {
+         WebParameterUtils.buildFailResult(strBuff, procResult.errInfo);
+         return strBuff;
+     }
+     Tuple3<Long, String, Date> opInfo =
+             (Tuple3<Long, String, Date>) procResult.getRetData();
+     // make sure a cluster default setting exists, creating it when absent
+     ClusterSettingEntity clusterDefaults = metaDataManager.getClusterDefSetting();
+     if (clusterDefaults == null) {
+         if (!metaDataManager.addClusterDefSetting(strBuff, procResult)) {
+             WebParameterUtils.buildFailResult(strBuff, procResult.errInfo);
+             return strBuff;
+         }
+         clusterDefaults = metaDataManager.getClusterDefSetting();
+     }
+     // brokerIp and brokerId
+     if (!getBrokerIpAndIdParamValue(req, strBuff, procResult)) {
+         WebParameterUtils.buildFailResult(strBuff, procResult.errInfo);
+         return strBuff;
+     }
+     Tuple2<Integer, String> idAndIp =
+             (Tuple2<Integer, String>) procResult.getRetData();
+     // brokerPort
+     if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERPORT,
+             false, clusterDefaults.getBrokerPort(), 1, procResult)) {
+         WebParameterUtils.buildFailResult(strBuff, procResult.errInfo);
+         return strBuff;
+     }
+     int port = (int) procResult.getRetData();
+     // brokerTlsPort
+     if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERTLSPORT,
+             false, clusterDefaults.getBrokerTLSPort(), 1, procResult)) {
+         WebParameterUtils.buildFailResult(strBuff, procResult.errInfo);
+         return strBuff;
+     }
+     int tlsPort = (int) procResult.getRetData();
+     // brokerWebPort
+     if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERWEBPORT,
+             false, clusterDefaults.getBrokerWebPort(), 1, procResult)) {
+         WebParameterUtils.buildFailResult(strBuff, procResult.errInfo);
+         return strBuff;
+     }
+     int webPort = (int) procResult.getRetData();
+     // regionId
+     if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.REGIONID,
+             false, TServerConstants.BROKER_REGION_ID_DEF,
+             TServerConstants.BROKER_REGION_ID_MIN, procResult)) {
+         WebParameterUtils.buildFailResult(strBuff, procResult.errInfo);
+         return strBuff;
+     }
+     int region = (int) procResult.getRetData();
+     // groupId
+     if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.GROUPID,
+             false, TServerConstants.BROKER_GROUP_ID_DEF,
+             TServerConstants.BROKER_GROUP_ID_MIN, procResult)) {
+         WebParameterUtils.buildFailResult(strBuff, procResult.errInfo);
+         return strBuff;
+     }
+     int group = (int) procResult.getRetData();
+     // topic property group, seeded with the cluster default topic properties
+     if (!WebParameterUtils.getTopicPropInfo(req,
+             clusterDefaults.getClsDefTopicProps(), procResult)) {
+         WebParameterUtils.buildFailResult(strBuff, procResult.errInfo);
+         return strBuff;
+     }
+     TopicPropGroup topicProps = (TopicPropGroup) procResult.getRetData();
+     // store the record and report the outcome
+     List<BrokerProcessResult> outcomes = new ArrayList<>();
+     outcomes.add(metaDataManager.addBrokerConfig(opInfo.getF0(),
+             opInfo.getF1(), opInfo.getF2(), idAndIp.getF0(),
+             idAndIp.getF1(), port, tlsPort, webPort,
+             region, group, ManageStatus.STATUS_MANAGE_APPLY,
+             topicProps, strBuff, procResult));
+     return buildRetInfo(outcomes, strBuff);
+ }
+
+ /**
+  * Add the configuration records of several brokers in one request.
+  *
+  * The broker definitions are parsed by getBrokerJsonSetInfo; every parsed
+  * record is then stored on its own and its outcome is reported separately.
+  *
+  * @param req  the HTTP request carrying the operation info and the broker set
+  * @return the builder holding either the failure text or the per-broker results
+  */
+ public StringBuilder adminBatchAddBrokerDefConfEntityInfo(HttpServletRequest req) {
+     ProcessResult procResult = new ProcessResult();
+     StringBuilder strBuff = new StringBuilder(512);
+     // validate the operation authorize info
+     if (!WebParameterUtils.validReqAuthorizeInfo(req,
+             WebFieldDef.ADMINAUTHTOKEN, true, master, procResult)) {
+         WebParameterUtils.buildFailResult(strBuff, procResult.errInfo);
+         return strBuff;
+     }
+     // read the operation base info (data version, operator, operate date)
+     if (!WebParameterUtils.getAUDBaseInfo(req, true, procResult)) {
+         WebParameterUtils.buildFailResult(strBuff, procResult.errInfo);
+         return strBuff;
+     }
+     Tuple3<Long, String, Date> opInfo =
+             (Tuple3<Long, String, Date>) procResult.getRetData();
+     // parse the broker set; request-level operation info is the per-item default
+     if (!getBrokerJsonSetInfo(req, true, true,
+             opInfo.getF0(), opInfo.getF1(), opInfo.getF2(),
+             null, strBuff, procResult)) {
+         WebParameterUtils.buildFailResult(strBuff, procResult.errInfo);
+         return strBuff;
+     }
+     Map<Integer, BrokerConfEntity> parsedBrokers =
+             (HashMap<Integer, BrokerConfEntity>) procResult.getRetData();
+     // store each record and collect its outcome
+     List<BrokerProcessResult> outcomes = new ArrayList<>();
+     for (BrokerConfEntity brokerConf : parsedBrokers.values()) {
+         outcomes.add(metaDataManager.addBrokerConfig(brokerConf, strBuff, procResult));
+     }
+     return buildRetInfo(outcomes, strBuff);
+ }
+
+ /**
+  * Update the configuration of one or more brokers.
+  *
+  * For every requested broker id the stored record is loaded and cloned, the
+  * requested changes are applied to the clone, and the clone is written back
+  * only when updModifyInfo reports that something changed. Fields left out of
+  * the request are passed on as TBaseConstants.META_VALUE_UNDEFINED (or null
+  * for the topic properties).
+  *
+  * @param req  the HTTP request carrying the operation info and the new values
+  * @return the builder holding either the failure text or the per-broker results
+  */
+ public StringBuilder adminUpdateBrokerConf(HttpServletRequest req) {
+     ProcessResult result = new ProcessResult();
+     StringBuilder sBuilder = new StringBuilder(512);
+     // valid operation authorize info
+     if (!WebParameterUtils.validReqAuthorizeInfo(req,
+             WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+         WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+         return sBuilder;
+     }
+     // check and get operation info
+     if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
+         WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+         return sBuilder;
+     }
+     Tuple3<Long, String, Date> opTupleInfo =
+             (Tuple3<Long, String, Date>) result.getRetData();
+     // check and get brokerId field
+     if (!WebParameterUtils.getIntParamValue(req,
+             WebFieldDef.COMPSBROKERID, true, result)) {
+         WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+         return sBuilder;
+     }
+     Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+     // get brokerPort field
+     if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERPORT,
+             false, TBaseConstants.META_VALUE_UNDEFINED, 1, result)) {
+         WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+         return sBuilder;
+     }
+     int brokerPort = (int) result.getRetData();
+     // get brokerTlsPort field
+     if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERTLSPORT,
+             false, TBaseConstants.META_VALUE_UNDEFINED, 1, result)) {
+         WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+         return sBuilder;
+     }
+     int brokerTlsPort = (int) result.getRetData();
+     // get brokerWebPort field
+     if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERWEBPORT,
+             false, TBaseConstants.META_VALUE_UNDEFINED, 1, result)) {
+         WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+         return sBuilder;
+     }
+     int brokerWebPort = (int) result.getRetData();
+     // get regionId field
+     if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.REGIONID,
+             false, TBaseConstants.META_VALUE_UNDEFINED,
+             TServerConstants.BROKER_REGION_ID_MIN, result)) {
+         WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+         return sBuilder;
+     }
+     int regionId = (int) result.getRetData();
+     // get groupId field
+     if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.GROUPID,
+             false, TBaseConstants.META_VALUE_UNDEFINED,
+             TServerConstants.BROKER_GROUP_ID_MIN, result)) {
+         WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+         return sBuilder;
+     }
+     int groupId = (int) result.getRetData();
+     // get and valid TopicPropGroup info
+     if (!WebParameterUtils.getTopicPropInfo(req, null, result)) {
+         WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+         return sBuilder;
+     }
+     TopicPropGroup brokerProps = (TopicPropGroup) result.getRetData();
+     // update each requested record and collect its outcome;
+     // the manage status is passed as null to updModifyInfo below
+     List<BrokerProcessResult> retInfo = new ArrayList<>();
+     for (Integer brokerId : brokerIdSet) {
+         BrokerConfEntity curEntity =
+                 metaDataManager.getBrokerConfByBrokerId(brokerId);
+         if (curEntity == null) {
+             // nothing stored for this id: report it and move on
+             result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+                     DataOpErrCode.DERR_NOT_EXIST.getDescription());
+             retInfo.add(new BrokerProcessResult(brokerId, "", result));
+             continue;
+         }
+         // work on a copy so the stored entity stays untouched until the write
+         BrokerConfEntity newEntity = curEntity.clone();
+         newEntity.updBaseModifyInfo(opTupleInfo.getF0(), null, null,
+                 opTupleInfo.getF1(), opTupleInfo.getF2(), null);
+         if (!newEntity.updModifyInfo(brokerPort, brokerTlsPort,
+                 brokerWebPort, regionId, groupId, null, brokerProps)) {
+             // no field changed: skip the write and report "unchanged"
+             result.setFailResult(DataOpErrCode.DERR_SUCCESS_UNCHANGED.getCode(),
+                     DataOpErrCode.DERR_SUCCESS_UNCHANGED.getDescription());
+             retInfo.add(new BrokerProcessResult(brokerId, curEntity.getBrokerIp(), result));
+             continue;
+         }
+         metaDataManager.modBrokerConfig(newEntity, sBuilder, result);
+         retInfo.add(new BrokerProcessResult(brokerId, curEntity.getBrokerIp(), result));
+     }
+     return buildRetInfo(retInfo, sBuilder);
+ }
+
+ /**
+ * Delete broker config
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminDeleteBrokerConfEntityInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuilder = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Tuple3<Long, String, Date> opTupleInfo =
+ (Tuple3<Long, String, Date>) result.getRetData();
+ // get isReservedData info
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.ISRESERVEDDATA, false, false, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Boolean isReservedData = (Boolean) result.retData1;
+ // check and get brokerId field
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, true, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<Integer> brokerIds = (Set<Integer>) result.retData1;
+ if (brokerIds.isEmpty()) {
+ WebParameterUtils.buildFailResult(sBuilder,
+ "Illegal value: Null value of brokerId parameter");
+ return sBuilder;
+ }
+ Map<Integer, BrokerConfEntity> qryResult =
+ metaDataManager.getBrokerConfInfo(brokerIds, null, null);
+ if (qryResult.isEmpty()) {
+ WebParameterUtils.buildFailResult(sBuilder,
+ "Illegal value: not found broker configure by brokerId value");
+ return sBuilder;
+ }
+ // check broker configure status
+ List<BrokerProcessResult> retInfo = new ArrayList<>();
+ Map<Integer, BrokerConfEntity> needDelMap = new HashMap<>();
+ Map<String, TopicDeployConfEntity> topicConfigMap;
+ for (BrokerConfEntity entity : qryResult.values()) {
+ if (entity == null) {
+ continue;
+ }
+ topicConfigMap = metaDataManager.getBrokerTopicConfEntitySet(entity.getBrokerId());
+ if (topicConfigMap == null || topicConfigMap.isEmpty()) {
+ needDelMap.put(entity.getBrokerId(), entity);
+ continue;
+ }
+ if (WebParameterUtils.checkBrokerInOfflining(entity.getBrokerId(),
+ entity.getManageStatus().getCode(), metaDataManager)) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+ sBuilder.append("Illegal value: the broker is processing offline event by brokerId=")
+ .append(entity.getBrokerId()).append(", please wait and try later!").toString());
+ retInfo.add(new BrokerProcessResult(
+ entity.getBrokerId(), entity.getBrokerIp(), result));
+ continue;
+ }
+ boolean isMatched = true;
+ if (isReservedData) {
+ for (Map.Entry<String, TopicDeployConfEntity> entry : topicConfigMap.entrySet()) {
+ if (entry.getValue() == null) {
+ continue;
+ }
+ if (entry.getValue().getTopicProps().isAcceptPublish()
+ || entry.getValue().getTopicProps().isAcceptSubscribe()) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+ sBuilder.append("The topic ").append(entry.getKey())
+ .append("'s acceptPublish and acceptSubscribe parameters")
+ .append(" must be false in broker=")
+ .append(entity.getBrokerId())
+ .append(" before broker delete by reserve data method!").toString());
+ retInfo.add(new BrokerProcessResult(
+ entity.getBrokerId(), entity.getBrokerIp(), result));
+ isMatched = false;
+ break;
+ }
+ }
+ } else {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+ sBuilder.append("Topic configure of broker by brokerId=")
+ .append(entity.getBrokerId())
+ .append(" not deleted, please delete broker's topic configure first!").toString());
+ retInfo.add(new BrokerProcessResult(
+ entity.getBrokerId(), entity.getBrokerIp(), result));
+ isMatched = false;
+ }
+ if (isMatched) {
+ needDelMap.put(entity.getBrokerId(), entity);
+ }
+ }
+ if (needDelMap.isEmpty()) {
+ return buildRetInfo(retInfo, sBuilder);
+ }
+ // do delete operation
+ for (BrokerConfEntity entry : needDelMap.values()) {
+ if (entry == null) {
+ continue;
+ }
+ if (isReservedData) {
+ Map<String, TopicDeployConfEntity> brokerTopicConfMap =
+ metaDataManager.getBrokerTopicConfEntitySet(entry.getBrokerId());
+ if (brokerTopicConfMap != null) {
+ metaDataManager.delBrokerTopicConfig(opTupleInfo.getF1(),
+ entry.getBrokerId(), sBuilder, result);
+ }
+ }
+ metaDataManager.confDelBrokerConfig(
+ opTupleInfo.getF1(), entry.getBrokerId(), sBuilder, result);
+ retInfo.add(new BrokerProcessResult(
+ entry.getBrokerId(), entry.getBrokerIp(), result));
+ }
+ return buildRetInfo(retInfo, sBuilder);
+
+ }
+
+ /**
+  * Check whether a broker passes the topic-name and topic-status filters.
+  *
+  * TopicStatus.STATUS_TOPIC_UNDEFINED is treated as "no status filter" in
+  * every branch, so a broker that has topics is no longer rejected merely
+  * because none of them carries the undefined status.
+  *
+  * @param qryTopicSet         the topic names to filter by; empty means no name filter
+  * @param isInclude           true to require at least one of the queried topics,
+  *                            false to require that none of them is present
+  * @param topicStatus         the deploy status to filter by
+  * @param topicConfEntityMap  the broker's topic configure, keyed by topic name
+  * @return true if the broker matches the filters
+  */
+ private boolean isValidRecord(Set<String> qryTopicSet, Boolean isInclude,
+                               TopicStatus topicStatus,
+                               Map<String, TopicDeployConfEntity> topicConfEntityMap) {
+     boolean noStatusFilter = (topicStatus == TopicStatus.STATUS_TOPIC_UNDEFINED);
+     if ((topicConfEntityMap == null) || (topicConfEntityMap.isEmpty())) {
+         // a broker without topics can only match when no topic is demanded
+         return (qryTopicSet.isEmpty() || !isInclude) && noStatusFilter;
+     }
+     // first check the topic-name filter
+     if (!qryTopicSet.isEmpty()) {
+         Set<String> curTopics = topicConfEntityMap.keySet();
+         boolean hasQueriedTopic = false;
+         for (String topic : qryTopicSet) {
+             if (curTopics.contains(topic)) {
+                 hasQueriedTopic = true;
+                 break;
+             }
+         }
+         // include mode needs a hit, exclude mode needs no hit
+         if (isInclude != hasQueriedTopic) {
+             return false;
+         }
+     }
+     // second check the topic-status filter
+     if (noStatusFilter) {
+         return true;
+     }
+     for (TopicDeployConfEntity topicConfEntity : topicConfEntityMap.values()) {
+         if (topicConfEntity != null
+                 && topicConfEntity.getDeployStatus() == topicStatus) {
+             return true;
+         }
+     }
+     return false;
+ }
+
+ /**
+  * Append the broker's topic list to the JSON text when it was requested.
+  *
+  * @param withTopic           whether the topic list should be written at all
+  * @param sBuilder            the builder receiving the JSON text
+  * @param topicConfEntityMap  the broker's topic configure; may be null
+  * @return the same builder that was passed in
+  */
+ private StringBuilder addTopicInfo(Boolean withTopic, StringBuilder sBuilder,
+                                    Map<String, TopicDeployConfEntity> topicConfEntityMap) {
+     if (!withTopic) {
+         return sBuilder;
+     }
+     sBuilder.append(",\"topicSet\":[");
+     if (topicConfEntityMap != null) {
+         boolean isFirst = true;
+         for (TopicDeployConfEntity deployConf : topicConfEntityMap.values()) {
+             // separate consecutive items with a comma
+             if (isFirst) {
+                 isFirst = false;
+             } else {
+                 sBuilder.append(",");
+             }
+             deployConf.toWebJsonStr(sBuilder, true, true);
+         }
+     }
+     sBuilder.append("]");
+     return sBuilder;
+ }
+
+ /**
+  * Parse the broker JSON set of a request into broker configure entities.
+  *
+  * Each item may carry its own operation info; values it leaves undefined are
+  * taken from the request-level dataVerId, operator and operateDate. On
+  * success the result holds a map of the parsed entities keyed by broker id.
+  *
+  * @param req          the HTTP request
+  * @param required     whether the broker JSON set parameter is required
+  * @param isCreate     whether an empty parsed set must be reported as a failure
+  * @param dataVerId    the request-level data version id
+  * @param operator     the request-level operator
+  * @param operateDate  the request-level operate date
+  * @param defValue     the default value of the broker JSON set parameter
+  * @param sBuilder     the builder used to compose messages
+  * @param result       receives the parsed map or the failure info
+  * @return true if parsing succeeded
+  */
+ private boolean getBrokerJsonSetInfo(HttpServletRequest req, boolean required,
+                                      boolean isCreate, long dataVerId,
+                                      String operator, Date operateDate,
+                                      List<Map<String, String>> defValue,
+                                      StringBuilder sBuilder,
+                                      ProcessResult result) {
+     if (!WebParameterUtils.getJsonArrayParamValue(req,
+             WebFieldDef.BROKERJSONSET, required, defValue, result)) {
+         return result.success;
+     }
+     List<Map<String, String>> brokerItems =
+             (List<Map<String, String>>) result.retData1;
+     // make sure a cluster default setting exists, creating it when absent
+     ClusterSettingEntity clusterDefaults = metaDataManager.getClusterDefSetting();
+     if (clusterDefaults == null) {
+         if (!metaDataManager.addClusterDefSetting(sBuilder, result)) {
+             return result.isSuccess();
+         }
+         clusterDefaults = metaDataManager.getClusterDefSetting();
+     }
+     // turn every item into a broker configure entity
+     HashMap<Integer, BrokerConfEntity> parsedBrokers = new HashMap<>();
+     for (Map<String, String> item : brokerItems) {
+         // item-level operation info overrides the request-level values
+         if (!WebParameterUtils.getAUDBaseInfo(item, true, result)) {
+             return result.isSuccess();
+         }
+         Tuple3<Long, String, Date> itemOpInfo =
+                 (Tuple3<Long, String, Date>) result.getRetData();
+         long itemDataVerId =
+                 (itemOpInfo.getF0() != TBaseConstants.META_VALUE_UNDEFINED)
+                         ? itemOpInfo.getF0() : dataVerId;
+         String itemCreator =
+                 (itemOpInfo.getF1() != null) ? itemOpInfo.getF1() : operator;
+         Date itemCreateDate =
+                 (itemOpInfo.getF2() != null) ? itemOpInfo.getF2() : operateDate;
+         // brokerIp and brokerId
+         if (!getBrokerIpAndIdParamValue(item, sBuilder, result)) {
+             return result.isSuccess();
+         }
+         Tuple2<Integer, String> idAndIp =
+                 (Tuple2<Integer, String>) result.getRetData();
+         // brokerPort
+         if (!WebParameterUtils.getIntParamValue(item, WebFieldDef.BROKERPORT,
+                 false, clusterDefaults.getBrokerPort(), 1, result)) {
+             return result.isSuccess();
+         }
+         int port = (int) result.getRetData();
+         // brokerTlsPort
+         if (!WebParameterUtils.getIntParamValue(item, WebFieldDef.BROKERTLSPORT,
+                 false, clusterDefaults.getBrokerTLSPort(), 1, result)) {
+             return result.isSuccess();
+         }
+         int tlsPort = (int) result.getRetData();
+         // brokerWebPort
+         if (!WebParameterUtils.getIntParamValue(item, WebFieldDef.BROKERWEBPORT,
+                 false, clusterDefaults.getBrokerWebPort(), 1, result)) {
+             return result.isSuccess();
+         }
+         int webPort = (int) result.getRetData();
+         // regionId
+         if (!WebParameterUtils.getIntParamValue(item, WebFieldDef.REGIONID,
+                 false, TServerConstants.BROKER_REGION_ID_DEF,
+                 TServerConstants.BROKER_REGION_ID_MIN, result)) {
+             return result.isSuccess();
+         }
+         int region = (int) result.getRetData();
+         // groupId
+         if (!WebParameterUtils.getIntParamValue(item, WebFieldDef.GROUPID,
+                 false, TServerConstants.BROKER_GROUP_ID_DEF,
+                 TServerConstants.BROKER_GROUP_ID_MIN, result)) {
+             return result.isSuccess();
+         }
+         int group = (int) result.getRetData();
+         // topic property group, seeded with the cluster default topic properties
+         if (!WebParameterUtils.getTopicPropInfo(item,
+                 clusterDefaults.getClsDefTopicProps(), result)) {
+             return result.isSuccess();
+         }
+         TopicPropGroup topicProps = (TopicPropGroup) result.getRetData();
+         // assemble the entity with the STATUS_MANAGE_APPLY manage status
+         BrokerConfEntity brokerConf =
+                 new BrokerConfEntity(itemDataVerId, itemCreator, itemCreateDate);
+         brokerConf.setBrokerIdAndIp(idAndIp.getF0(), idAndIp.getF1());
+         brokerConf.updModifyInfo(port, tlsPort, webPort, region, group,
+                 ManageStatus.STATUS_MANAGE_APPLY, topicProps);
+         parsedBrokers.put(brokerConf.getBrokerId(), brokerConf);
+     }
+     // a create request without any record is a failure
+     if (isCreate && parsedBrokers.isEmpty()) {
+         result.setFailResult(sBuilder
+                 .append("Not found record in ")
+                 .append(WebFieldDef.BROKERJSONSET.name)
+                 .append(" parameter!").toString());
+         sBuilder.delete(0, sBuilder.length());
+         return result.isSuccess();
+     }
+     result.setSuccResult(parsedBrokers);
+     return result.isSuccess();
+ }
+
+ /**
+  * Render the per-broker processing results as the data part of a response.
+  *
+  * Each result becomes one JSON object with the fields brokerId, brokerIp,
+  * success, errCode and errInfo; objects are separated by commas.
+  *
+  * @param retInfo   the per-broker results to render
+  * @param sBuilder  the builder receiving the response text
+  * @return the same builder that was passed in
+  */
+ private StringBuilder buildRetInfo(List<BrokerProcessResult> retInfo,
+                                    StringBuilder sBuilder) {
+     int totalCnt = 0;
+     WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+     for (BrokerProcessResult entry : retInfo) {
+         if (totalCnt++ > 0) {
+             sBuilder.append(",");
+         }
+         // fields inside one object are comma separated; the brokerIp field
+         // previously opened a second '{', which made the output invalid JSON
+         sBuilder.append("{\"brokerId\":").append(entry.getBrokerId())
+                 .append(",\"brokerIp\":\"").append(entry.getBrokerIp()).append("\"")
+                 .append(",\"success\":").append(entry.isSuccess())
+                 .append(",\"errCode\":").append(entry.getErrCode())
+                 .append(",\"errInfo\":\"").append(entry.getErrInfo()).append("\"}");
+     }
+     WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, totalCnt);
+     return sBuilder;
+ }
+
+ /**
+  * Read brokerIp and brokerId from the request and check they are unused.
+  *
+  * When the supplied brokerId is not positive, the id is derived from the
+  * broker ip. The call fails if a broker configure record already exists for
+  * the ip or for the id. On success the result holds a Tuple2 of
+  * (brokerId, brokerIp).
+  *
+  * @param req       the HTTP request
+  * @param sBuilder  scratch builder for composing messages; it is emptied
+  *                  again after a failure message has been composed
+  * @param result    receives the tuple or the failure info
+  * @return true if both values were obtained and no record exists for them
+  */
+ private boolean getBrokerIpAndIdParamValue(HttpServletRequest req,
+                                            StringBuilder sBuilder,
+                                            ProcessResult result) {
+     // get brokerIp
+     if (!WebParameterUtils.getStringParamValue(req,
+             WebFieldDef.BROKERIP, true, null, result)) {
+         return result.success;
+     }
+     String brokerIp = (String) result.retData1;
+     // get brokerId
+     if (!WebParameterUtils.getIntParamValue(req,
+             WebFieldDef.BROKERID, true, 0, 0, result)) {
+         return result.success;
+     }
+     int brokerId = (int) result.getRetData();
+     if (brokerId <= 0) {
+         // no usable id supplied: derive one from the ip
+         try {
+             brokerId = abs(AddressUtils.ipToInt(brokerIp));
+         } catch (Exception e) {
+             result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
+                     sBuilder.append("Get ").append(WebFieldDef.BROKERID.name)
+                             .append(" by ").append(WebFieldDef.BROKERIP.name)
+                             .append(" error !, exception is :")
+                             .append(e.toString()).toString());
+             // callers reuse the builder for the response, so hand it back empty
+             sBuilder.delete(0, sBuilder.length());
+             return result.isSuccess();
+         }
+     }
+     BrokerConfEntity curEntity = metaDataManager.getBrokerConfByBrokerIp(brokerIp);
+     if (curEntity != null) {
+         result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+                 sBuilder.append("Duplicated broker configure record, ")
+                         .append("query by ").append(WebFieldDef.BROKERIP.name)
+                         .append(" : ").append(brokerIp).toString());
+         sBuilder.delete(0, sBuilder.length());
+         return result.isSuccess();
+     }
+     curEntity = metaDataManager.getBrokerConfByBrokerId(brokerId);
+     if (curEntity != null) {
+         result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+                 sBuilder.append("Duplicated broker configure record, ")
+                         .append("query by ").append(WebFieldDef.BROKERID.name)
+                         .append(" : ").append(brokerId).toString());
+         sBuilder.delete(0, sBuilder.length());
+         return result.isSuccess();
+     }
+     result.setSuccResult(new Tuple2<>(brokerId, brokerIp));
+     return result.isSuccess();
+ }
+
+ /**
+  * Read brokerIp and brokerId from a key-value map and check they are unused.
+  *
+  * When the supplied brokerId is not positive, the id is derived from the
+  * broker ip. The call fails if a broker configure record already exists for
+  * the ip or for the id. On success the result holds a Tuple2 of
+  * (brokerId, brokerIp).
+  *
+  * @param keyValueMap  the field values of one broker item
+  * @param sBuilder     scratch builder for composing messages; it is emptied
+  *                     again after a failure message has been composed
+  * @param result       receives the tuple or the failure info
+  * @return true if both values were obtained and no record exists for them
+  */
+ private boolean getBrokerIpAndIdParamValue(Map<String, String> keyValueMap,
+                                            StringBuilder sBuilder,
+                                            ProcessResult result) {
+     // get brokerIp
+     if (!WebParameterUtils.getStringParamValue(keyValueMap,
+             WebFieldDef.BROKERIP, true, null, result)) {
+         return result.success;
+     }
+     String brokerIp = (String) result.retData1;
+     // get brokerId
+     if (!WebParameterUtils.getIntParamValue(keyValueMap,
+             WebFieldDef.BROKERID, true, 0, 0, result)) {
+         return result.success;
+     }
+     int brokerId = (int) result.getRetData();
+     if (brokerId <= 0) {
+         // no usable id supplied: derive one from the ip
+         try {
+             brokerId = abs(AddressUtils.ipToInt(brokerIp));
+         } catch (Exception e) {
+             result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
+                     sBuilder.append("Get ").append(WebFieldDef.BROKERID.name)
+                             .append(" by ").append(WebFieldDef.BROKERIP.name)
+                             .append(" error !, exception is :")
+                             .append(e.toString()).toString());
+             // callers reuse the builder for the response, so hand it back empty
+             sBuilder.delete(0, sBuilder.length());
+             return result.isSuccess();
+         }
+     }
+     BrokerConfEntity curEntity = metaDataManager.getBrokerConfByBrokerIp(brokerIp);
+     if (curEntity != null) {
+         result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+                 sBuilder.append("Duplicated broker configure record, ")
+                         .append("query by ").append(WebFieldDef.BROKERIP.name)
+                         .append(" : ").append(brokerIp).toString());
+         sBuilder.delete(0, sBuilder.length());
+         return result.isSuccess();
+     }
+     curEntity = metaDataManager.getBrokerConfByBrokerId(brokerId);
+     if (curEntity != null) {
+         result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+                 sBuilder.append("Duplicated broker configure record, ")
+                         .append("query by ").append(WebFieldDef.BROKERID.name)
+                         .append(" : ").append(brokerId).toString());
+         sBuilder.delete(0, sBuilder.length());
+         return result.isSuccess();
+     }
+     result.setSuccResult(new Tuple2<>(brokerId, brokerIp));
+     return result.isSuccess();
+ }
+
+
+
+
+
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
index c4c75d8..dad52df 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
@@ -173,7 +173,7 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
if (totalCnt++ > 0) {
sBuilder.append(",");
}
- entity.toWebJsonStr(sBuilder, true);
+ entity.toWebJsonStr(sBuilder, true, true);
}
}
WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, totalCnt);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
index 0ac83f5..f29ff0e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
@@ -145,7 +145,7 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
if (totalCnt++ > 0) {
sBuilder.append(",");
}
- sBuilder = entity.toWebJsonStr(sBuilder, true);
+ sBuilder = entity.toWebJsonStr(sBuilder, true, true);
}
WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, totalCnt);
return sBuilder;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index 2eb2a4a..edeb31a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -153,7 +153,7 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
metaDataManager.getClusterDefSetting();
WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
if (defClusterSetting != null) {
- defClusterSetting.toWebJsonStr(sBuilder, true);
+ defClusterSetting.toWebJsonStr(sBuilder, true, true);
}
WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, 1);
return sBuilder;
@@ -274,7 +274,7 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
}
// build return result
WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
- curConf.toWebJsonStr(sBuilder, true);
+ curConf.toWebJsonStr(sBuilder, true, true);
WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, 1);
return sBuilder;
}