You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/24 02:16:34 UTC
[incubator-inlong] branch TUBEMQ-570 updated: [INLONG-617] Add unit
tests for WebParameterUtils (#471)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
new 8046ff9 [INLONG-617] Add unit tests for WebParameterUtils (#471)
8046ff9 is described below
commit 8046ff95ea022c264500689a39a67609d9d1574d
Author: gosonzhang <go...@apache.org>
AuthorDate: Mon May 24 10:16:27 2021 +0800
[INLONG-617] Add unit tests for WebParameterUtils (#471)
---
resources/assets/scripts/brokerDetail.js | 7 -
resources/assets/scripts/common/module.js | 48 ++--
resources/assets/scripts/topicDetail.js | 7 -
resources/templates/screen/config/brokerDetail.vm | 3 -
resources/templates/screen/config/topicDetail.vm | 1 -
.../server/common/statusdef/ManageStatus.java | 37 ++-
.../server/common/utils/WebParameterUtils.java | 47 ++--
.../server/master/metamanage/MetaDataManager.java | 159 ++++++++----
.../metastore/BdbMetaStoreServiceImpl.java | 4 +-
.../metamanage/metastore/MetaStoreService.java | 2 +-
.../metastore/dao/entity/BaseEntity.java | 10 +-
.../metastore/dao/entity/BrokerConfEntity.java | 65 ++---
.../metastore/dao/entity/ClusterSettingEntity.java | 47 ++--
.../metastore/dao/entity/TopicDeployEntity.java | 14 +-
.../metastore/dao/entity/TopicPropGroup.java | 109 ++++++++-
.../metastore/dao/mapper/TopicDeployMapper.java | 4 +-
.../impl/bdbimpl/BdbBrokerConfigMapperImpl.java | 28 ++-
.../bdbimpl/BdbGroupConsumeCtrlMapperImpl.java | 20 +-
.../impl/bdbimpl/BdbGroupResCtrlMapperImpl.java | 18 +-
.../impl/bdbimpl/BdbTopicCtrlMapperImpl.java | 6 +-
.../impl/bdbimpl/BdbTopicDeployMapperImpl.java | 73 +++---
.../nodemanage/nodebroker/BrokerPSInfoHolder.java | 11 +-
.../nodemanage/nodebroker/BrokerRunManager.java | 2 +-
.../nodemanage/nodebroker/DefBrokerRunManager.java | 20 +-
.../server/master/web/action/screen/Webapi.java | 6 +-
.../web/handler/WebAdminGroupCtrlHandler.java | 8 +-
.../web/handler/WebAdminTopicAuthHandler.java | 3 +-
.../master/web/handler/WebBrokerConfHandler.java | 133 +++++-----
.../web/handler/WebGroupConsumeCtrlHandler.java | 4 +-
.../master/web/handler/WebGroupResCtrlHandler.java | 4 +-
.../master/web/handler/WebMasterInfoHandler.java | 11 +-
.../master/web/handler/WebTopicCtrlHandler.java | 4 +-
.../master/web/handler/WebTopicDeployHandler.java | 13 +-
.../server/common/WebParameterUtilsTest.java | 269 +++++++++++++++++++++
34 files changed, 799 insertions(+), 398 deletions(-)
diff --git a/resources/assets/scripts/brokerDetail.js b/resources/assets/scripts/brokerDetail.js
index 5a0fd8a..3841b51 100644
--- a/resources/assets/scripts/brokerDetail.js
+++ b/resources/assets/scripts/brokerDetail.js
@@ -232,10 +232,6 @@
' <div class="cnt">' + dataSet.unflushThreshold + '</div>' +
' </div>' +
' <div class="row">' +
- ' <div class="tit">deleteWhen:</div>' +
- ' <div class="cnt">' + dataSet.deleteWhen + '</div>' +
- ' </div>' +
- ' <div class="row">' +
' <div class="tit">numPartitions:</div>' +
' <div class="cnt">' + dataSet.numPartitions + '</div>' +
' </div>' +
@@ -290,9 +286,6 @@
}, {
"data": "unflushInterval"
}, {
- "data": "deleteWhen",
- "orderable": false
- }, {
"data": "deletePolicy",
"orderable": false
}],
diff --git a/resources/assets/scripts/common/module.js b/resources/assets/scripts/common/module.js
index d9e52d3..90c11ea 100644
--- a/resources/assets/scripts/common/module.js
+++ b/resources/assets/scripts/common/module.js
@@ -298,7 +298,6 @@ Dialog.prototype.addBrokerInfo = function (type, brokerId, callback) {
'brokerId': '0',
'brokerIp': '',
'brokerPort': '8123',
- 'deleteWhen': '0 0 6,18 * * ?',
'deletePolicy': 'delete,168h',
'numPartitions': '3',
'unflushThreshold': '1000',
@@ -330,13 +329,6 @@ Dialog.prototype.addBrokerInfo = function (type, brokerId, callback) {
' </div>' +
' </div>' +
' <div class="row">' +
- ' <div class="tit">deleteWhen</div>' +
- ' <div class="cnt">' +
- ' <input type="text" class="m" value="'
- + data.deleteWhen + '" name="deleteWhen">' +
- ' </div>' +
- ' </div>' +
- ' <div class="row">' +
' <div class="tit">unflushThreshold</div>' +
' <div class="cnt">' +
' <input type="text" class="m" value="'
@@ -448,23 +440,23 @@ Dialog.prototype.confirmTopicInfo = function (type, topicName, selectedBrokerid,
var types = {
'delete': {
'text': '删除',
- 'api': 'admin_delete_topic_info'
+ 'api': 'admin_delete_topic_deploy_info'
},
'allowPub': {
'text': '允许可发布',
- 'api': 'admin_modify_topic_info&modifyUser=webapi&acceptPublish=true'
+ 'api': 'admin_update_topic_deploy_info&modifyUser=webapi&acceptPublish=true'
},
'disPub': {
'text': '禁止可发布',
- 'api': 'admin_modify_topic_info&modifyUser=webapi&acceptPublish=false'
+ 'api': 'admin_update_topic_deploy_info&modifyUser=webapi&acceptPublish=false'
},
'allowSub': {
'text': '允许可订阅',
- 'api': 'admin_modify_topic_info&modifyUser=webapi&acceptSubscribe=true'
+ 'api': 'admin_update_topic_deploy_info&modifyUser=webapi&acceptSubscribe=true'
},
'disSub': {
'text': '禁止可订阅',
- 'api': 'admin_modify_topic_info&modifyUser=webapi&acceptSubscribe=false'
+ 'api': 'admin_update_topic_deploy_info&modifyUser=webapi&acceptSubscribe=false'
}
};
@@ -664,16 +656,15 @@ Dialog.prototype.addTopicInfo = function (type, topicName, data) {
var types = {
'add': {
'text': '新增',
- 'api': 'admin_add_new_topic_record&createUser=webapi'
+ 'api': 'admin_add_topic_deploy_info&createUser=webapi'
},
'mod': {
'text': '修改',
- 'api': 'admin_modify_topic_info&modifyUser=webapi'
+ 'api': 'admin_update_topic_deploy_info&modifyUser=webapi'
}
};
data = data || {
'topicName': '',
- 'deleteWhen': '0 0 6,18 * * ?',
'deletePolicy': 'delete,168h',
'numPartitions': '3',
'unflushThreshold': '1000',
@@ -699,13 +690,6 @@ Dialog.prototype.addTopicInfo = function (type, topicName, data) {
' </div>' +
' </div>' +
' <div class="row">' +
- ' <div class="tit">deleteWhen</div>' +
- ' <div class="cnt">' +
- ' <input type="text" class="m" value="'
- + data.deleteWhen + '" name="deleteWhen">' +
- ' </div>' +
- ' </div>' +
- ' <div class="row">' +
' <div class="tit">unflushThreshold</div>' +
' <div class="cnt">' +
' <input type="text" class="m" value="'
@@ -783,11 +767,11 @@ Dialog.prototype.confirmBroker2Topic = function (type, topicName, formData) {
var types = {
'add': {
'text': '新增',
- 'api': 'admin_add_new_topic_record&createUser=webapi'
+ 'api': 'admin_add_topic_deploy_info&createUser=webapi'
},
'mod': {
'text': '修改',
- 'api': 'admin_modify_topic_info&modifyUser=webapi'
+ 'api': 'admin_update_topic_deploy_info&modifyUser=webapi'
}
};
@@ -1070,12 +1054,12 @@ CheckBox.prototype.process = function (type, $target, dialogInstance, ext, callb
var types = {
'sub': {
'text': '订阅broker',
- 'api': 'admin_set_broker_read_or_write&isAcceptSubscribe=' + stateStr + '&brokerId='
+ 'api': 'admin_set_broker_read_or_write&acceptSubscribe=' + stateStr + '&brokerId='
+ ext
},
'pub': {
'text': '发布broker',
- 'api': 'admin_set_broker_read_or_write&isAcceptPublish=' + stateStr + '&brokerId=' + ext
+ 'api': 'admin_set_broker_read_or_write&acceptPublish=' + stateStr + '&brokerId=' + ext
},
'setTopicAuth': {
'text': 'topic的消费组授权控制',
@@ -1126,12 +1110,12 @@ CheckBox.prototype.processTopic = function (type, $target, dialogInstance, ext,
var types = {
'sub': {
'text': '订阅broker',
- 'api': 'admin_set_broker_read_or_write&isAcceptSubscribe=' + stateStr + '&brokerId='
+ 'api': 'admin_set_broker_read_or_write&acceptSubscribe=' + stateStr + '&brokerId='
+ ext
},
'pub': {
'text': '发布broker',
- 'api': 'admin_set_broker_read_or_write&isAcceptPublish=' + stateStr + '&brokerId=' + ext
+ 'api': 'admin_set_broker_read_or_write&acceptPublish=' + stateStr + '&brokerId=' + ext
},
'setTopicAuth': {
'text': 'topic的消费组授权控制',
@@ -1166,15 +1150,15 @@ CheckBox.prototype.processTopic = function (type, $target, dialogInstance, ext,
var types = {
'delete': {
'text': '删除',
- 'api': 'admin_delete_topic_info'
+ 'api': 'admin_delete_topic_deploy_info'
},
'pub': {
'text': '允许可发布',
- 'api': 'admin_modify_topic_info&modifyUser=webapi&acceptPublish='
+ 'api': 'admin_update_topic_deploy_info&modifyUser=webapi&acceptPublish='
},
'sub': {
'text': '删除',
- 'api': 'admin_delete_topic_info'
+ 'api': 'admin_delete_topic_deploy_info'
}
};
diff --git a/resources/assets/scripts/topicDetail.js b/resources/assets/scripts/topicDetail.js
index 563f7c3..4370e20 100644
--- a/resources/assets/scripts/topicDetail.js
+++ b/resources/assets/scripts/topicDetail.js
@@ -132,10 +132,6 @@
' <div class="cnt">' + dataSet.unflushThreshold + '</div>' +
' </div>' +
' <div class="row">' +
- ' <div class="tit">deleteWhen:</div>' +
- ' <div class="cnt">' + dataSet.deleteWhen + '</div>' +
- ' </div>' +
- ' <div class="row">' +
' <div class="tit">numPartitions:</div>' +
' <div class="cnt">' + dataSet.numPartitions + '</div>' +
' </div>' +
@@ -295,9 +291,6 @@
}, {
"data": "unflushInterval"
}, {
- "data": "deleteWhen",
- "orderable": false
- }, {
"data": "deletePolicy",
"orderable": false
}, {
diff --git a/resources/templates/screen/config/brokerDetail.vm b/resources/templates/screen/config/brokerDetail.vm
index 44e2824..e20cd87 100644
--- a/resources/templates/screen/config/brokerDetail.vm
+++ b/resources/templates/screen/config/brokerDetail.vm
@@ -104,7 +104,6 @@
<th>acceptSubscribe</th>
<th>unflushThreshold</th>
<th>unflushInterval</th>
- <th>deleteWhen</th>
<th>deletePolicy</th>
</tr>
</thead>
@@ -136,7 +135,6 @@
<th>acceptSubscribe</th>
<th>unflushThreshold</th>
<th>unflushInterval</th>
- <th>deleteWhen</th>
<th>deletePolicy</th>
</tr>
</thead>
@@ -168,7 +166,6 @@
<th>acceptSubscribe</th>
<th>unflushThreshold</th>
<th>unflushInterval</th>
- <th>deleteWhen</th>
<th>deletePolicy</th>
</tr>
</thead>
diff --git a/resources/templates/screen/config/topicDetail.vm b/resources/templates/screen/config/topicDetail.vm
index 3e1089a..f3b80e4 100644
--- a/resources/templates/screen/config/topicDetail.vm
+++ b/resources/templates/screen/config/topicDetail.vm
@@ -79,7 +79,6 @@
<th>numPartitions</th>
<th>unflushThreshold</th>
<th>unflushInterval</th>
- <th>deleteWhen</th>
<th>deletePolicy</th>
<th>acceptPublish</th>
<th>acceptSubscribe</th>
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/ManageStatus.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/ManageStatus.java
index 09f78be..916791a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/ManageStatus.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/ManageStatus.java
@@ -36,12 +36,12 @@ public enum ManageStatus {
ManageStatus(int code, String description,
- boolean isAcceptPublish,
- boolean isAcceptSubscribe) {
+ boolean acceptPublish,
+ boolean acceptSubscribe) {
this.code = code;
this.description = description;
- this.isAcceptPublish = isAcceptPublish;
- this.isAcceptSubscribe = isAcceptSubscribe;
+ this.isAcceptPublish = acceptPublish;
+ this.isAcceptSubscribe = acceptSubscribe;
}
public boolean isOnlineStatus() {
@@ -84,4 +84,33 @@ public enum ManageStatus {
"unknown broker manage status code %s", code));
}
+ public static ManageStatus getNewStatus(ManageStatus oldStatus,
+ Boolean acceptPublish,
+ Boolean acceptSubscribe) {
+ if (acceptPublish == null && acceptSubscribe == null) {
+ return oldStatus;
+ }
+ boolean newPublish = oldStatus.isAcceptPublish;
+ boolean newSubscribe = oldStatus.isAcceptSubscribe;
+ if (acceptPublish != null) {
+ newPublish = acceptPublish;
+ }
+ if (acceptSubscribe != null) {
+ newSubscribe = acceptSubscribe;
+ }
+ if (newPublish) {
+ if (newSubscribe) {
+ return ManageStatus.STATUS_MANAGE_ONLINE;
+ } else {
+ return ManageStatus.STATUS_MANAGE_ONLINE_NOT_READ;
+ }
+ } else {
+ if (newSubscribe) {
+ return ManageStatus.STATUS_MANAGE_ONLINE_NOT_WRITE;
+ } else {
+ return ManageStatus.STATUS_MANAGE_OFFLINE;
+ }
+ }
+ }
+
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index db93e04..b5535d8 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -87,31 +87,45 @@ public class WebParameterUtils {
return sBuffer.append("],\"count\":").append(totalCnt).append("}");
}
+ /**
+ * Parse the parameter value required for add update and delete
+ *
+ * @param paramCntr parameter container object
+ * @param isAdd if add operation
+ * @param defOpEntity default value set,
+ * if not null, it must fill required field values
+ * @param sBuffer string buffer
+ * @param result process result of parameter value
+ * @return the process result
+ */
public static <T> boolean getAUDBaseInfo(T paramCntr, boolean isAdd,
BaseEntity defOpEntity,
StringBuilder sBuffer,
ProcessResult result) {
// check and get data version id
if (!WebParameterUtils.getLongParamValue(paramCntr, WebFieldDef.DATAVERSIONID,
- false, TBaseConstants.META_VALUE_UNDEFINED, sBuffer, result)) {
+ false, (defOpEntity == null ?
+ TBaseConstants.META_VALUE_UNDEFINED : defOpEntity.getDataVerId()),
+ sBuffer, result)) {
return result.isSuccess();
}
long dataVerId = (long) result.getRetData();
// check and get createUser or modifyUser
- String createUsr = "";
- Date createDate = null;
+ String createUsr = null;
+ Date createDate = new Date();
if (isAdd) {
// check create user field
if (!WebParameterUtils.getStringParamValue(paramCntr, WebFieldDef.CREATEUSER,
- (defOpEntity == null && isAdd),
- (defOpEntity == null ? null : defOpEntity.getCreateUser()),
+ defOpEntity == null,
+ (defOpEntity == null ? createUsr : defOpEntity.getCreateUser()),
sBuffer, result)) {
return result.isSuccess();
}
createUsr = (String) result.getRetData();
// check and get create date
if (!WebParameterUtils.getDateParameter(paramCntr, WebFieldDef.CREATEDATE, false,
- (defOpEntity == null ? new Date() : defOpEntity.getCreateDate()),
+ ((defOpEntity == null || defOpEntity.getCreateDate() == null) ?
+ createDate : defOpEntity.getCreateDate()),
sBuffer, result)) {
return result.isSuccess();
}
@@ -127,7 +141,8 @@ public class WebParameterUtils {
String modifyUser = (String) result.getRetData();
// check and get modify date
if (!WebParameterUtils.getDateParameter(paramCntr, WebFieldDef.MODIFYDATE, false,
- (defOpEntity == null ? createDate : defOpEntity.getModifyDate()),
+ ((defOpEntity == null || defOpEntity.getModifyDate() == null) ?
+ createDate : defOpEntity.getModifyDate()),
sBuffer, result)) {
return result.isSuccess();
}
@@ -315,7 +330,7 @@ public class WebParameterUtils {
ProcessResult result) {
// get topicStatusId field
if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.TOPICSTATUSID,
- isRequired, defVal.getCode(), TopicStatus.STATUS_TOPIC_OK.getCode(),
+ isRequired, defVal.getCode(), TopicStatus.STATUS_TOPIC_UNDEFINED.getCode(),
sBuffer, result)) {
return result.isSuccess();
}
@@ -326,7 +341,7 @@ public class WebParameterUtils {
} catch (Throwable e) {
result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
sBuffer.append("The value of field ")
- .append(WebFieldDef.TOPICSTATUSID.name())
+ .append(WebFieldDef.TOPICSTATUSID.name)
.append(" invalid:").append(e.getMessage()).toString());
sBuffer.delete(0, sBuffer.length());
}
@@ -680,7 +695,7 @@ public class WebParameterUtils {
if (TStringUtils.isBlank(paramValue)) {
if (required) {
result.setFailResult(sBuffer.append("Parameter ").append(fieldDef.name)
- .append(" is missing or value is null or blank!").toString());
+ .append(" is missing or value is blank!").toString());
sBuffer.delete(0, sBuffer.length());
} else {
procStringDefValue(fieldDef.isCompFieldType(), defValue, result);
@@ -705,7 +720,7 @@ public class WebParameterUtils {
if (valItemSet.isEmpty()) {
if (required) {
result.setFailResult(sBuffer.append("Parameter ").append(fieldDef.name)
- .append(" is missing or value is null or blank!").toString());
+ .append(" is missing or value is blank!").toString());
sBuffer.delete(0, sBuffer.length());
} else {
procStringDefValue(fieldDef.isCompFieldType(), defValue, result);
@@ -883,7 +898,7 @@ public class WebParameterUtils {
if (required) {
result.setFailResult(new StringBuilder(512)
.append("Parameter ").append(fieldDef.name)
- .append(" is missing or value is null or blank!").toString());
+ .append(" is missing or value is blank!").toString());
} else {
result.setSuccResult(defValue);
}
@@ -959,7 +974,7 @@ public class WebParameterUtils {
if (required) {
result.setFailResult(new StringBuilder(512)
.append("Parameter ").append(fieldDef.name)
- .append(" is missing or value is null or blank!").toString());
+ .append(" is missing or value is blank!").toString());
} else {
result.setSuccResult(defValue);
}
@@ -1035,7 +1050,9 @@ public class WebParameterUtils {
result.setSuccResult(date);
} catch (Throwable e) {
result.setFailResult(sBuffer.append("Parameter ").append(fieldDef.name)
- .append(" parse error: ").append(e.getMessage()).toString());
+ .append("'s value ").append(paramValue)
+ .append(" parse error, required value format is ")
+ .append(TBaseConstants.META_TMP_DATE_VALUE).toString());
sBuffer.delete(0, sBuffer.length());
}
return result.isSuccess();
@@ -1060,7 +1077,7 @@ public class WebParameterUtils {
String paramValue = (String) result.getRetData();
if (paramValue != null) {
if (!paramValue.equals(master.getMasterConfig().getConfModAuthToken())) {
- result.setFailResult("Illegal access, unauthorized request!");
+ result.setFailResult("illegal access, unauthorized request!");
}
}
return result.isSuccess();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
index 3e1ebc8..9d9ed13 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
@@ -430,7 +430,7 @@ public class MetaDataManager implements Server {
} else {
BrokerConfEntity newEntity = curEntity.clone();
newEntity.updBaseModifyInfo(entity);
- if (entity.updModifyInfo(entity.getDataVerId(), entity.getBrokerPort(),
+ if (newEntity.updModifyInfo(entity.getDataVerId(), entity.getBrokerPort(),
entity.getBrokerTLSPort(), entity.getBrokerWebPort(),
entity.getRegionId(), entity.getGroupId(),
entity.getManageStatus(), entity.getTopicProps())) {
@@ -545,6 +545,64 @@ public class MetaDataManager implements Server {
}
/**
+ * Change broker read write status
+ *
+ * @param opEntity operator
+ * @param brokerIdSet need deleted broker id set
+ * @param rdWtTpl need changed read or write status
+ * @param sBuffer the print information string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ */
+ public List<BrokerProcessResult> changeBrokerRWStatus(BaseEntity opEntity,
+ Set<Integer> brokerIdSet,
+ Tuple2<Boolean, Boolean> rdWtTpl,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ BrokerConfEntity curEntry;
+ BrokerConfEntity newEntry;
+ List<BrokerProcessResult> retInfo = new ArrayList<>();
+ // check target broker configure's status
+ for (Integer brokerId : brokerIdSet) {
+ curEntry = metaStoreService.getBrokerConfByBrokerId(brokerId);
+ if (curEntry == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ "The broker configure not exist!");
+ retInfo.add(new BrokerProcessResult(brokerId, "", result));
+ continue;
+ }
+ if (curEntry.getManageStatus().getCode()
+ < ManageStatus.STATUS_MANAGE_ONLINE.getCode()) {
+ result.setFailResult(DataOpErrCode.DERR_CONDITION_LACK.getCode(),
+ "The broker configure under draft status, please online first!");
+ retInfo.add(new BrokerProcessResult(brokerId, "", result));
+ continue;
+ }
+ ManageStatus newMngStatus = ManageStatus.getNewStatus(
+ curEntry.getManageStatus(), rdWtTpl.getF0(), rdWtTpl.getF1());
+ if (curEntry.getManageStatus() == newMngStatus) {
+ result.setSuccResult(null);
+ retInfo.add(new BrokerProcessResult(brokerId, curEntry.getBrokerIp(), result));
+ continue;
+ }
+ newEntry = curEntry.clone();
+ newEntry.updBaseModifyInfo(opEntity);
+ if (newEntry.updModifyInfo(opEntity.getDataVerId(),
+ TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED, newMngStatus, null)) {
+ if (metaStoreService.updBrokerConf(newEntry, sBuffer, result)) {
+ triggerBrokerConfDataSync(newEntry.getBrokerId(), sBuffer, result);
+ }
+ } else {
+ result.setSuccResult(null);
+ }
+ retInfo.add(new BrokerProcessResult(brokerId, curEntry.getBrokerIp(), result));
+ }
+ return retInfo;
+ }
+
+ /**
* Change broker configure status
*
* @param opEntity operator
@@ -571,7 +629,7 @@ public class MetaDataManager implements Server {
if (!curEntry.getManageStatus().isOnlineStatus()) {
result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
sBuffer.append("The broker manage status by brokerId=").append(brokerId)
- .append(" not in online status, can't reload this configure! ")
+ .append(" is not in online status, can't reload this configure! ")
.toString());
sBuffer.delete(0, sBuffer.length());
retInfo.add(new BrokerProcessResult(brokerId, "", result));
@@ -599,11 +657,7 @@ public class MetaDataManager implements Server {
ProcessResult result) {
List<BrokerProcessResult> retInfo = new ArrayList<>();
for (int brokerId : brokerIdSet) {
- // check broker status
- if (!isAllowDeleteBrokerConf(brokerId, rsvData, sBuffer, result)) {
- retInfo.add(new BrokerProcessResult(brokerId, "", result));
- continue;
- }
+ // get broker configure
BrokerConfEntity entity =
metaStoreService.getBrokerConfByBrokerId(brokerId);
if (entity == null) {
@@ -611,6 +665,30 @@ public class MetaDataManager implements Server {
retInfo.add(new BrokerProcessResult(brokerId, "", result));
continue;
}
+ // check broker's manage status
+ if (entity.getManageStatus().isOnlineStatus()) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+ "Broker manage status is online, please offline first!");
+ retInfo.add(new BrokerProcessResult(brokerId, entity.getBrokerIp(), result));
+ continue;
+ }
+ BrokerRunManager brokerRunManager = tMaster.getBrokerRunManager();
+ BrokerRunStatusInfo runStatusInfo =
+ brokerRunManager.getBrokerRunStatusInfo(brokerId);
+ if (runStatusInfo != null
+ && entity.getManageStatus() == ManageStatus.STATUS_MANAGE_OFFLINE
+ && runStatusInfo.inProcessingStatus()) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+ sBuffer.append("Illegal value: the broker is processing offline event by brokerId=")
+ .append(brokerId).append(", please offline first and try later!").toString());
+ sBuffer.delete(0, sBuffer.length());
+ retInfo.add(new BrokerProcessResult(brokerId, entity.getBrokerIp(), result));
+ continue;
+ }
+ if (!chkBrokerTopicConfAllowed(brokerId, rsvData, sBuffer, result)) {
+ retInfo.add(new BrokerProcessResult(brokerId, entity.getBrokerIp(), result));
+ continue;
+ }
delBrokerConfig(operator, entity.getBrokerId(), rsvData, sBuffer, result);
retInfo.add(new BrokerProcessResult(entity.getBrokerId(),
entity.getBrokerIp(), result));
@@ -618,32 +696,8 @@ public class MetaDataManager implements Server {
return retInfo;
}
- private boolean isAllowDeleteBrokerConf(int brokerId, boolean rsvData,
- StringBuilder sBuffer, ProcessResult result) {
- BrokerConfEntity entity =
- metaStoreService.getBrokerConfByBrokerId(brokerId);
- if (entity == null) {
- result.setSuccResult(null);
- return result.isSuccess();
- }
- // check broker's manage status
- if (entity.getManageStatus().isOnlineStatus()) {
- result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
- "Broker manage status is online, please offline first!");
- return result.isSuccess();
- }
- BrokerRunManager brokerRunManager = tMaster.getBrokerRunManager();
- BrokerRunStatusInfo runStatusInfo =
- brokerRunManager.getBrokerRunStatusInfo(brokerId);
- if (runStatusInfo != null
- && entity.getManageStatus() == ManageStatus.STATUS_MANAGE_OFFLINE
- && runStatusInfo.inProcessingStatus()) {
- result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
- sBuffer.append("Illegal value: the broker is processing offline event by brokerId=")
- .append(brokerId).append(", please offline first and try later!").toString());
- sBuffer.delete(0, sBuffer.length());
- return result.isSuccess();
- }
+ private boolean chkBrokerTopicConfAllowed(int brokerId, boolean rsvData,
+ StringBuilder sBuffer, ProcessResult result) {
// check broker's topic configures
Map<String, TopicDeployEntity> topiConfMap =
metaStoreService.getConfiguredTopicInfo(brokerId);
@@ -667,14 +721,13 @@ public class MetaDataManager implements Server {
return result.isSuccess();
}
}
- } else {
- result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
- sBuffer.append("Topic configure of broker by brokerId=").append(brokerId)
- .append(" not deleted, please delete broker's topic configure first!").toString());
- sBuffer.delete(0, sBuffer.length());
+ result.setSuccResult(null);
return result.isSuccess();
}
- result.setSuccResult(null);
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+ sBuffer.append("The topic configure(s) of broker by brokerId=").append(brokerId)
+ .append(" not deleted, please delete topic configure first!").toString());
+ sBuffer.delete(0, sBuffer.length());
return result.isSuccess();
}
@@ -708,10 +761,10 @@ public class MetaDataManager implements Server {
return result.isSuccess();
}
BrokerRunManager brokerRunManager = this.tMaster.getBrokerRunManager();
- BrokerRunStatusInfo runStatusInfo = brokerRunManager.getBrokerRunStatusInfo(brokerId);
- if (runStatusInfo != null) {
+ BrokerRunStatusInfo runInfo = brokerRunManager.getBrokerRunStatusInfo(brokerId);
+ if (runInfo != null) {
if ((curEntity.getManageStatus() == ManageStatus.STATUS_MANAGE_OFFLINE
- && runStatusInfo.inProcessingStatus())) {
+ && runInfo.inProcessingStatus())) {
result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
"Broker is processing offline event, please wait and try later!");
return result.isSuccess();
@@ -719,7 +772,9 @@ public class MetaDataManager implements Server {
}
if (metaStoreService.delBrokerConf(operator, brokerId, strBuffer, result)) {
brokerRunManager.delBrokerStaticInfo(brokerId);
- brokerRunManager.releaseBrokerRunInfo(brokerId, runStatusInfo.getCreateId());
+ if (runInfo != null) {
+ brokerRunManager.releaseBrokerRunInfo(brokerId, runInfo.getCreateId(), false);
+ }
}
return result.isSuccess();
}
@@ -884,12 +939,20 @@ public class MetaDataManager implements Server {
sBuffer.delete(0, sBuffer.length());
return new TopicProcessResult(brokerId, "", result);
}
+ TopicPropGroup newProps;
+ if (isAddOp) {
+ TopicPropGroup defProps = brokerConf.getTopicProps();
+ newProps = defProps.clone();
+ newProps.updModifyInfo(topicPropInfo);
+ } else {
+ newProps = topicPropInfo;
+ }
TopicDeployEntity deployConf =
new TopicDeployEntity(opEntity, brokerId, topicName);
deployConf.setTopicProps(brokerConf.getTopicProps());
deployConf.updModifyInfo(opEntity.getDataVerId(),
TBaseConstants.META_VALUE_UNDEFINED, brokerConf.getBrokerPort(),
- brokerConf.getBrokerIp(), deployStatus, topicPropInfo);
+ brokerConf.getBrokerIp(), deployStatus, newProps);
return addOrUpdTopicDeployInfo(isAddOp, deployConf, sBuffer, result);
}
@@ -1121,9 +1184,11 @@ public class MetaDataManager implements Server {
*
* @return topic entity map
*/
- public Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
- Set<Integer> brokerIdSet) {
- return metaStoreService.getTopicDeployInfoMap(topicNameSet, brokerIdSet);
+ public Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<Integer> brokerIdSet,
+ Set<String> topicNameSet) {
+ Map<Integer, BrokerConfEntity> qryBrokerInfoMap =
+ metaStoreService.getBrokerConfInfo(brokerIdSet, null, null);
+ return metaStoreService.getTopicDeployInfoMap(qryBrokerInfoMap.keySet(), topicNameSet);
}
public Map<String, List<TopicDeployEntity>> getTopicConfMapByTopicAndBrokerIds(
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
index 97b79c2..1a5dff3 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
@@ -560,8 +560,8 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
@Override
public Map<Integer/* brokerId */, List<TopicDeployEntity>> getTopicDeployInfoMap(
- Set<String> topicNameSet, Set<Integer> brokerIdSet) {
- return topicDeployMapper.getTopicDeployInfoMap(topicNameSet, brokerIdSet);
+ Set<Integer> brokerIdSet, Set<String> topicNameSet) {
+ return topicDeployMapper.getTopicDeployInfoMap(brokerIdSet, topicNameSet);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
index 21569c3..2e961d3 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
@@ -166,7 +166,7 @@ public interface MetaStoreService extends KeepAlive, Server {
Set<String> topicNameSet, Set<Integer> brokerIdSet, TopicDeployEntity qryEntity);
Map<Integer/* brokerId */, List<TopicDeployEntity>> getTopicDeployInfoMap(
- Set<String> topicNameSet, Set<Integer> brokerIdSet);
+ Set<Integer> brokerIdSet, Set<String> topicNameSet);
Map<String/* topicName */, List<TopicDeployEntity>> getTopicDepInfoByTopicBrokerId(
Set<String> topicSet, Set<Integer> brokerIdSet);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
index 8217bb7..6ee733a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
@@ -32,7 +32,7 @@ import org.apache.tubemq.server.common.utils.WebParameterUtils;
public class BaseEntity implements Serializable, Cloneable {
private long dataVersionId =
- TServerConstants.DEFAULT_DATA_VERSION; // 0: default version, other: version
+ TBaseConstants.META_VALUE_UNDEFINED; // -2: undefined, other: version
private long serialId = TBaseConstants.META_VALUE_UNDEFINED;
private String createUser = ""; // create user
private Date createDate = null; // create date
@@ -266,15 +266,15 @@ public class BaseEntity implements Serializable, Cloneable {
.append(",\"createUser\":\"").append(createUser).append("\"")
.append(",\"createDate\":\"").append(createDateStr).append("\"")
.append(",\"modifyUser\":\"").append(modifyUser).append("\"")
- .append(",\"modifyDate\":\"").append(modifyDateStr).append("\"")
- .append(",\"attributes\":\"").append(attributes).append("\"");
+ .append(",\"modifyDate\":\"").append(modifyDateStr).append("\"");
+ //.append(",\"attributes\":\"").append(attributes).append("\"");
} else {
sBuilder.append(",\"dVerId\":").append(dataVersionId)
.append(",\"cur\":\"").append(createUser).append("\"")
.append(",\"cDate\":\"").append(createDateStr).append("\"")
.append(",\"mur\":\"").append(modifyUser).append("\"")
- .append(",\"mDate\":\"").append(modifyDateStr).append("\"")
- .append(",\"attrs\":\"").append(attributes).append("\"");
+ .append(",\"mDate\":\"").append(modifyDateStr).append("\"");
+ //.append(",\"attrs\":\"").append(attributes).append("\"");
}
return sBuilder;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
index 4f02d23..5dfa7f0 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
@@ -42,8 +42,6 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
// broker web port
private int brokerWebPort = TBaseConstants.META_VALUE_UNDEFINED;
private ManageStatus manageStatus = ManageStatus.STATUS_MANAGE_UNDEFINED;
- private boolean isConfDataUpdated = false; //conf data update flag
- private boolean isBrokerLoaded = false; //broker conf load flag
private int regionId = TBaseConstants.META_VALUE_UNDEFINED;
private int groupId = TBaseConstants.META_VALUE_UNDEFINED;
private TopicPropGroup topicProps = new TopicPropGroup();
@@ -68,7 +66,6 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
public BrokerConfEntity(int brokerId, String brokerIp, int brokerPort,
int brokerTLSPort, int brokerWebPort, ManageStatus manageStatus,
int regionId, int groupId, TopicPropGroup defTopicProps,
- boolean isConfDataUpdated, boolean isBrokerLoaded,
long dataVersionId, String createUser,
Date createDate, String modifyUser, Date modifyDate) {
super(dataVersionId, createUser, createDate, modifyUser, modifyDate);
@@ -78,8 +75,6 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
this.brokerWebPort = brokerWebPort;
this.topicProps = defTopicProps;
this.manageStatus = manageStatus;
- this.isConfDataUpdated = isConfDataUpdated;
- this.isBrokerLoaded = isBrokerLoaded;
}
public BrokerConfEntity(BdbBrokerConfEntity bdbEntity) {
@@ -100,8 +95,6 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
bdbEntity.getDftDeletePolicy(), bdbEntity.getDataStoreType(),
bdbEntity.getDataPath());
this.manageStatus = ManageStatus.valueOf(bdbEntity.getManageStatus());
- this.isConfDataUpdated = bdbEntity.isConfDataUpdated();
- this.isBrokerLoaded = bdbEntity.isBrokerLoaded();
setAttributes(bdbEntity.getAttributes());
}
@@ -111,8 +104,8 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
topicProps.getNumPartitions(), topicProps.getUnflushThreshold(),
topicProps.getUnflushInterval(), "", topicProps.getDeletePolicy(),
manageStatus.getCode(), topicProps.isAcceptPublish(),
- topicProps.isAcceptSubscribe(), getAttributes(), isConfDataUpdated,
- isBrokerLoaded, getCreateUser(), getCreateDate(),
+ topicProps.isAcceptSubscribe(), getAttributes(), true,
+ false, getCreateUser(), getCreateDate(),
getModifyUser(), getModifyDate());
bdbEntity.setDataVerId(getDataVerId());
bdbEntity.setRegionId(regionId);
@@ -153,24 +146,6 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
this.manageStatus = manageStatus;
}
- public void setConfDataUpdated() {
- this.isBrokerLoaded = false;
- this.isConfDataUpdated = true;
- }
-
- public void setBrokerLoaded() {
- this.isBrokerLoaded = true;
- this.isConfDataUpdated = false;
- }
-
- public boolean isConfDataUpdated() {
- return this.isConfDataUpdated;
- }
-
- public boolean isBrokerLoaded() {
- return this.isBrokerLoaded;
- }
-
public void setBrokerIpAndPort(String brokerIp, int brokerPort) {
this.brokerPort = brokerPort;
this.brokerIp = brokerIp;
@@ -375,7 +350,8 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
|| (target.getManageStatus() != ManageStatus.STATUS_MANAGE_UNDEFINED
&& target.getManageStatus() != this.manageStatus)
|| (target.getBrokerWebPort() != TBaseConstants.META_VALUE_UNDEFINED
- && target.getBrokerWebPort() != this.brokerWebPort)) {
+ && target.getBrokerWebPort() != this.brokerWebPort)
+ || !this.topicProps.isMatched(target.getTopicProps())) {
return false;
}
return true;
@@ -390,8 +366,8 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
* @return
*/
public StringBuilder toWebJsonStr(StringBuilder sBuffer,
- boolean isLongName,
- boolean fullFormat) {
+ boolean isConfUpdated, boolean isConfLoaded,
+ boolean isLongName, boolean fullFormat) {
if (isLongName) {
sBuffer.append("{\"brokerId\":").append(brokerId)
.append(",\"brokerIp\":\"").append(brokerIp).append("\"")
@@ -401,8 +377,8 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
.append(",\"manageStatus\":\"").append(manageStatus.getDescription()).append("\"")
.append(",\"acceptPublish\":").append(manageStatus.isAcceptPublish())
.append(",\"acceptSubscribe\":").append(manageStatus.isAcceptSubscribe())
- .append(",\"isConfChanged\":").append(isConfDataUpdated)
- .append(",\"isConfLoaded\":").append(isBrokerLoaded)
+ .append(",\"isConfChanged\":").append(isConfUpdated)
+ .append(",\"isConfLoaded\":").append(isConfLoaded)
.append(",\"regionId\":").append(regionId)
.append(",\"groupId\":").append(groupId);
} else {
@@ -414,8 +390,8 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
.append(",\"mSts\":\"").append(manageStatus.getDescription()).append("\"")
.append(",\"accPub\":").append(manageStatus.isAcceptPublish())
.append(",\"accSub\":").append(manageStatus.isAcceptSubscribe())
- .append(",\"isConfChg\":").append(isConfDataUpdated)
- .append(",\"isConfLd\":").append(isBrokerLoaded)
+ .append(",\"isConfChg\":").append(isConfUpdated)
+ .append(",\"isConfLd\":").append(isConfLoaded)
.append(",\"rId\":").append(regionId)
.append(",\"gId\":").append(groupId);
}
@@ -459,8 +435,6 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
&& brokerPort == other.brokerPort
&& brokerTLSPort == other.brokerTLSPort
&& brokerWebPort == other.brokerWebPort
- && isConfDataUpdated == other.isConfDataUpdated
- && isBrokerLoaded == other.isBrokerLoaded
&& regionId == other.regionId
&& groupId == other.groupId
&& Objects.equals(brokerIp, other.brokerIp)
@@ -491,23 +465,18 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), brokerId, brokerIp, brokerPort, brokerTLSPort,
- brokerWebPort, manageStatus, isConfDataUpdated, isBrokerLoaded, regionId,
- groupId, topicProps, brokerAddress, brokerFullInfo, brokerSimpleInfo,
- brokerTLSSimpleInfo, brokerTLSFullInfo);
+ brokerWebPort, manageStatus, regionId, groupId, topicProps, brokerAddress,
+ brokerFullInfo, brokerSimpleInfo, brokerTLSSimpleInfo, brokerTLSFullInfo);
}
@Override
public BrokerConfEntity clone() {
- try {
- BrokerConfEntity copy = (BrokerConfEntity) super.clone();
- copy.setManageStatus(getManageStatus());
- if (copy.getTopicProps() != null) {
- copy.setTopicProps(getTopicProps().clone());
- }
- return copy;
- } catch (CloneNotSupportedException e) {
- return null;
+ BrokerConfEntity copy = (BrokerConfEntity) super.clone();
+ copy.setManageStatus(getManageStatus());
+ if (copy.getTopicProps() != null) {
+ copy.setTopicProps(getTopicProps().clone());
}
+ return copy;
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
index fe2b6b6..fb70bb0 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
@@ -64,24 +64,23 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
// Constructor by BdbClusterSettingEntity
public ClusterSettingEntity(BdbClusterSettingEntity bdbEntity) {
- super(bdbEntity.getConfigId(), bdbEntity.getModifyUser(), bdbEntity.getModifyDate());
- this.brokerPort = bdbEntity.getBrokerPort();
- this.brokerTLSPort = bdbEntity.getBrokerTLSPort();
- this.brokerWebPort = bdbEntity.getBrokerWebPort();
- this.clsDefTopicProps =
- new TopicPropGroup(bdbEntity.getNumTopicStores(), bdbEntity.getNumPartitions(),
- bdbEntity.getUnflushThreshold(), bdbEntity.getUnflushInterval(),
- bdbEntity.getUnflushDataHold(), bdbEntity.getMemCacheMsgSizeInMB(),
- bdbEntity.getMemCacheMsgCntInK(), bdbEntity.getMemCacheFlushIntvl(),
- bdbEntity.isAcceptPublish(), bdbEntity.isAcceptSubscribe(),
- bdbEntity.getDeletePolicy(), bdbEntity.getDefDataType(),
- bdbEntity.getDefDataPath());
- this.maxMsgSizeInB = bdbEntity.getMaxMsgSizeInB();
- this.maxMsgSizeInMB =
- this.maxMsgSizeInB / TBaseConstants.META_MB_UNIT_SIZE;
- this.qryPriorityId = bdbEntity.getQryPriorityId();
- setEnableFlowCtrl(bdbEntity.getEnableGloFlowCtrl());
- setGloFlowCtrlInfo(bdbEntity.getGloFlowCtrlCnt(), bdbEntity.getGloFlowCtrlInfo());
+ super(bdbEntity.getModifyUser(), bdbEntity.getModifyDate(),
+ bdbEntity.getModifyUser(), bdbEntity.getModifyDate());
+ fillDefaultValue();
+ TopicPropGroup defTopicProps =
+ new TopicPropGroup(bdbEntity.getNumTopicStores(),
+ bdbEntity.getNumPartitions(), bdbEntity.getUnflushThreshold(),
+ bdbEntity.getUnflushInterval(), bdbEntity.getUnflushDataHold(),
+ bdbEntity.getMemCacheMsgSizeInMB(), bdbEntity.getMemCacheMsgCntInK(),
+ bdbEntity.getMemCacheFlushIntvl(), bdbEntity.isAcceptPublish(),
+ bdbEntity.isAcceptSubscribe(), bdbEntity.getDeletePolicy(),
+ bdbEntity.getDefDataType(), bdbEntity.getDefDataPath());
+ updModifyInfo(bdbEntity.getConfigId(), bdbEntity.getBrokerPort(),
+ bdbEntity.getBrokerTLSPort(), bdbEntity.getBrokerWebPort(),
+ (bdbEntity.getMaxMsgSizeInB() / TBaseConstants.META_MB_UNIT_SIZE),
+ bdbEntity.getQryPriorityId(), bdbEntity.getEnableGloFlowCtrl(),
+ bdbEntity.getGloFlowCtrlCnt(), bdbEntity.getGloFlowCtrlInfo(),
+ defTopicProps);
setAttributes(bdbEntity.getAttributes());
}
@@ -353,14 +352,10 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
@Override
public ClusterSettingEntity clone() {
- try {
- ClusterSettingEntity copy = (ClusterSettingEntity) super.clone();
- copy.setGloFlowCtrlStatus(getGloFlowCtrlStatus());
- copy.setClsDefTopicProps(getClsDefTopicProps().clone());
- return copy;
- } catch (CloneNotSupportedException e) {
- return null;
- }
+ ClusterSettingEntity copy = (ClusterSettingEntity) super.clone();
+ copy.setGloFlowCtrlStatus(getGloFlowCtrlStatus());
+ copy.setClsDefTopicProps(getClsDefTopicProps().clone());
+ return copy;
}
private void setGloFlowCtrlStatus(EnableStatus gloFlowCtrlStatus) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
index 7d879d2..9c1a678 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
@@ -361,15 +361,11 @@ public class TopicDeployEntity extends BaseEntity implements Cloneable {
@Override
public TopicDeployEntity clone() {
- try {
- TopicDeployEntity copy = (TopicDeployEntity) super.clone();
- if (copy.topicProps != null) {
- copy.topicProps = getTopicProps().clone();
- }
- copy.setDeployStatus(getDeployStatus());
- return copy;
- } catch (CloneNotSupportedException e) {
- return null;
+ TopicDeployEntity copy = (TopicDeployEntity) super.clone();
+ if (copy.topicProps != null) {
+ copy.topicProps = getTopicProps().clone();
}
+ copy.setDeployStatus(getDeployStatus());
+ return copy;
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java
index 30f930d..63c7eaa 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java
@@ -163,10 +163,12 @@ public class TopicPropGroup implements Serializable, Cloneable {
}
public void setDeletePolicy(String deletePolicy) {
- this.deletePolicy = deletePolicy;
- Tuple2<CuPolType, Long> parsedRet = parseDelPolicy(deletePolicy);
- this.fileCuPolicyType = parsedRet.getF0();
- this.retPeriodInMs = parsedRet.getF1();
+ if (TStringUtils.isNotBlank(deletePolicy)) {
+ this.deletePolicy = deletePolicy;
+ Tuple2<CuPolType, Long> parsedRet = parseDelPolicy(deletePolicy);
+ this.fileCuPolicyType = parsedRet.getF0();
+ this.retPeriodInMs = parsedRet.getF1();
+ }
}
public String getDeletePolicy() {
@@ -238,7 +240,7 @@ public class TopicPropGroup implements Serializable, Cloneable {
* @param sBuilder
* @return
*/
- StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean isLongName) {
+ public StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean isLongName) {
if (isLongName) {
sBuilder.append(",\"numTopicStores\":").append(numTopicStores)
.append(",\"numPartitions\":").append(numPartitions)
@@ -328,6 +330,95 @@ public class TopicPropGroup implements Serializable, Cloneable {
&& fileCuPolicyType == other.fileCuPolicyType;
}
+ /**
+ * update subclass field values
+ *
+ * @return if changed
+ */
+ public boolean updModifyInfo(TopicPropGroup other) {
+ boolean changed = false;
+ // check and set numTopicStores info
+ if (other.numTopicStores != TBaseConstants.META_VALUE_UNDEFINED
+ && this.numTopicStores != other.numTopicStores) {
+ changed = true;
+ this.numTopicStores = other.numTopicStores;
+ }
+ // check and set numPartitions info
+ if (other.numPartitions != TBaseConstants.META_VALUE_UNDEFINED
+ && this.numPartitions != other.numPartitions) {
+ changed = true;
+ this.numPartitions = other.numPartitions;
+ }
+ // check and set unflushThreshold info
+ if (other.unflushThreshold != TBaseConstants.META_VALUE_UNDEFINED
+ && this.unflushThreshold != other.unflushThreshold) {
+ changed = true;
+ this.unflushThreshold = other.unflushThreshold;
+ }
+ // check and set unflushInterval info
+ if (other.unflushInterval != TBaseConstants.META_VALUE_UNDEFINED
+ && this.unflushInterval != other.unflushInterval) {
+ changed = true;
+ this.unflushInterval = other.unflushInterval;
+ }
+ // check and set unflushInterval info
+ if (other.unflushDataHold != TBaseConstants.META_VALUE_UNDEFINED
+ && this.unflushDataHold != other.unflushDataHold) {
+ changed = true;
+ this.unflushDataHold = other.unflushDataHold;
+ }
+ // check and set memCacheMsgSizeInMB info
+ if (other.memCacheMsgSizeInMB != TBaseConstants.META_VALUE_UNDEFINED
+ && this.memCacheMsgSizeInMB != other.memCacheMsgSizeInMB) {
+ changed = true;
+ this.memCacheMsgSizeInMB = other.memCacheMsgSizeInMB;
+ }
+ // check and set memCacheMsgCntInK info
+ if (other.memCacheMsgCntInK != TBaseConstants.META_VALUE_UNDEFINED
+ && this.memCacheMsgCntInK != other.memCacheMsgCntInK) {
+ changed = true;
+ this.memCacheMsgCntInK = other.memCacheMsgCntInK;
+ }
+ // check and set memCacheFlushIntvl info
+ if (other.memCacheFlushIntvl != TBaseConstants.META_VALUE_UNDEFINED
+ && this.memCacheFlushIntvl != other.memCacheFlushIntvl) {
+ changed = true;
+ this.memCacheFlushIntvl = other.memCacheFlushIntvl;
+ }
+ // check and set acceptPublish info
+ if (other.acceptPublish != null
+ && !Objects.equals(this.acceptPublish, other.acceptPublish)) {
+ changed = true;
+ this.acceptPublish = other.acceptPublish;
+ }
+ // check and set acceptSubscribe info
+ if (other.acceptSubscribe != null
+ && !Objects.equals(this.acceptSubscribe, other.acceptSubscribe)) {
+ changed = true;
+ this.acceptSubscribe = other.acceptSubscribe;
+ }
+ // check and set dataStoreType info
+ if (other.dataStoreType != TBaseConstants.META_VALUE_UNDEFINED
+ && this.dataStoreType != other.dataStoreType) {
+ changed = true;
+ this.dataStoreType = other.dataStoreType;
+ }
+ // check and set filterCondStr info
+ if (TStringUtils.isNotBlank(other.dataPath)
+ && !Objects.equals(this.dataPath, other.dataPath)) {
+ changed = true;
+ this.dataPath = other.dataPath;
+ }
+ // check and set deletePolicy info
+ if (TStringUtils.isNotBlank(other.deletePolicy)
+ && !Objects.equals(this.deletePolicy, other.deletePolicy)) {
+ changed = true;
+ setDeletePolicy(other.deletePolicy);
+ }
+ return changed;
+ }
+
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -354,8 +445,12 @@ public class TopicPropGroup implements Serializable, Cloneable {
}
@Override
- public TopicPropGroup clone() throws CloneNotSupportedException {
- return (TopicPropGroup) super.clone();
+ public TopicPropGroup clone() {
+ try {
+ return (TopicPropGroup) super.clone();
+ } catch (Throwable e) {
+ return null;
+ }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
index 45300fe..6328c46 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
@@ -59,8 +59,8 @@ public interface TopicDeployMapper extends AbstractMapper {
Set<Integer> brokerIdSet,
TopicDeployEntity qryEntity);
- Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
- Set<Integer> brokerIdSet);
+ Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<Integer> brokerIdSet,
+ Set<String> topicNameSet);
Map<String/* topicName */, List<TopicDeployEntity>> getTopicConfMapByTopicAndBrokerIds(
Set<String> topicSet, Set<Integer> brokerIdSet);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
index ad6afa0..b3bfdc2 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
@@ -192,7 +192,7 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
}
} else {
for (BrokerConfEntity entity : brokerConfCache.values()) {
- if (entity.isMatched(qryEntity)) {
+ if (entity != null && entity.isMatched(qryEntity)) {
retMap.put(entity.getBrokerId(), entity);
}
}
@@ -208,35 +208,39 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
public Map<Integer, BrokerConfEntity> getBrokerConfInfo(Set<Integer> brokerIdSet,
Set<String> brokerIpSet,
BrokerConfEntity qryEntity) {
- Set<Integer> qryBrokerKey = null;
+ Set<Integer> qryBrokerKeySet = null;
Map<Integer, BrokerConfEntity> retMap = new HashMap<>();
if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
- qryBrokerKey = new HashSet<>(brokerIdSet);
+ qryBrokerKeySet = new HashSet<>(brokerIdSet);
}
if (brokerIpSet != null && !brokerIpSet.isEmpty()) {
- if (qryBrokerKey == null) {
- qryBrokerKey = new HashSet<>();
+ if (qryBrokerKeySet == null) {
+ qryBrokerKeySet = new HashSet<>();
}
for (String brokerIp : brokerIpSet) {
Integer brokerId = brokerIpIndexCache.get(brokerIp);
if (brokerId != null) {
- qryBrokerKey.add(brokerId);
+ qryBrokerKeySet.add(brokerId);
}
}
}
// get broker configures
- if (qryBrokerKey == null) {
+ if (qryBrokerKeySet == null) {
for (BrokerConfEntity entity : brokerConfCache.values()) {
- if (entity != null && qryEntity != null && entity.isMatched(qryEntity)) {
- retMap.put(entity.getBrokerId(), entity);
+ if (entity == null
+ || (qryEntity != null && !entity.isMatched(qryEntity))) {
+ continue;
}
+ retMap.put(entity.getBrokerId(), entity);
}
} else {
- for (Integer brokerId : qryBrokerKey) {
+ for (Integer brokerId : qryBrokerKeySet) {
BrokerConfEntity entity = brokerConfCache.get(brokerId);
- if (entity != null && qryEntity != null && entity.isMatched(qryEntity)) {
- retMap.put(entity.getBrokerId(), entity);
+ if (entity == null
+ || (qryEntity != null && !entity.isMatched(qryEntity))) {
+ continue;
}
+ retMap.put(entity.getBrokerId(), entity);
}
}
return retMap;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
index 6b9f732..afeb5d4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
@@ -330,20 +330,22 @@ public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
List<GroupConsumeCtrlEntity> itemLst;
if (qryHitRecSet == null) {
for (GroupConsumeCtrlEntity entity : grpConsumeCtrlCache.values()) {
- if (entity != null && entity.isMatched(qryEntry)) {
- itemLst = retEntityMap.computeIfAbsent(
- entity.getGroupName(), k -> new ArrayList<>());
- itemLst.add(entity);
+ if (entity == null || (qryEntry != null && !entity.isMatched(qryEntry))) {
+ continue;
}
+ itemLst = retEntityMap.computeIfAbsent(
+ entity.getGroupName(), k -> new ArrayList<>());
+ itemLst.add(entity);
}
} else {
for (String recKey : qryHitRecSet) {
tmpEntity = grpConsumeCtrlCache.get(recKey);
- if (tmpEntity != null && tmpEntity.isMatched(qryEntry)) {
- itemLst = retEntityMap.computeIfAbsent(
- tmpEntity.getGroupName(), k -> new ArrayList<>());
- itemLst.add(tmpEntity);
+ if (tmpEntity == null || (qryEntry != null && !tmpEntity.isMatched(qryEntry))) {
+ continue;
}
+ itemLst = retEntityMap.computeIfAbsent(
+ tmpEntity.getGroupName(), k -> new ArrayList<>());
+ itemLst.add(tmpEntity);
}
}
return retEntityMap;
@@ -356,7 +358,7 @@ public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
retEntitys.addAll(grpConsumeCtrlCache.values());
} else {
for (GroupConsumeCtrlEntity entity : grpConsumeCtrlCache.values()) {
- if (entity.isMatched(qryEntity)) {
+ if (entity != null && entity.isMatched(qryEntity)) {
retEntitys.add(entity);
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java
index 6c61370..42766fd 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupResCtrlMapperImpl.java
@@ -165,21 +165,23 @@ public class BdbGroupResCtrlMapperImpl implements GroupResCtrlMapper {
@Override
public Map<String, GroupResCtrlEntity> getGroupResCtrlConf(Set<String> groupNameSet,
- GroupResCtrlEntity qryEntity) {
- GroupResCtrlEntity entity;
+ GroupResCtrlEntity qryEntry) {
Map<String, GroupResCtrlEntity> retMap = new HashMap<>();
if (groupNameSet == null || groupNameSet.isEmpty()) {
- for (GroupResCtrlEntity dataEntity : groupBaseCtrlCache.values()) {
- if (dataEntity != null && dataEntity.isMatched(qryEntity)) {
- retMap.put(dataEntity.getGroupName(), dataEntity);
+ for (GroupResCtrlEntity entry : groupBaseCtrlCache.values()) {
+ if (entry == null || (qryEntry != null && !entry.isMatched(qryEntry))) {
+ continue;
}
+ retMap.put(entry.getGroupName(), entry);
}
} else {
+ GroupResCtrlEntity entry;
for (String groupName : groupNameSet) {
- entity = groupBaseCtrlCache.get(groupName);
- if (entity != null && entity.isMatched(qryEntity)) {
- retMap.put(entity.getGroupName(), entity);
+ entry = groupBaseCtrlCache.get(groupName);
+ if (entry == null || (qryEntry != null && !entry.isMatched(qryEntry))) {
+ continue;
}
+ retMap.put(entry.getGroupName(), entry);
}
}
return retMap;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
index ca94af5..0e5ed43 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicCtrlMapperImpl.java
@@ -174,7 +174,7 @@ public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
retEntitys.addAll(topicCtrlCache.values());
} else {
for (TopicCtrlEntity entity : topicCtrlCache.values()) {
- if (entity.isMatched(qryEntity)) {
+ if (entity != null && entity.isMatched(qryEntity)) {
retEntitys.add(entity);
}
}
@@ -194,9 +194,7 @@ public class BdbTopicCtrlMapperImpl implements TopicCtrlMapper {
}
for (String topicName : qryKeySet) {
TopicCtrlEntity entity = topicCtrlCache.get(topicName);
- if (entity == null
- || (qryEntity != null
- && !entity.isMatched(qryEntity))) {
+ if (entity == null || (qryEntity != null && !entity.isMatched(qryEntity))) {
continue;
}
retEntityMap.put(topicName, entity);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
index 22002ce..c27be1c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.java
@@ -207,7 +207,7 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
retEntitys.addAll(topicConfCache.values());
} else {
for (TopicDeployEntity entity : topicConfCache.values()) {
- if (entity.isMatched(qryEntity)) {
+ if (entity != null && entity.isMatched(qryEntity)) {
retEntitys.add(entity);
}
}
@@ -233,48 +233,51 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
Set<Integer> brokerIdSet,
TopicDeployEntity qryEntity) {
List<TopicDeployEntity> items;
- Set<String> qryTopicKey = null;
+ Set<String> qryTopicKeySet = null;
ConcurrentHashSet<String> keySet;
Map<String, List<TopicDeployEntity>> retEntityMap = new HashMap<>();
// get deploy records set by topicName
if (topicNameSet != null && !topicNameSet.isEmpty()) {
- qryTopicKey = new HashSet<>();
+ qryTopicKeySet = new HashSet<>();
for (String topicName : topicNameSet) {
keySet = topicNameCacheIndex.get(topicName);
if (keySet != null && !keySet.isEmpty()) {
- qryTopicKey.addAll(keySet);
+ qryTopicKeySet.addAll(keySet);
}
}
}
// get deploy records set by brokerId
if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
- if (qryTopicKey == null) {
- qryTopicKey = new HashSet<>();
+ if (qryTopicKeySet == null) {
+ qryTopicKeySet = new HashSet<>();
}
for (Integer brokerId : brokerIdSet) {
keySet = brokerIdCacheIndex.get(brokerId);
if (keySet != null && !keySet.isEmpty()) {
- qryTopicKey.addAll(keySet);
+ qryTopicKeySet.addAll(keySet);
}
}
}
// filter record by qryEntity
- if (qryTopicKey == null) {
- for (TopicDeployEntity deployEntity : topicConfCache.values()) {
- if (deployEntity != null && deployEntity.isMatched(qryEntity)) {
- items = retEntityMap.computeIfAbsent(
- deployEntity.getTopicName(), k -> new ArrayList<>());
- items.add(deployEntity);
+ if (qryTopicKeySet == null) {
+ for (TopicDeployEntity entry : topicConfCache.values()) {
+ if (entry == null || (qryEntity != null && !entry.isMatched(qryEntity))) {
+ continue;
}
+ items = retEntityMap.computeIfAbsent(
+ entry.getTopicName(), k -> new ArrayList<>());
+ items.add(entry);
}
} else {
- for (String recKey : qryTopicKey) {
- TopicDeployEntity deployEntity = topicConfCache.get(recKey);
- if (deployEntity != null && deployEntity.isMatched(qryEntity)) {
- items = retEntityMap.computeIfAbsent(
- deployEntity.getTopicName(), k -> new ArrayList<>());
- items.add(deployEntity);
+ TopicDeployEntity entry;
+ for (String recKey : qryTopicKeySet) {
+ entry = topicConfCache.get(recKey);
+ if (entry == null || (qryEntity != null && !entry.isMatched(qryEntity))) {
+ continue;
}
+ items = retEntityMap.computeIfAbsent(
+ entry.getTopicName(), k -> new ArrayList<>());
+ items.add(entry);
}
}
return retEntityMap;
@@ -282,11 +285,16 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
@Override
public Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(
- Set<String> topicNameSet, Set<Integer> brokerIdSet) {
+ Set<Integer> brokerIdSet, Set<String> topicNameSet) {
List<TopicDeployEntity> items;
Set<String> qryTopicKey = null;
ConcurrentHashSet<String> keySet;
Map<Integer, List<TopicDeployEntity>> retEntityMap = new HashMap<>();
+ if (brokerIdSet != null) {
+ for (Integer brokerId : brokerIdSet) {
+ retEntityMap.put(brokerId, new ArrayList<>());
+ }
+ }
if (topicNameSet != null && !topicNameSet.isEmpty()) {
qryTopicKey = new HashSet<>();
for (String topicName : topicNameSet) {
@@ -318,11 +326,8 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
if (entity == null) {
continue;
}
- items = retEntityMap.get(entity.getBrokerId());
- if (items == null) {
- items = new ArrayList<>();
- retEntityMap.put(entity.getBrokerId(), items);
- }
+ items = retEntityMap.computeIfAbsent(
+ entity.getBrokerId(), k -> new ArrayList<>());
items.add(entity);
}
return retEntityMap;
@@ -339,11 +344,8 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
if (((topicSet == null) || (topicSet.isEmpty()))
&& ((brokerIdSet == null) || (brokerIdSet.isEmpty()))) {
for (TopicDeployEntity entity : topicConfCache.values()) {
- itemLst = retEntityMap.get(entity.getTopicName());
- if (itemLst == null) {
- itemLst = new ArrayList<>();
- retEntityMap.put(entity.getTopicName(), itemLst);
- }
+ itemLst = retEntityMap.computeIfAbsent(
+ entity.getTopicName(), k -> new ArrayList<>());
itemLst.add(entity);
}
return retEntityMap;
@@ -370,11 +372,8 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
if (tmpEntity == null) {
continue;
}
- itemLst = retEntityMap.get(tmpEntity.getTopicName());
- if (itemLst == null) {
- itemLst = new ArrayList<>();
- retEntityMap.put(tmpEntity.getTopicName(), itemLst);
- }
+ itemLst = retEntityMap.computeIfAbsent(
+ tmpEntity.getTopicName(), k -> new ArrayList<>());
itemLst.add(tmpEntity);
}
return retEntityMap;
@@ -449,9 +448,7 @@ public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
@Override
public Set<String> getConfiguredTopicSet() {
- Set<String> topicNames = new HashSet<>();
- topicNames.addAll(topicNameCacheIndex.keySet());
- return topicNames;
+ return new HashSet<>(topicNameCacheIndex.keySet());
}
@Override
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
index 50a3dc4..dcdbdb7 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
@@ -45,14 +45,17 @@ public class BrokerPSInfoHolder {
* remove broker all configure info
*
* @param brokerId broker id index
+ * @param isTimeout if broker is timeout
*/
- public void rmvBrokerAllPushedInfo(int brokerId) {
+ public void rmvBrokerAllPushedInfo(int brokerId, boolean isTimeout) {
// remove broker status Info
enablePubBrokerIdSet.remove(brokerId);
enableSubBrokerIdSet.remove(brokerId);
- // remove broker topic info
- subTopicInfoView.rmvBrokerTopicInfo(brokerId);
- pubTopicInfoView.rmvBrokerTopicInfo(brokerId);
+ if (!isTimeout) {
+ // remove broker topic info
+ subTopicInfoView.rmvBrokerTopicInfo(brokerId);
+ pubTopicInfoView.rmvBrokerTopicInfo(brokerId);
+ }
}
/**
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
index 28f8832..973435b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
@@ -59,7 +59,7 @@ public interface BrokerRunManager {
boolean brokerClose2M(int brokerId, StringBuilder sBuffer, ProcessResult result);
- boolean releaseBrokerRunInfo(int brokerId, String blockId);
+ boolean releaseBrokerRunInfo(int brokerId, String blockId, boolean isTimeout);
BrokerRunStatusInfo getBrokerRunStatusInfo(int brokerId);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
index 44f5c36..dbd068e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
@@ -86,7 +86,8 @@ public class DefBrokerRunManager implements BrokerRunManager {
public void onTimeout(final String nodeId, TimeoutInfo nodeInfo) throws Exception {
logger.info(new StringBuilder(512).append("[Broker Timeout] ")
.append(nodeId).toString());
- releaseBrokerRunInfo(Integer.parseInt(nodeId), nodeInfo.getSecondKey());
+ releaseBrokerRunInfo(Integer.parseInt(nodeId),
+ nodeInfo.getSecondKey(), true);
}
});
}
@@ -292,7 +293,7 @@ public class DefBrokerRunManager implements BrokerRunManager {
return result.isSuccess();
}
boolean isOverTls = runStatusInfo.isOverTLS();
- releaseBrokerRunInfo(brokerId, runStatusInfo.getCreateId());
+ releaseBrokerRunInfo(brokerId, runStatusInfo.getCreateId(), false);
logger.info(sBuffer.append("[Broker Closed]").append(brokerId)
.append(" unregister success, isOverTLS=").append(isOverTls).toString());
result.setSuccResult(null);
@@ -429,18 +430,20 @@ public class DefBrokerRunManager implements BrokerRunManager {
}
@Override
- public boolean releaseBrokerRunInfo(int brokerId, String blockId) {
+ public boolean releaseBrokerRunInfo(int brokerId, String blockId, boolean isTimeout) {
StringBuilder sBuffer = new StringBuilder(512);
BrokerRunStatusInfo runStatusInfo =
brokerRunSyncManageMap.get(brokerId);
if (runStatusInfo == null) {
logger.info(sBuffer.append("[Broker Release] brokerId=").append(brokerId)
- .append(" release failure, run-info has deleted before!").toString());
+ .append(", isTimeout=").append(isTimeout)
+ .append(", release failure, run-info has deleted before!").toString());
return false;
}
if (!blockId.equals(runStatusInfo.getCreateId())) {
logger.info(sBuffer.append("[Broker Release] brokerId=").append(brokerId)
- .append(" release failure, run-info has been replaced by new register!")
+ .append(", isTimeout=").append(isTimeout)
+ .append(", release failure, run-info has been replaced by new register!")
.toString());
return false;
}
@@ -450,9 +453,10 @@ public class DefBrokerRunManager implements BrokerRunManager {
}
brokerTotalCount.decrementAndGet();
brokerAbnHolder.removeBroker(brokerId);
- brokerPubSubInfo.rmvBrokerAllPushedInfo(brokerId);
- logger.info(sBuffer.append("[Broker Release] brokerId=")
- .append(brokerId).append(" release success!").toString());
+ brokerPubSubInfo.rmvBrokerAllPushedInfo(brokerId, isTimeout);
+ logger.info(sBuffer.append("[Broker Release] brokerId=").append(brokerId)
+ .append(", isTimeout=").append(isTimeout)
+ .append(", release success!").toString());
return true;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
index a371f67..1780bba 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
@@ -81,7 +81,6 @@ public class Webapi implements Action {
if (!metaDataManager.isSelfMaster()) {
throw new StandbyException("Please send your request to the master Node.");
}
- String type = req.getParameter("type");
String method = req.getParameter("method");
String strCallbackFun = req.getParameter("callback");
if ((TStringUtils.isNotEmpty(strCallbackFun))
@@ -89,15 +88,12 @@ public class Webapi implements Action {
&& (strCallbackFun.matches(TBaseConstants.META_TMP_CALLBACK_STRING_VALUE))) {
strCallbackFun = strCallbackFun.trim();
}
- if (type == null) {
- throw new Exception("Please take with type parameter!");
- }
if (method == null) {
throw new Exception("Please take with method parameter!");
}
WebMethodMapper.WebApiRegInfo webApiRegInfo = getWebApiRegInfo(method);
if (webApiRegInfo == null) {
- throw new Exception("Unsupported method!");
+ throw new Exception("unsupported method!");
}
// check master is current node
if (webApiRegInfo.onlyMasterOp
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
index 4c93c0e..ed1af84 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
@@ -993,12 +993,10 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
List<Map<String, String>> groupJsonArray =
(List<Map<String, String>>) result.getRetData();
GroupConsumeCtrlEntity itemEntity;
- Map<String, String> itemValueMap;
Map<String, GroupConsumeCtrlEntity> addRecordMap = new HashMap<>();
Set<String> configuredTopicSet =
metaDataManager.getTotalConfiguredTopicNames();
- for (int j = 0; j < groupJsonArray.size(); j++) {
- itemValueMap = groupJsonArray.get(j);
+ for (Map<String, String> itemValueMap : groupJsonArray) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(itemValueMap,
isAddOp, defOpEntity, sBuffer, result)) {
@@ -1125,12 +1123,10 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
List<Map<String, String>> groupJsonArray =
(List<Map<String, String>>) result.getRetData();
GroupConsumeCtrlEntity itemEntity;
- Map<String, String> itemValueMap;
Map<String, GroupConsumeCtrlEntity> addRecordMap = new HashMap<>();
Set<String> configuredTopicSet =
metaDataManager.getTotalConfiguredTopicNames();
- for (int j = 0; j < groupJsonArray.size(); j++) {
- itemValueMap = groupJsonArray.get(j);
+ for (Map<String, String> itemValueMap : groupJsonArray) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(itemValueMap,
true, defOpEntity, sBuffer, result)) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
index 65cc221..e0b29b6 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
@@ -249,8 +249,7 @@ public class WebAdminTopicAuthHandler extends AbstractWebHandler {
TopicCtrlEntity itemConf;
Map<String, TopicCtrlEntity> addRecordMap = new HashMap<>();
// check and get topic deployment configure
- for (int j = 0; j < deployJsonArray.size(); j++) {
- Map<String, String> confMap = deployJsonArray.get(j);
+ for (Map<String, String> confMap : deployJsonArray) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(confMap,
true, defOpEntity, sBuffer, result)) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
index 1971caa..0f70fd4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
@@ -186,7 +186,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
}
TopicStatus topicStatus = (TopicStatus) result.getRetData();
// get and valid broker manage status info
- if (!getManageStatusParamValue(false, req, sBuffer, result)) {
+ if (!getManageStatusParamValue(req, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
@@ -219,6 +219,11 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
metaDataManager.getBrokerConfInfo(brokerIds, brokerIpSet, qryEntity);
// build query result
int totalCnt = 0;
+ boolean isConfUpdated;
+ boolean isConfLoaded;
+ Tuple2<Boolean, Boolean> syncTuple;
+ BrokerRunStatusInfo runStatusInfo;
+ BrokerRunManager brokerRunManager = master.getBrokerRunManager();
WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
for (BrokerConfEntity entity : qryResult.values()) {
Map<String, TopicDeployEntity> topicConfEntityMap =
@@ -229,8 +234,16 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
if (totalCnt++ > 0) {
sBuffer.append(",");
}
- entity.toWebJsonStr(sBuffer, true, false);
- sBuffer = addTopicInfo(withTopic, sBuffer, topicConfEntityMap);
+ isConfUpdated = false;
+ isConfLoaded = false;
+ runStatusInfo = brokerRunManager.getBrokerRunStatusInfo(entity.getBrokerId());
+ if (runStatusInfo != null) {
+ syncTuple = runStatusInfo.getDataSyncStatus();
+ isConfUpdated = syncTuple.getF0();
+ isConfLoaded = syncTuple.getF1();
+ }
+ entity.toWebJsonStr(sBuffer, isConfUpdated, isConfLoaded, true, false);
+ addTopicInfo(withTopic, sBuffer, topicConfEntityMap);
sBuffer.append("}");
}
WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
@@ -400,15 +413,17 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
return sBuffer;
}
Set<Integer> brokerIds = (Set<Integer>) result.getRetData();
- // get and valid broker manage status info
- if (!getManageStatusParamValue(false, req, sBuffer, result)) {
+ // get and valid broker read or write status info
+ if (!getAcceptReadAndWriteParamValue(req, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- ManageStatus mngStatus = (ManageStatus) result.getRetData();
+ Tuple2<Boolean, Boolean> rdWtTpl =
+ (Tuple2<Boolean, Boolean>) result.getRetData();
+ // change broker status
List<BrokerProcessResult> retInfo =
- metaDataManager.changeBrokerConfStatus(opEntity,
- brokerIds, mngStatus, sBuffer, result);
+ metaDataManager.changeBrokerRWStatus(opEntity,
+ brokerIds, rdWtTpl, sBuffer, result);
return buildRetInfo(retInfo, sBuffer);
}
@@ -713,7 +728,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
ManageStatus.STATUS_MANAGE_APPLY, brokerProps, sBuffer, result));
} else {
// get and valid broker manage status info
- if (!getManageStatusParamValue(false, req, sBuffer, result)) {
+ if (!getManageStatusParamValue(req, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
@@ -779,15 +794,12 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
private boolean isValidRecord(Set<String> qryTopicSet, Boolean isInclude,
TopicStatus topicStatus,
Map<String, TopicDeployEntity> topicConfEntityMap) {
- if ((topicConfEntityMap == null) || (topicConfEntityMap.isEmpty())) {
- if ((qryTopicSet.isEmpty() || !isInclude)
- && topicStatus == TopicStatus.STATUS_TOPIC_UNDEFINED) {
- return true;
- }
- return false;
+ if (topicConfEntityMap == null || topicConfEntityMap.isEmpty()) {
+ return ((qryTopicSet == null || qryTopicSet.isEmpty())
+ && topicStatus == TopicStatus.STATUS_TOPIC_UNDEFINED);
}
// first search topic if match require
- if (!qryTopicSet.isEmpty()) {
+ if (qryTopicSet != null && !qryTopicSet.isEmpty()) {
boolean matched = false;
Set<String> curTopics = topicConfEntityMap.keySet();
if (isInclude) {
@@ -811,12 +823,15 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
}
}
// second check topic status if match
- for (TopicDeployEntity topicConfEntity : topicConfEntityMap.values()) {
- if (topicConfEntity.getDeployStatus() == topicStatus) {
- return true;
+ if (topicStatus != TopicStatus.STATUS_TOPIC_UNDEFINED) {
+ for (TopicDeployEntity topicConfEntity : topicConfEntityMap.values()) {
+ if (topicConfEntity.getDeployStatus() == topicStatus) {
+ return true;
+ }
}
+ return false;
}
- return false;
+ return true;
}
/**
@@ -860,8 +875,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
// check and get broker configure
BrokerConfEntity itemEntity;
HashMap<Integer, BrokerConfEntity> addedRecordMap = new HashMap<>();
- for (int j = 0; j < brokerJsonArray.size(); j++) {
- Map<String, String> brokerObject = brokerJsonArray.get(j);
+ for (Map<String, String> brokerObject : brokerJsonArray) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(brokerObject,
isAddOp, defOpEntity, sBuffer, result)) {
@@ -927,7 +941,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
addedRecordMap.put(itemEntity.getBrokerId(), itemEntity);
} else {
// get and valid broker manage status info
- if (!getManageStatusParamValue(false, req, sBuffer, result)) {
+ if (!getManageStatusParamValue(req, sBuffer, result)) {
return result.isSuccess();
}
ManageStatus mngStatus = (ManageStatus) result.getRetData();
@@ -965,8 +979,8 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
sBuffer.append(",");
}
sBuffer.append("{\"brokerId\":").append(entry.getBrokerId())
- .append("{\"brokerIp\":\"").append(entry.getBrokerIp()).append("\"")
- .append(",\"success\":").append(entry.isSuccess())
+ .append(",\"brokerIp\":\"").append(entry.getBrokerIp())
+ .append("\",\"success\":").append(entry.isSuccess())
.append(",\"errCode\":").append(entry.getErrCode())
.append(",\"errInfo\":\"").append(entry.getErrInfo()).append("\"}");
}
@@ -1025,15 +1039,14 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
return result.isSuccess();
}
- private <T> boolean getManageStatusParamValue(boolean isAddOp, T paramCntr,
+ private <T> boolean getManageStatusParamValue(T paramCntr,
StringBuilder sBuffer,
ProcessResult result) {
// get manage status id value
if (!WebParameterUtils.getIntParamValue(paramCntr,
WebFieldDef.MANAGESTATUS, false,
- (isAddOp ? ManageStatus.STATUS_MANAGE_APPLY.getCode()
- : ManageStatus.STATUS_MANAGE_UNDEFINED.getCode()),
- ManageStatus.STATUS_MANAGE_APPLY.getCode(),
+ ManageStatus.STATUS_MANAGE_UNDEFINED.getCode(),
+ ManageStatus.STATUS_MANAGE_ONLINE.getCode(),
ManageStatus.STATUS_MANAGE_OFFLINE.getCode(), sBuffer, result)) {
return result.isSuccess();
}
@@ -1049,45 +1062,33 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
sBuffer.delete(0, sBuffer.length());
return result.isSuccess();
}
- if (mngStatus == ManageStatus.STATUS_MANAGE_UNDEFINED) {
- // compatible with old version api
- if (!WebParameterUtils.getBooleanParamValue(paramCntr,
- WebFieldDef.ACCEPTPUBLISH, false, null, sBuffer, result)) {
- return result.isSuccess();
- }
- Boolean publishParam = (Boolean) result.getRetData();
- if (!WebParameterUtils.getBooleanParamValue(paramCntr,
- WebFieldDef.ACCEPTSUBSCRIBE, false, null, sBuffer, result)) {
- return result.isSuccess();
- }
- Boolean subscribeParam = (Boolean) result.getRetData();
- if (publishParam == null && subscribeParam == null) {
- mngStatus = ManageStatus.STATUS_MANAGE_UNDEFINED;
- } else if (publishParam != null && subscribeParam != null) {
- if (publishParam) {
- if (subscribeParam) {
- mngStatus = ManageStatus.STATUS_MANAGE_ONLINE;
- } else {
- mngStatus = ManageStatus.STATUS_MANAGE_ONLINE_NOT_READ;
- }
- } else {
- if (subscribeParam) {
- mngStatus = ManageStatus.STATUS_MANAGE_ONLINE_NOT_WRITE;
- } else {
- mngStatus = ManageStatus.STATUS_MANAGE_OFFLINE;
- }
- }
- } else {
- result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
- sBuffer.append("Fields ").append(WebFieldDef.ACCEPTPUBLISH.name)
- .append(" and ").append(WebFieldDef.ACCEPTSUBSCRIBE.name)
- .append(" must exist at the same time!").toString());
- sBuffer.delete(0, sBuffer.length());
- return result.isSuccess();
- }
- }
result.setSuccResult(mngStatus);
return result.isSuccess();
}
+ private <T> boolean getAcceptReadAndWriteParamValue(T paramCntr,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ if (!WebParameterUtils.getBooleanParamValue(paramCntr,
+ WebFieldDef.ACCEPTPUBLISH, false, null, sBuffer, result)) {
+ return result.isSuccess();
+ }
+ Boolean publishParam = (Boolean) result.getRetData();
+ if (!WebParameterUtils.getBooleanParamValue(paramCntr,
+ WebFieldDef.ACCEPTSUBSCRIBE, false, null, sBuffer, result)) {
+ return result.isSuccess();
+ }
+ Boolean subscribeParam = (Boolean) result.getRetData();
+ if (publishParam == null && subscribeParam == null) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
+ sBuffer.append("Fields ").append(WebFieldDef.ACCEPTPUBLISH.name)
+ .append(" or ").append(WebFieldDef.ACCEPTSUBSCRIBE.name)
+ .append(" must exist at this method!").toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ result.setSuccResult(new Tuple2<>(publishParam, subscribeParam));
+ return result.isSuccess();
+ }
+
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
index 05c2f13..802de9c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
@@ -345,12 +345,10 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
(List<Map<String, String>>) result.getRetData();
// parse groupCsmJsonSet field info
GroupConsumeCtrlEntity itemConf;
- Map<String, String> itemsMap;
Map<String, GroupConsumeCtrlEntity> addRecordMap = new HashMap<>();
Set<String> configuredTopicSet =
metaDataManager.getTotalConfiguredTopicNames();
- for (int j = 0; j < filterJsonArray.size(); j++) {
- itemsMap = filterJsonArray.get(j);
+ for (Map<String, String> itemsMap : filterJsonArray) {
if (!WebParameterUtils.getStringParamValue(itemsMap,
WebFieldDef.GROUPNAME, true, "", sBuffer, result)) {
return result.success;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
index 00c6dcb..80c1af1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
@@ -313,10 +313,8 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
int defQryPriorityId = defClusterSetting.getQryPriorityId();
// check and get topic control configure
GroupResCtrlEntity itemEntity;
- Map<String, String> itemValueMap;
Map<String, GroupResCtrlEntity> addRecordMap = new HashMap<>();
- for (int j = 0; j < ctrlJsonArray.size(); j++) {
- itemValueMap = ctrlJsonArray.get(j);
+ for (Map<String, String> itemValueMap : ctrlJsonArray) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(itemValueMap,
isAddOp, defOpEntity, sBuffer, result)) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index c045c4d..46761ec 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -394,11 +394,16 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
}
int inBrokerWebPort = (int) result.getRetData();
// get and valid TopicPropGroup info
- if (!WebParameterUtils.getTopicPropInfo(req, null, sBuffer, result)) {
+ TopicPropGroup defProps = null;
+ if (isAddOp) {
+ defProps = new TopicPropGroup();
+ defProps.fillDefaultValue();
+ }
+ if (!WebParameterUtils.getTopicPropInfo(req, defProps, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- TopicPropGroup defTopicProps = (TopicPropGroup) result.getRetData();
+ TopicPropGroup inTopicProps = (TopicPropGroup) result.getRetData();
// get and valid qryPriorityId info
if (!WebParameterUtils.getQryPriorityIdParameter(req,
false, TBaseConstants.META_VALUE_UNDEFINED,
@@ -433,7 +438,7 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
// add or modify record
if (!metaDataManager.addOrUpdClusterDefSetting(opEntity, inBrokerPort,
inBrokerTlsPort, inBrokerWebPort, maxMsgSizeMB, inQryPriorityId,
- flowCtrlEnable, flowRuleCnt, flowCtrlInfo, defTopicProps, sBuffer, result)) {
+ flowCtrlEnable, flowRuleCnt, flowCtrlInfo, inTopicProps, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
index 83052e3..2ecb238 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
@@ -286,10 +286,8 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
int defMaxMsgSizeMB = defClusterSetting.getMaxMsgSizeInMB();
// check and get topic control configure
TopicCtrlEntity itemConf;
- Map<String, String> itemConfMap;
Map<String, TopicCtrlEntity> addRecordMap = new HashMap<>();
- for (int j = 0; j < ctrlJsonArray.size(); j++) {
- itemConfMap = ctrlJsonArray.get(j);
+ for (Map<String, String> itemConfMap : ctrlJsonArray) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(itemConfMap,
isAddOp, defOpEntity, sBuffer, result)) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index 5dc45e9..cc8145d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -244,7 +244,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
}
Set<Integer> brokerIdSet = (Set<Integer>) result.getRetData();
Map<Integer, List<TopicDeployEntity>> queryResult =
- metaDataManager.getTopicDeployInfoMap(topicNameSet, brokerIdSet);
+ metaDataManager.getTopicDeployInfoMap(brokerIdSet, topicNameSet);
// build query result
int dataCount = 0;
int totalStoreNum = 0;
@@ -524,7 +524,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
}
totalCfgNumPartCount += entity.getNumPartitions() * entity.getNumTopicStores();
entity.toWebJsonStr(sBuffer, true, false);
- sBuffer.append("\",\"runInfo\":{");
+ sBuffer.append(",\"runInfo\":{");
BrokerConfEntity brokerConfEntity =
metaDataManager.getBrokerConfByBrokerId(entity.getBrokerId());
String strManageStatus = "-";
@@ -569,7 +569,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
.append(",\"isSrvAcceptPublish\":").append(isSrvAcceptPublish)
.append(",\"isSrvAcceptSubscribe\":").append(isSrvAcceptSubscribe)
.append(",\"totalRunNumPartCount\":").append(totalRunNumPartCount)
- .append(",\"authData\":[");
+ .append(",\"authData\":{");
if (enableAuthCtrl) {
sBuffer.append("\"enableAuthControl\":").append(enableAuthCtrl)
.append(",\"createUser\":\"").append(ctrlEntity.getModifyUser())
@@ -654,7 +654,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
}
totalCfgNumPartCount += entity.getNumPartitions() * entity.getNumTopicStores();
entity.toWebJsonStr(sBuffer, true, false);
- sBuffer.append("\",\"runInfo\":{");
+ sBuffer.append(",\"runInfo\":{");
BrokerConfEntity brokerConfEntity =
metaDataManager.getBrokerConfByBrokerId(entity.getBrokerId());
String strManageStatus = "-";
@@ -776,7 +776,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
}
BaseEntity defOpEntity = (BaseEntity) result.getRetData();
// check and get add record map
- if (!getTopicDeployJsonSetInfo(req, true, defOpEntity, sBuffer, result)) {
+ if (!getTopicDeployJsonSetInfo(req, isAddOp, defOpEntity, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
@@ -802,8 +802,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
TopicDeployEntity itemConf;
Map<String, TopicDeployEntity> addRecordMap = new HashMap<>();
// check and get topic deployment configure
- for (int j = 0; j < deployJsonArray.size(); j++) {
- Map<String, String> confMap = deployJsonArray.get(j);
+ for (Map<String, String> confMap : deployJsonArray) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(confMap,
isAddOp, defOpEntity, sBuffer, result)) {
diff --git a/tubemq-server/src/test/java/org/apache/tubemq/server/common/WebParameterUtilsTest.java b/tubemq-server/src/test/java/org/apache/tubemq/server/common/WebParameterUtilsTest.java
new file mode 100644
index 0000000..f27772a
--- /dev/null
+++ b/tubemq-server/src/test/java/org/apache/tubemq/server/common/WebParameterUtilsTest.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.common;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+
+
+public class WebParameterUtilsTest {
+
+ @Test
+ public void getStringParamValueTest() {
+ boolean retValue;
+ String paraDataStr;
+ String initialValue;
+ Set<String> paraDataSet;
+ StringBuilder sBuffer = new StringBuilder(512);
+ ProcessResult result = new ProcessResult();
+ Map<String, String> paramCntrMap = new HashMap<>();
+ // case 1
+ initialValue = null;
+ retValue = WebParameterUtils.getStringParamValue(paramCntrMap,
+ WebFieldDef.COMPSGROUPNAME, false, initialValue, sBuffer, result);
+ Assert.assertTrue(retValue);
+ Assert.assertTrue(result.isSuccess());
+ paraDataSet = (Set<String>) result.getRetData();
+ Assert.assertEquals(paraDataSet, Collections.EMPTY_SET);
+ // case 2
+ paramCntrMap.clear();
+ retValue = WebParameterUtils.getStringParamValue(paramCntrMap,
+ WebFieldDef.COMPSGROUPNAME, true, initialValue, sBuffer, result);
+ Assert.assertFalse(retValue);
+ Assert.assertFalse(result.isSuccess());
+ // case 3
+ paramCntrMap.clear();
+ paramCntrMap.put(WebFieldDef.COMPSGROUPNAME.shortName, "");
+ retValue = WebParameterUtils.getStringParamValue(paramCntrMap,
+ WebFieldDef.COMPSGROUPNAME, true, initialValue, sBuffer, result);
+ Assert.assertFalse(retValue);
+ Assert.assertFalse(result.isSuccess());
+ // case 4
+ paramCntrMap.clear();
+ Set<String> exceptedValSet = new TreeSet<>();
+ exceptedValSet.add("group1");
+ exceptedValSet.add("group2");
+ exceptedValSet.add("group3");
+ paramCntrMap.put(WebFieldDef.COMPSGROUPNAME.name, "group3,group1,group2");
+ retValue = WebParameterUtils.getStringParamValue(paramCntrMap,
+ WebFieldDef.COMPSGROUPNAME, true, initialValue, sBuffer, result);
+ Assert.assertTrue(retValue);
+ Assert.assertTrue(result.isSuccess());
+ paraDataSet = (Set<String>) result.getRetData();
+ Assert.assertEquals(paraDataSet, exceptedValSet);
+ // case 5
+ paramCntrMap.clear();
+ paramCntrMap.put(WebFieldDef.GROUPNAME.name, "test2,test1,test3");
+ retValue = WebParameterUtils.getStringParamValue(paramCntrMap,
+ WebFieldDef.GROUPNAME, true, initialValue, sBuffer, result);
+ Assert.assertFalse(retValue);
+ Assert.assertFalse(result.isSuccess());
+ // case 6
+ paramCntrMap.clear();
+ paramCntrMap.put(WebFieldDef.GROUPNAME.name, "test2");
+ retValue = WebParameterUtils.getStringParamValue(paramCntrMap,
+ WebFieldDef.GROUPNAME, true, initialValue, sBuffer, result);
+ Assert.assertTrue(retValue);
+ Assert.assertTrue(result.isSuccess());
+ paraDataStr = (String) result.getRetData();
+ Assert.assertEquals(paraDataStr, "test2");
+ // case 7
+ paramCntrMap.clear();
+ paramCntrMap.put(WebFieldDef.GROUPNAME.name, "");
+ retValue = WebParameterUtils.getStringParamValue(paramCntrMap,
+ WebFieldDef.GROUPNAME, false, initialValue, sBuffer, result);
+ Assert.assertTrue(retValue);
+ Assert.assertTrue(result.isSuccess());
+ paraDataStr = (String) result.getRetData();
+ Assert.assertEquals(paraDataStr, initialValue);
+ // case 8
+ paramCntrMap.clear();
+ initialValue = "initial value";
+ paramCntrMap.put(WebFieldDef.GROUPNAME.name, "");
+ retValue = WebParameterUtils.getStringParamValue(paramCntrMap,
+ WebFieldDef.GROUPNAME, false, initialValue, sBuffer, result);
+ Assert.assertTrue(retValue);
+ Assert.assertTrue(result.isSuccess());
+ paraDataStr = (String) result.getRetData();
+ Assert.assertEquals(paraDataStr, initialValue);
+ // case 9
+ paramCntrMap.clear();
+ paramCntrMap.put(WebFieldDef.COMPSGROUPNAME.name, "\"test1,test1,test3\"");
+ retValue = WebParameterUtils.getStringParamValue(paramCntrMap,
+ WebFieldDef.COMPSGROUPNAME, false, initialValue, sBuffer, result);
+ Assert.assertTrue(retValue);
+ Assert.assertTrue(result.isSuccess());
+ paraDataSet = (Set<String>) result.getRetData();
+ Assert.assertEquals(paraDataSet.size(), 2);
+ }
+
+ @Test
+ public void getBooleanParamValueTest() {
+ boolean retValue;
+ Boolean paraDataObj;
+ Boolean initialValue;
+ StringBuilder sBuffer = new StringBuilder(512);
+ ProcessResult result = new ProcessResult();
+ Map<String, String> paramCntrMap = new HashMap<>();
+ // case 1
+ retValue = WebParameterUtils.getBooleanParamValue(paramCntrMap,
+ WebFieldDef.WITHIP, true, null, sBuffer, result);
+ Assert.assertFalse(retValue);
+ Assert.assertFalse(result.isSuccess());
+ // case 2
+ paramCntrMap.clear();
+ paramCntrMap.put(WebFieldDef.WITHIP.name, "1");
+ retValue = WebParameterUtils.getBooleanParamValue(paramCntrMap,
+ WebFieldDef.WITHIP, true, null, sBuffer, result);
+ Assert.assertTrue(retValue);
+ Assert.assertTrue(result.isSuccess());
+ paraDataObj = (Boolean) result.getRetData();
+ Assert.assertEquals(paraDataObj, Boolean.TRUE);
+ // case 3
+ paramCntrMap.clear();
+ paramCntrMap.put(WebFieldDef.WITHIP.name, "false");
+ retValue = WebParameterUtils.getBooleanParamValue(paramCntrMap,
+ WebFieldDef.WITHIP, true, null, sBuffer, result);
+ Assert.assertTrue(retValue);
+ Assert.assertTrue(result.isSuccess());
+ paraDataObj = (Boolean) result.getRetData();
+ Assert.assertEquals(paraDataObj, Boolean.FALSE);
+ }
+
+ @Test
+ public void getAUDBaseInfoTest() {
+ boolean retValue;
+ BaseEntity retEntry;
+ BaseEntity defOpEntity = new BaseEntity();
+ StringBuilder sBuffer = new StringBuilder(512);
+ ProcessResult result = new ProcessResult();
+ Map<String, String> paramCntrMap = new HashMap<>();
+ // case 1
+ retValue = WebParameterUtils.getAUDBaseInfo(paramCntrMap,
+ true, null, sBuffer, result);
+ Assert.assertFalse(retValue);
+ Assert.assertFalse(result.isSuccess());
+ // case 2
+ paramCntrMap.clear();
+ retValue = WebParameterUtils.getAUDBaseInfo(paramCntrMap,
+ false, null, sBuffer, result);
+ Assert.assertFalse(retValue);
+ Assert.assertFalse(result.isSuccess());
+ // case 3
+ paramCntrMap.clear();
+ defOpEntity.updQueryKeyInfo(-2,
+ "testCreate", "testModify");
+ retValue = WebParameterUtils.getAUDBaseInfo(paramCntrMap,
+ true, defOpEntity, sBuffer, result);
+ Assert.assertTrue(retValue);
+ Assert.assertTrue(result.isSuccess());
+ retEntry = (BaseEntity) result.getRetData();
+ Assert.assertEquals(defOpEntity.getCreateUser(), retEntry.getCreateUser());
+ Assert.assertEquals(defOpEntity.getModifyUser(), retEntry.getModifyUser());
+ // case 4
+ paramCntrMap.clear();
+ defOpEntity = null;
+ paramCntrMap.put(WebFieldDef.DATAVERSIONID.name, "1");
+ paramCntrMap.put(WebFieldDef.CREATEUSER.name, "test4");
+ paramCntrMap.put(WebFieldDef.CREATEDATE.name, "20210519082350");
+ retValue = WebParameterUtils.getAUDBaseInfo(paramCntrMap,
+ true, defOpEntity, sBuffer, result);
+ Assert.assertTrue(retValue);
+ Assert.assertTrue(result.isSuccess());
+ retEntry = (BaseEntity) result.getRetData();
+ Assert.assertEquals(String.valueOf(retEntry.getDataVerId()),
+ paramCntrMap.get(WebFieldDef.DATAVERSIONID.name));
+ Assert.assertEquals(retEntry.getCreateUser(),
+ paramCntrMap.get(WebFieldDef.CREATEUSER.name));
+ Assert.assertEquals(WebParameterUtils.date2yyyyMMddHHmmss(retEntry.getCreateDate()),
+ paramCntrMap.get(WebFieldDef.CREATEDATE.name));
+ Assert.assertEquals(retEntry.getModifyUser(),
+ paramCntrMap.get(WebFieldDef.CREATEUSER.name));
+ Assert.assertEquals(WebParameterUtils.date2yyyyMMddHHmmss(retEntry.getModifyDate()),
+ paramCntrMap.get(WebFieldDef.CREATEDATE.name));
+ // case 5
+ paramCntrMap.clear();
+ defOpEntity = new BaseEntity(1, "aa",
+ new Date(), "modify", new Date());
+ retValue = WebParameterUtils.getAUDBaseInfo(paramCntrMap,
+ true, defOpEntity, sBuffer, result);
+ Assert.assertTrue(retValue);
+ Assert.assertTrue(result.isSuccess());
+ retEntry = (BaseEntity) result.getRetData();
+ Assert.assertEquals(retEntry.getDataVerId(),
+ defOpEntity.getDataVerId());
+ Assert.assertEquals(retEntry.getCreateUser(),
+ defOpEntity.getCreateUser());
+ Assert.assertEquals(retEntry.getCreateDate(),
+ defOpEntity.getCreateDate());
+ Assert.assertEquals(retEntry.getModifyUser(),
+ defOpEntity.getModifyUser());
+ Assert.assertEquals(retEntry.getModifyDate(),
+ defOpEntity.getModifyDate());
+ // case 4
+ paramCntrMap.clear();
+ defOpEntity = null;
+ paramCntrMap.put(WebFieldDef.DATAVERSIONID.name, "1");
+ paramCntrMap.put(WebFieldDef.MODIFYUSER.name, "test4");
+ paramCntrMap.put(WebFieldDef.MODIFYDATE.name, "20210519082350");
+ retValue = WebParameterUtils.getAUDBaseInfo(paramCntrMap,
+ false, defOpEntity, sBuffer, result);
+ Assert.assertTrue(retValue);
+ Assert.assertTrue(result.isSuccess());
+ retEntry = (BaseEntity) result.getRetData();
+ Assert.assertEquals(String.valueOf(retEntry.getDataVerId()),
+ paramCntrMap.get(WebFieldDef.DATAVERSIONID.name));
+ Assert.assertEquals(retEntry.getCreateUser(),
+ paramCntrMap.get(WebFieldDef.CREATEUSER.name));
+ Assert.assertEquals(retEntry.getModifyUser(),
+ paramCntrMap.get(WebFieldDef.MODIFYUSER.name));
+ Assert.assertEquals(WebParameterUtils.date2yyyyMMddHHmmss(retEntry.getModifyDate()),
+ paramCntrMap.get(WebFieldDef.MODIFYDATE.name));
+ }
+
+ @Test
+ public void getTopicPropInfoTest() {
+ boolean retValue;
+ TopicPropGroup retEntry;
+ TopicPropGroup defOpEntity = new TopicPropGroup();
+ StringBuilder sBuffer = new StringBuilder(512);
+ ProcessResult result = new ProcessResult();
+ Map<String, String> paramCntrMap = new HashMap<>();
+ // case 1
+ retValue = WebParameterUtils.getTopicPropInfo(paramCntrMap,
+ null, sBuffer, result);
+ Assert.assertTrue(retValue);
+ Assert.assertTrue(result.isSuccess());
+ retEntry = (TopicPropGroup) result.getRetData();
+ }
+
+}