You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/04/30 03:00:27 UTC
[incubator-inlong] branch TUBEMQ-570 updated: [INLONG-601]Adjust
WebBrokerDefConfHandler class implementation
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
new b3e14b9 [INLONG-601]Adjust WebBrokerDefConfHandler class implementation
b3e14b9 is described below
commit b3e14b96c88d64f984418a4afcb40c5f021b1be4
Author: gosonzhang <go...@tencent.com>
AuthorDate: Thu Apr 29 15:05:10 2021 +0800
[INLONG-601]Adjust WebBrokerDefConfHandler class implementation
---
.../server/broker/web/AbstractWebHandler.java | 12 +-
.../tubemq/server/common/fielddef/WebFieldDef.java | 15 +-
.../server/common/statusdef/ManageStatus.java | 7 +
.../server/common/utils/WebParameterUtils.java | 10 +-
.../server/master/metamanage/MetaDataManager.java | 119 +-
.../metastore/dao/entity/BrokerConfEntity.java | 8 +-
.../impl/bdbimpl/BdbBrokerConfigMapperImpl.java | 24 +-
.../nodemanage/nodebroker/BrokerInfoHolder.java | 6 +-
.../server/master/web/action/screen/Webapi.java | 67 +-
.../web/handler/WebAdminFlowRuleHandler.java | 43 +-
.../web/handler/WebAdminGroupCtrlHandler.java | 210 +--
.../web/handler/WebAdminTopicAuthHandler.java | 43 +-
.../master/web/handler/WebBrokerConfHandler.java | 497 +++++-
.../web/handler/WebBrokerDefConfHandler.java | 1666 --------------------
.../web/handler/WebGroupConsumeCtrlHandler.java | 62 +-
.../master/web/handler/WebGroupResCtrlHandler.java | 62 +-
.../master/web/handler/WebMasterInfoHandler.java | 95 +-
.../master/web/handler/WebOtherInfoHandler.java | 12 +-
.../master/web/handler/WebTopicCtrlHandler.java | 68 +-
.../master/web/handler/WebTopicDeployHandler.java | 115 +-
20 files changed, 925 insertions(+), 2216 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/AbstractWebHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/AbstractWebHandler.java
index 17a378d..9a24a02 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/AbstractWebHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/AbstractWebHandler.java
@@ -52,29 +52,29 @@ public abstract class AbstractWebHandler extends HttpServlet {
@Override
protected void doPost(HttpServletRequest req,
HttpServletResponse resp) throws IOException {
- StringBuilder strBuffer = new StringBuilder(1024);
+ StringBuilder sBuffer = new StringBuilder(1024);
try {
String method = req.getParameter("method");
if (method == null) {
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
+ sBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
.append("Please take with method parameter! \"}");
} else {
WebApiRegInfo webApiRegInfo = getWebApiRegInfo(method);
if (webApiRegInfo == null) {
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
+ sBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
.append("Unsupported method ").append(method).append("\"}");
} else {
- webApiRegInfo.method.invoke(webApiRegInfo.webHandler, req, strBuffer);
+ webApiRegInfo.method.invoke(webApiRegInfo.webHandler, req, sBuffer);
}
}
} catch (Throwable e) {
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
+ sBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
.append("Bad request from server: ")
.append(e.getMessage())
.append("\"}");
}
- resp.getWriter().write(strBuffer.toString());
+ resp.getWriter().write(sBuffer.toString());
resp.setCharacterEncoding(req.getCharacterEncoding());
resp.setStatus(HttpServletResponse.SC_OK);
resp.flushBuffer();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index c924b62..453352a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -241,9 +241,18 @@ public enum WebFieldDef {
"consumer id", TServerConstants.CFG_CONSUMER_CLIENTID_MAX_LENGTH,
RegexDef.TMP_CONSUMERID),
ISENABLE(86, "isEnable", "isEnable",
- WebFieldType.BOOLEAN, "With status if enable.");
-
-
+ WebFieldType.BOOLEAN, "With status if enable."),
+ RELREASON(87, "relReason", "rRsn", WebFieldType.STRING,
+ "Release reason", TBaseConstants.META_MAX_OPREASON_LENGTH),
+ WITHDETAIL(88, "withDetail", "wDtl",
+ WebFieldType.BOOLEAN, "With broker configure detail info."),
+ ONLYABNORMAL(89, "onlyAbnormal", "oAbn",
+ WebFieldType.BOOLEAN, "only query abnormal broker info."),
+
+ ONLYAUTOFBD(90, "onlyAutoForbidden", "oAfb",
+ WebFieldType.BOOLEAN, "only auto forbidden abnormal broker info."),
+ ONLYENABLETLS(91, "onlyEnableTLS", "oEtls",
+ WebFieldType.BOOLEAN, "only enable tls broker info.");
public final int id;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/ManageStatus.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/ManageStatus.java
index e1f1962..09f78be 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/ManageStatus.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/ManageStatus.java
@@ -66,6 +66,13 @@ public enum ManageStatus {
return new Tuple2<>(isAcceptPublish, isAcceptSubscribe);
}
+ public boolean isAcceptSubscribe() {
+ return isAcceptSubscribe;
+ }
+
+ public boolean isAcceptPublish() {
+ return isAcceptPublish;
+ }
public static ManageStatus valueOf(int code) {
for (ManageStatus status : ManageStatus.values()) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index 9219a00..f11dde8 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -1266,16 +1266,16 @@ public class WebParameterUtils {
/**
* Valid execution authorization info
* @param req Http Servlet Request
- * @param fieldDef the parameter field definition
* @param required a boolean value represent whether the parameter is must required
* @param master current master object
* @param result process result
* @return valid result for the parameter value
*/
- public static boolean validReqAuthorizeInfo(HttpServletRequest req, WebFieldDef fieldDef,
- boolean required, TMaster master,
- StringBuilder sBuffer, ProcessResult result) {
- if (!getStringParamValue(req, fieldDef, required, null, sBuffer, result)) {
+ public static boolean validReqAuthorizeInfo(HttpServletRequest req, boolean required,
+ TMaster master, StringBuilder sBuffer,
+ ProcessResult result) {
+ if (!getStringParamValue(req, WebFieldDef.ADMINAUTHTOKEN,
+ required, null, sBuffer, result)) {
return result.isSuccess();
}
String paramValue = (String) result.getRetData();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
index 80cfb32..7941250 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
@@ -480,7 +480,9 @@ public class MetaDataManager implements Server {
metaStoreService.getBrokerConfByBrokerId(entity.getBrokerId());
if (curEntity == null) {
result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
- DataOpErrCode.DERR_NOT_EXIST.getDescription());
+ sBuffer.append("Not found broker configure by brokerId=")
+ .append(entity.getBrokerId()).toString());
+ sBuffer.delete(0, sBuffer.length());
} else {
BrokerConfEntity newEntity = curEntity.clone();
newEntity.updBaseModifyInfo(entity);
@@ -489,9 +491,17 @@ public class MetaDataManager implements Server {
entity.getRegionId(), entity.getGroupId(),
entity.getManageStatus(), entity.getTopicProps())) {
metaStoreService.updBrokerConf(newEntity, sBuffer, result);
+ // update broker configure change status
+ BrokerSyncStatusInfo brokerSyncStatusInfo =
+ getBrokerRunSyncStatusInfo(entity.getBrokerId());
+ if (result.isSuccess()) {
+ if (brokerSyncStatusInfo != null) {
+ updateBrokerConfChanged(entity.getBrokerId(),
+ true, true, sBuffer, result);
+ }
+ }
} else {
- result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
- DataOpErrCode.DERR_UNCHANGED.getDescription());
+ result.setSuccResult(null);
}
}
}
@@ -590,6 +600,109 @@ public class MetaDataManager implements Server {
}
/**
+ * Change broker configure status
+ *
+ * @param opEntity operator
+ * @param brokerIdSet the set of broker ids whose manage status is to be changed
+ * @param newMngStatus manage status
+ * @param sBuffer the print information string buffer
+ * @param result the process result return
+ * @return the per-broker process results, one entry for each broker id in the set
+ */
+ public List<BrokerProcessResult> changeBrokerConfStatus(BaseEntity opEntity,
+ Set<Integer> brokerIdSet,
+ ManageStatus newMngStatus,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ BrokerConfEntity curEntry;
+ BrokerConfEntity newEntry;
+ List<BrokerProcessResult> retInfo = new ArrayList<>();
+ // check target broker configure's status
+ for (Integer brokerId : brokerIdSet) {
+ curEntry = metaStoreService.getBrokerConfByBrokerId(brokerId);
+ if (curEntry == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ "The broker configure not exist!");
+ retInfo.add(new BrokerProcessResult(brokerId, "", result));
+ continue;
+ }
+ if (curEntry.getManageStatus() == newMngStatus) {
+ result.setSuccResult(null);
+ retInfo.add(new BrokerProcessResult(brokerId, curEntry.getBrokerIp(), result));
+ continue;
+ }
+ if (newMngStatus == ManageStatus.STATUS_MANAGE_OFFLINE) {
+ if (curEntry.getManageStatus().getCode()
+ < ManageStatus.STATUS_MANAGE_ONLINE.getCode()) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+ sBuffer.append("Broker by brokerId=").append(brokerId)
+ .append(" on draft status, not need offline operate!")
+ .toString());
+ sBuffer.delete(0, sBuffer.length());
+ retInfo.add(new BrokerProcessResult(brokerId, "", result));
+ continue;
+ }
+ }
+ newEntry = curEntry.clone();
+ newEntry.updBaseModifyInfo(opEntity);
+ if (newEntry.updModifyInfo(opEntity.getDataVerId(),
+ TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED, newMngStatus, null)) {
+ metaStoreService.updBrokerConf(newEntry, sBuffer, result);
+ if (result.isSuccess()) {
+ triggerBrokerConfDataSync(newEntry,
+ curEntry.getManageStatus().getCode(), true, sBuffer, result);
+ }
+ } else {
+ result.setSuccResult(null);
+ }
+ retInfo.add(new BrokerProcessResult(brokerId, curEntry.getBrokerIp(), result));
+ }
+ return retInfo;
+ }
+
+ /**
+ * Reload broker configure information
+ *
+ * @param opEntity operator
+ * @param brokerIdSet the set of broker ids whose configure is to be reloaded
+ * @param sBuffer the print information string buffer
+ * @param result the process result return
+ * @return the per-broker process results, one entry for each broker id in the set
+ */
+ public List<BrokerProcessResult> reloadBrokerConfInfo(BaseEntity opEntity,
+ Set<Integer> brokerIdSet,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ BrokerConfEntity curEntry;
+ List<BrokerProcessResult> retInfo = new ArrayList<>();
+ // check target broker configure's status
+ for (Integer brokerId : brokerIdSet) {
+ curEntry = metaStoreService.getBrokerConfByBrokerId(brokerId);
+ if (curEntry == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ "The broker configure not exist!");
+ retInfo.add(new BrokerProcessResult(brokerId, "", result));
+ continue;
+ }
+ if (!curEntry.getManageStatus().isOnlineStatus()) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+ sBuffer.append("The broker manage status by brokerId=").append(brokerId)
+ .append(" not in online status, can't reload this configure! ")
+ .toString());
+ sBuffer.delete(0, sBuffer.length());
+ retInfo.add(new BrokerProcessResult(brokerId, "", result));
+ continue;
+ }
+ triggerBrokerConfDataSync(curEntry,
+ curEntry.getManageStatus().getCode(), true, sBuffer, result);
+ retInfo.add(new BrokerProcessResult(brokerId, curEntry.getBrokerIp(), result));
+ }
+ return retInfo;
+ }
+
+ /**
* Delete broker configure information
*
* @param operator operator
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
index 243a638..cc62a8d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
@@ -146,7 +146,9 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
return manageStatus;
}
-
+ public String getManageStatusStr() {
+ return manageStatus.getDescription();
+ }
public void setManageStatus(ManageStatus manageStatus) {
this.manageStatus = manageStatus;
@@ -400,6 +402,8 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
.append(",\"brokerTLSPort\":").append(brokerTLSPort)
.append(",\"brokerWebPort\":").append(brokerWebPort)
.append(",\"manageStatus\":\"").append(manageSts).append("\"")
+ .append(",\"acceptPublish\":").append(manageStatus.isAcceptPublish())
+ .append(",\"acceptSubscribe\":").append(manageStatus.isAcceptSubscribe())
.append(",\"isConfChanged\":").append(isConfDataUpdated)
.append(",\"isConfLoaded\":").append(isBrokerLoaded)
.append(",\"regionId\":").append(regionId)
@@ -411,6 +415,8 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
.append(",\"bTlsPort\":").append(brokerTLSPort)
.append(",\"bWebPort\":").append(brokerWebPort)
.append(",\"mSts\":\"").append(manageSts).append("\"")
+ .append(",\"accPub\":").append(manageStatus.isAcceptPublish())
+ .append(",\"accSub\":").append(manageStatus.isAcceptSubscribe())
.append(",\"isConfChg\":").append(isConfDataUpdated)
.append(",\"isConfLd\":").append(isBrokerLoaded)
.append(",\"rId\":").append(regionId)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
index e136df6..60b5ac5 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
@@ -23,6 +23,7 @@ import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;
+
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -223,19 +224,20 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
}
}
}
+ // get broker configures
if (qryBrokerKey == null) {
- qryBrokerKey = new HashSet<>(brokerConfCache.keySet());
- }
- if (qryBrokerKey.isEmpty()) {
- return retMap;
- }
- for (Integer brokerId : qryBrokerKey) {
- BrokerConfEntity entity = brokerConfCache.get(brokerId);
- if (entity == null
- || (qryEntity != null && !entity.isMatched(qryEntity))) {
- continue;
+ for (BrokerConfEntity entity : brokerConfCache.values()) {
+ if (entity != null && entity.isMatched(qryEntity)) {
+ retMap.put(entity.getBrokerId(), entity);
+ }
+ }
+ } else {
+ for (Integer brokerId : qryBrokerKey) {
+ BrokerConfEntity entity = brokerConfCache.get(brokerId);
+ if (entity != null && entity.isMatched(qryEntity)) {
+ retMap.put(entity.getBrokerId(), entity);
+ }
}
- retMap.put(entity.getBrokerId(), entity);
}
return retMap;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java
index c9fdaea..042f1b6 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java
@@ -33,7 +33,6 @@ import org.apache.tubemq.corebase.protobuf.generated.ClientMaster;
import org.apache.tubemq.server.common.TStatusConstants;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
-import org.apache.tubemq.server.master.web.handler.WebBrokerDefConfHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -273,12 +272,9 @@ public class BrokerInfoHolder {
oldEntity.isBrokerLoaded(), oldEntity.getRecordCreateUser(),
oldEntity.getRecordCreateDate(), "Broker AutoReport",
new Date());
- boolean isNeedFastStart =
- WebBrokerDefConfHandler.isBrokerStartNeedFast(brokerConfManager,
- newEntity.getBrokerId(), oldEntity.getManageStatus(), newEntity.getManageStatus());
brokerConfManager.confModBrokerDefaultConfig(newEntity);
brokerConfManager.triggerBrokerConfDataSync(newEntity,
- oldEntity.getManageStatus(), isNeedFastStart);
+ oldEntity.getManageStatus(), true);
return true;
} catch (Throwable e1) {
return false;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
index 0422524..160db4a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
@@ -19,19 +19,20 @@ package org.apache.tubemq.server.master.web.action.screen;
import static org.apache.tubemq.server.common.webbase.WebMethodMapper.getWebApiRegInfo;
-import java.util.Arrays;
-import java.util.List;
import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corerpc.exception.StandbyException;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.common.webbase.WebMethodMapper;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager;
import org.apache.tubemq.server.master.web.handler.AbstractWebHandler;
+import org.apache.tubemq.server.master.web.handler.WebAdminFlowRuleHandler;
import org.apache.tubemq.server.master.web.handler.WebAdminGroupCtrlHandler;
import org.apache.tubemq.server.master.web.handler.WebAdminTopicAuthHandler;
-import org.apache.tubemq.server.master.web.handler.WebBrokerDefConfHandler;
+import org.apache.tubemq.server.master.web.handler.WebBrokerConfHandler;
import org.apache.tubemq.server.master.web.handler.WebGroupConsumeCtrlHandler;
import org.apache.tubemq.server.master.web.handler.WebGroupResCtrlHandler;
import org.apache.tubemq.server.master.web.handler.WebMasterInfoHandler;
@@ -49,15 +50,13 @@ import org.apache.tubemq.server.master.web.simplemvc.RequestContext;
* Generate output JSON by concatenating strings, to improve the performance.
*/
public class Webapi implements Action {
- // allowed type value set
- private static final List<String> allowedTypeValues =
- Arrays.asList("op_query", "op_modify");
+
private TMaster master;
public Webapi(TMaster master) {
this.master = master;
- registerHandler(new WebBrokerDefConfHandler(this.master));
+ registerHandler(new WebBrokerConfHandler(this.master));
registerHandler(new WebTopicCtrlHandler(this.master));
registerHandler(new WebTopicDeployHandler(this.master));
registerHandler(new WebGroupConsumeCtrlHandler(this.master));
@@ -66,11 +65,13 @@ public class Webapi implements Action {
registerHandler(new WebOtherInfoHandler(this.master));
registerHandler(new WebAdminGroupCtrlHandler(this.master));
registerHandler(new WebAdminTopicAuthHandler(this.master));
+ registerHandler(new WebAdminFlowRuleHandler(this.master));
}
@Override
public void execute(RequestContext requestContext) {
- StringBuilder strBuffer = new StringBuilder();
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder();
try {
HttpServletRequest req = requestContext.getReq();
if (this.master.isStopped()) {
@@ -94,39 +95,35 @@ public class Webapi implements Action {
if (method == null) {
throw new Exception("Please take with method parameter!");
}
- if (!allowedTypeValues.contains(type)) {
- throw new Exception("Unsupported operate type, only support " + allowedTypeValues);
- }
- boolean isQuery = true;
- if ("op_modify".equals(type)) {
- isQuery = false;
- // check master is current node
- if (brokerConfManager.isPrimaryNodeActive()) {
- throw new Exception(
- "DesignatedPrimary happened...please check if the other member is down");
- }
- }
WebMethodMapper.WebApiRegInfo webApiRegInfo = getWebApiRegInfo(method);
if (webApiRegInfo == null) {
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"Unsupported method: ")
- .append(method).append("\"}");
- requestContext.put("sb", strBuffer.toString());
+ throw new Exception("Unsupported method!");
+ }
+ // check master is current node
+ if (webApiRegInfo.onlyMasterOp
+ && brokerConfManager.isPrimaryNodeActive()) {
+ throw new Exception(
+ "DesignatedPrimary happened...please check if the other member is down");
+ }
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ webApiRegInfo.needAuthToken, master, sBuffer, result)) {
+ throw new Exception(result.errInfo);
+ }
+ sBuffer = (StringBuilder) webApiRegInfo.method.invoke(
+ webApiRegInfo.webHandler, req, sBuffer, result);
+ // Carry callback information
+ if (TStringUtils.isEmpty(strCallbackFun)) {
+ requestContext.put("sb", sBuffer.toString());
} else {
-
- strBuffer = (StringBuilder) webApiRegInfo.method.invoke(webApiRegInfo.webHandler, req);
- // Carry callback information
- if (TStringUtils.isEmpty(strCallbackFun)) {
- requestContext.put("sb", strBuffer.toString());
- } else {
- requestContext.put("sb", strCallbackFun + "(" + strBuffer.toString() + ")");
- requestContext.getResp().addHeader("Content-type", "text/plain");
- requestContext.getResp().addHeader("charset", TBaseConstants.META_DEFAULT_CHARSET_NAME);
- }
+ requestContext.put("sb", strCallbackFun + "(" + sBuffer.toString() + ")");
+ requestContext.getResp().addHeader("Content-type", "text/plain");
+ requestContext.getResp().addHeader("charset", TBaseConstants.META_DEFAULT_CHARSET_NAME);
}
} catch (Throwable e) {
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"Bad request from client, ")
+ sBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"Bad request from client, ")
.append(e.getMessage()).append("\"}");
- requestContext.put("sb", strBuffer.toString());
+ requestContext.put("sb", sBuffer.toString());
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
index d6a8b24..12a0a83 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
@@ -66,9 +66,9 @@ public class WebAdminFlowRuleHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminQueryGroupFlowCtrlRule(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
+ public StringBuilder adminQueryGroupFlowCtrlRule(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// build query entity
GroupResCtrlEntity entity = new GroupResCtrlEntity();
// get queried operation info, for createUser, modifyUser, dataVersionId
@@ -124,8 +124,10 @@ public class WebAdminFlowRuleHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminSetGroupFlowCtrlRule(HttpServletRequest req) {
- return innAddOrUpdGroupFlowCtrlRule(req, true);
+ public StringBuilder adminSetGroupFlowCtrlRule(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrUpdGroupFlowCtrlRule(req, sBuffer, result, true);
}
/**
@@ -134,8 +136,10 @@ public class WebAdminFlowRuleHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminUpdGroupFlowCtrlRule(HttpServletRequest req) {
- return innAddOrUpdGroupFlowCtrlRule(req, false);
+ public StringBuilder adminUpdGroupFlowCtrlRule(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrUpdGroupFlowCtrlRule(req, sBuffer, result, false);
}
/**
@@ -144,15 +148,9 @@ public class WebAdminFlowRuleHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminDelGroupFlowCtrlRule(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminDelGroupFlowCtrlRule(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -182,15 +180,10 @@ public class WebAdminFlowRuleHandler extends AbstractWebHandler {
* @param req
* @return
*/
- private StringBuilder innAddOrUpdGroupFlowCtrlRule(HttpServletRequest req, boolean isAddOp) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ private StringBuilder innAddOrUpdGroupFlowCtrlRule(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result,
+ boolean isAddOp) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
index 52ecd3f..4c93c0e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
@@ -103,9 +103,9 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminQueryBlackGroupInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
+ public StringBuilder adminQueryBlackGroupInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// build query entity
GroupConsumeCtrlEntity entity = new GroupConsumeCtrlEntity();
// get queried operation info, for createUser, modifyUser, dataVersionId
@@ -157,9 +157,9 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminQueryConsumerGroupInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
+ public StringBuilder adminQueryConsumerGroupInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// build query entity
GroupConsumeCtrlEntity qryEntity = new GroupConsumeCtrlEntity();
// get queried operation info, for createUser, modifyUser, dataVersionId
@@ -216,9 +216,9 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminQueryGroupFilterCondInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
+ public StringBuilder adminQueryGroupFilterCondInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// build query entity
GroupConsumeCtrlEntity qryEntity = new GroupConsumeCtrlEntity();
// get queried operation info, for createUser, modifyUser, dataVersionId
@@ -298,9 +298,9 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminQueryConsumeGroupSetting(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
+ public StringBuilder adminQueryConsumeGroupSetting(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// build query entity
GroupResCtrlEntity entity = new GroupResCtrlEntity();
// get queried operation info, for createUser, modifyUser, dataVersionId
@@ -359,15 +359,9 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminAddBlackGroupInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminAddBlackGroupInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, true, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -405,15 +399,9 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminBatchAddBlackGroupInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminBatchAddBlackGroupInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, true, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -441,15 +429,9 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminDeleteBlackGroupInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminDeleteBlackGroupInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -488,15 +470,9 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminAddConsumerGroupInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminAddConsumerGroupInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, true, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -534,15 +510,9 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminBatchAddConsumerGroupInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminBatchAddConsumerGroupInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, true, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -570,15 +540,9 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminDeleteConsumerGroupInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminDeleteConsumerGroupInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -611,8 +575,10 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminAddGroupFilterCondInfo(HttpServletRequest req) {
- return innAddOrModGroupFilterCondInfo(req, true);
+ public StringBuilder adminAddGroupFilterCondInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrModGroupFilterCondInfo(req, sBuffer, result, true);
}
/**
@@ -621,8 +587,10 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminModGroupFilterCondInfo(HttpServletRequest req) {
- return innAddOrModGroupFilterCondInfo(req, false);
+ public StringBuilder adminModGroupFilterCondInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrModGroupFilterCondInfo(req, sBuffer, result, false);
}
/**
@@ -631,8 +599,10 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminBatchAddGroupFilterCondInfo(HttpServletRequest req) {
- return innBatchAddOrUpdGroupFilterCondInfo(req, true);
+ public StringBuilder adminBatchAddGroupFilterCondInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innBatchAddOrUpdGroupFilterCondInfo(req, sBuffer, result, true);
}
/**
@@ -641,8 +611,10 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminBatchModGroupFilterCondInfo(HttpServletRequest req) {
- return innBatchAddOrUpdGroupFilterCondInfo(req, false);
+ public StringBuilder adminBatchModGroupFilterCondInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innBatchAddOrUpdGroupFilterCondInfo(req, sBuffer, result, false);
}
/**
@@ -651,15 +623,9 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminDeleteGroupFilterCondInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminDeleteGroupFilterCondInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -697,15 +663,9 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminRebalanceGroupAllocateInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminRebalanceGroupAllocateInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -772,8 +732,10 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminAddConsumeGroupSettingInfo(HttpServletRequest req) {
- return innAddOrUpdConsumeGroupSettingInfo(req, true);
+ public StringBuilder adminAddConsumeGroupSettingInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrUpdConsumeGroupSettingInfo(req, sBuffer, result, true);
}
/**
@@ -782,8 +744,10 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminUpdConsumeGroupSetting(HttpServletRequest req) {
- return innAddOrUpdConsumeGroupSettingInfo(req, false);
+ public StringBuilder adminUpdConsumeGroupSetting(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrUpdConsumeGroupSettingInfo(req, sBuffer, result, false);
}
/**
@@ -792,15 +756,9 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminBatchAddConsumeGroupSetting(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminBatchAddConsumeGroupSetting(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, true, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -829,15 +787,9 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminDeleteConsumeGroupSetting(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminDeleteConsumeGroupSetting(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -886,15 +838,9 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @return
*/
private StringBuilder innAddOrUpdConsumeGroupSettingInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result,
boolean isAddOp) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -947,15 +893,9 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @return
*/
private StringBuilder innAddOrModGroupFilterCondInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result,
boolean isAddOp) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -1013,15 +953,9 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
* @return
*/
private StringBuilder innBatchAddOrUpdGroupFilterCondInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result,
boolean isAddOp) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
index 79823ab..65cc221 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
@@ -62,9 +62,9 @@ public class WebAdminTopicAuthHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminQueryTopicAuthControl(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
+ public StringBuilder adminQueryTopicAuthControl(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
TopicCtrlEntity qryEntity = new TopicCtrlEntity();
// get queried operation info, for createUser, modifyUser, dataVersionId
if (!WebParameterUtils.getQueriedOperateInfo(req, qryEntity, sBuffer, result)) {
@@ -144,15 +144,9 @@ public class WebAdminTopicAuthHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminEnableDisableTopicAuthControl(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminEnableDisableTopicAuthControl(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, true, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -187,17 +181,10 @@ public class WebAdminTopicAuthHandler extends AbstractWebHandler {
*
* @param req
* @return
- * @throws Exception
*/
- public StringBuilder adminBatchAddTopicAuthControl(HttpServletRequest req) throws Exception {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminBatchAddTopicAuthControl(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, true, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -224,15 +211,9 @@ public class WebAdminTopicAuthHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminDeleteTopicAuthControl(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminDeleteTopicAuthControl(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
index 2e57e16..6761de0 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
@@ -25,9 +25,11 @@ import java.util.Map;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.cluster.BrokerInfo;
import org.apache.tubemq.corebase.utils.AddressUtils;
import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.server.common.TServerConstants;
+import org.apache.tubemq.server.common.TStatusConstants;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.common.statusdef.ManageStatus;
import org.apache.tubemq.server.common.statusdef.TopicStatus;
@@ -40,7 +42,8 @@ import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerCon
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
-
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerInfoHolder;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
/**
@@ -69,6 +72,9 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
// register query method
registerQueryWebMethod("admin_query_broker_configure",
"adminQueryBrokerConfInfo");
+ registerQueryWebMethod("admin_query_broker_run_status",
+ "adminQueryBrokerRunStatusInfo");
+
// register modify method
registerModifyWebMethod("admin_add_broker_configure",
"adminAddBrokerConfInfo");
@@ -80,6 +86,22 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
"adminBatchUpdBrokerConfInfo");
registerModifyWebMethod("admin_delete_broker_configure",
"adminDeleteBrokerConfEntityInfo");
+ registerModifyWebMethod("admin_online_broker_configure",
+ "adminOnlineBrokerConf");
+ registerModifyWebMethod("admin_set_broker_read_or_write",
+ "adminSetReadOrWriteBrokerConf");
+ registerModifyWebMethod("admin_offline_broker_configure",
+ "adminOfflineBrokerConf");
+ registerModifyWebMethod("admin_reload_broker_configure",
+ "adminReloadBrokerConf");
+ registerModifyWebMethod("admin_release_broker_autoforbidden_status",
+ "adminRelBrokerAutoForbiddenStatus");
+
+ // Deprecated methods begin
+ // register modify method
+ registerModifyWebMethod("admin_bath_add_broker_configure",
+ "adminBatchAddBrokerConfInfo");
+ // Deprecated methods end
}
/**
@@ -88,9 +110,9 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminQueryBrokerConfInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
+ public StringBuilder adminQueryBrokerConfInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
BrokerConfEntity qryEntity = new BrokerConfEntity();
// get queried operation info, for createUser, modifyUser, dataVersionId
if (!WebParameterUtils.getQueriedOperateInfo(req, qryEntity, sBuffer, result)) {
@@ -220,8 +242,10 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminAddBrokerConfInfo(HttpServletRequest req) {
- return innAddOrUpdBrokerConfInfo(req, true);
+ public StringBuilder adminAddBrokerConfInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrUpdBrokerConfInfo(req, sBuffer, result, true);
}
/**
@@ -230,8 +254,10 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminUpdateBrokerConfInfo(HttpServletRequest req) {
- return innAddOrUpdBrokerConfInfo(req, false);
+ public StringBuilder adminUpdateBrokerConfInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrUpdBrokerConfInfo(req, sBuffer, result, false);
}
/**
@@ -240,8 +266,10 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminBatchAddBrokerConfInfo(HttpServletRequest req) {
- return innBatchAddOrUpdBrokerConfInfo(req, true);
+ public StringBuilder adminBatchAddBrokerConfInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innBatchAddOrUpdBrokerConfInfo(req, sBuffer, result, true);
}
/**
@@ -250,8 +278,10 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminBatchUpdBrokerConfInfo(HttpServletRequest req) {
- return innBatchAddOrUpdBrokerConfInfo(req, false);
+ public StringBuilder adminBatchUpdBrokerConfInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innBatchAddOrUpdBrokerConfInfo(req, sBuffer, result, false);
}
/**
@@ -260,15 +290,9 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminDeleteBrokerConfEntityInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminDeleteBrokerConfEntityInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -295,16 +319,322 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
return buildRetInfo(retInfo, sBuffer);
}
- private StringBuilder innAddOrUpdBrokerConfInfo(HttpServletRequest req,
- boolean isAddOp) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
+ /**
+ * Make broker config online
+ *
+ * Reads the operation info (parsed as a non-add operation) and the set of
+ * broker ids from the request, then asks metaDataManager to change the
+ * manage status of those brokers to ManageStatus.STATUS_MANAGE_ONLINE.
+ *
+ * NOTE(review): no authorization check is performed inside this method;
+ * presumably the caller validates the admin token before dispatching
+ * here - confirm.
+ *
+ * @param req the HTTP request the operation info and the
+ * WebFieldDef.COMPSBROKERID value are read from
+ * @param sBuffer caller-supplied buffer that the response text is written into
+ * @param result caller-supplied holder through which the parameter helpers
+ * hand back parsed values (getRetData()) and error text (errInfo)
+ * @return sBuffer, holding either the failure message of the first parameter
+ * check that failed, or the output of buildRetInfo() for the
+ * per-broker results returned by changeBrokerConfStatus()
+ */
+ public StringBuilder adminOnlineBrokerConf(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // check and get brokerId field
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, true, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<Integer> brokerIds = (Set<Integer>) result.getRetData();
+ List<BrokerProcessResult> retInfo =
+ metaDataManager.changeBrokerConfStatus(opEntity,
+ brokerIds, ManageStatus.STATUS_MANAGE_ONLINE, sBuffer, result);
+ return buildRetInfo(retInfo, sBuffer);
+ }
+
+ /**
+ * Make broker config offline
+ *
+ * Reads the operation info (parsed as a non-add operation) and the set of
+ * broker ids from the request, then asks metaDataManager to change the
+ * manage status of those brokers to ManageStatus.STATUS_MANAGE_OFFLINE.
+ *
+ * NOTE(review): no authorization check is performed inside this method;
+ * presumably the caller validates the admin token before dispatching
+ * here - confirm.
+ *
+ * @param req the HTTP request the operation info and the
+ * WebFieldDef.COMPSBROKERID value are read from
+ * @param sBuffer caller-supplied buffer that the response text is written into
+ * @param result caller-supplied holder through which the parameter helpers
+ * hand back parsed values (getRetData()) and error text (errInfo)
+ * @return sBuffer, holding either the failure message of the first parameter
+ * check that failed, or the output of buildRetInfo() for the
+ * per-broker results returned by changeBrokerConfStatus()
+ */
+ public StringBuilder adminOfflineBrokerConf(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // check and get brokerId field
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, true, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<Integer> brokerIds = (Set<Integer>) result.getRetData();
+ List<BrokerProcessResult> retInfo =
+ metaDataManager.changeBrokerConfStatus(opEntity,
+ brokerIds, ManageStatus.STATUS_MANAGE_OFFLINE, sBuffer, result);
+ return buildRetInfo(retInfo, sBuffer);
+ }
+
+ /**
+ * Set read/write status of a broker.
+ * The same operations could be made by changing broker's config,
+ * but those are extracted here to simplify the code.
+ *
+ * Reads the operation info (parsed as a non-add operation), the set of
+ * broker ids and the target manage status (via getManageStatusParamValue,
+ * called with its first argument set to false) from the request, then asks
+ * metaDataManager to change the manage status of those brokers to it.
+ *
+ * NOTE(review): no authorization check is performed inside this method;
+ * presumably the caller validates the admin token before dispatching
+ * here - confirm.
+ *
+ * @param req the HTTP request the operation info, the
+ * WebFieldDef.COMPSBROKERID value and the manage status are read from
+ * @param sBuffer caller-supplied buffer that the response text is written into
+ * @param result caller-supplied holder through which the parameter helpers
+ * hand back parsed values (getRetData()) and error text (errInfo)
+ * @return sBuffer, holding either the failure message of the first parameter
+ * check that failed, or the output of buildRetInfo() for the
+ * per-broker results returned by changeBrokerConfStatus()
+ */
+ public StringBuilder adminSetReadOrWriteBrokerConf(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // check and get brokerId field
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, true, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
+ Set<Integer> brokerIds = (Set<Integer>) result.getRetData();
+ // get and valid broker manage status info
+ if (!getManageStatusParamValue(false, req, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ ManageStatus mngStatus = (ManageStatus) result.getRetData();
+ List<BrokerProcessResult> retInfo =
+ metaDataManager.changeBrokerConfStatus(opEntity,
+ brokerIds, mngStatus, sBuffer, result);
+ return buildRetInfo(retInfo, sBuffer);
+ }
+
+ /**
+ * Reload broker config
+ *
+ * Reads the operation info (parsed as a non-add operation) and the set of
+ * broker ids from the request, then delegates to
+ * metaDataManager.reloadBrokerConfInfo() for those brokers.
+ *
+ * NOTE(review): no authorization check is performed inside this method;
+ * presumably the caller validates the admin token before dispatching
+ * here - confirm.
+ *
+ * @param req the HTTP request the operation info and the
+ * WebFieldDef.COMPSBROKERID value are read from
+ * @param sBuffer caller-supplied buffer that the response text is written into
+ * @param result caller-supplied holder through which the parameter helpers
+ * hand back parsed values (getRetData()) and error text (errInfo)
+ * @return sBuffer, holding either the failure message of the first parameter
+ * check that failed, or the output of buildRetInfo() for the
+ * per-broker results returned by reloadBrokerConfInfo()
+ */
+ public StringBuilder adminReloadBrokerConf(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // check and get brokerId field
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, true, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<Integer> brokerIds = (Set<Integer>) result.getRetData();
+ List<BrokerProcessResult> retInfo =
+ metaDataManager.reloadBrokerConfInfo(opEntity,
+ brokerIds, sBuffer, result);
+ return buildRetInfo(retInfo, sBuffer);
+ }
+
+ /**
+ * Release broker auto forbidden status
+ *
+ * Reads the operation info, the set of broker ids and the release reason
+ * (WebFieldDef.RELREASON, with "Web API call" passed as the default value
+ * argument) from the request, then calls
+ * BrokerInfoHolder.relAutoForbiddenBrokerInfo() on the holder obtained from
+ * master.getBrokerHolder(). The parsed operation info (opEntity) is not
+ * used further in this method.
+ *
+ * NOTE(review): no authorization check is performed inside this method;
+ * presumably the caller validates the admin token before dispatching
+ * here - confirm.
+ *
+ * @param req the HTTP request the operation info, the
+ * WebFieldDef.COMPSBROKERID value and the release reason are read from
+ * @param sBuffer caller-supplied buffer that the response text is written into
+ * @param result caller-supplied holder through which the parameter helpers
+ * hand back parsed values (getRetData()) and error text (errInfo)
+ * @return sBuffer, holding either the failure message of the first parameter
+ * check that failed, or the text written by buildSuccessResult()
+ */
+ public StringBuilder adminRelBrokerAutoForbiddenStatus(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // check and get brokerId field
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, true, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<Integer> brokerIds = (Set<Integer>) result.getRetData();
+ // check and get relReason field
+ if (!WebParameterUtils.getStringParamValue(req, WebFieldDef.RELREASON,
+ false, "Web API call", sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ String relReason = (String) result.getRetData();
+ BrokerInfoHolder brokerInfoHolder = master.getBrokerHolder();
+ brokerInfoHolder.relAutoForbiddenBrokerInfo(brokerIds, relReason);
+ WebParameterUtils.buildSuccessResult(sBuffer);
+ return sBuffer;
+ }
+
+ /**
+ * Query run status of broker
+ *
+ * Emits one JSON object per broker configuration returned by
+ * metaDataManager.getBrokerConfInfo(brokerIds, brokerIpSet, null). Each
+ * object combines the stored configuration with the abnormal map, the
+ * auto-forbidden map and the BrokerInfo obtained from
+ * master.getBrokerHolder(), plus the sync status returned by
+ * brokerConfManager.getBrokerRunSyncStatusInfo().
+ *
+ * Request values read (see the WebFieldDef constants): COMPSBROKERID and
+ * COMPBROKERIP select the configurations; ONLYABNORMAL, ONLYAUTOFBD and
+ * ONLYENABLETLS skip brokers that have no abnormal record, no
+ * auto-forbidden record, or no TLS-enabled BrokerInfo respectively;
+ * WITHDETAIL appends brokerSyncStatusInfo.toJsonString() to each entry
+ * that has sync status. The four boolean values are read with false
+ * passed as their default value argument.
+ *
+ * Output shape: configurations in ManageStatus.STATUS_MANAGE_APPLY, and
+ * those without sync status, get "-" placeholders for the run-time
+ * fields (runStatus is "-" and "unRegister" respectively). Otherwise
+ * runStatus is "running" or "notRegister" depending on isBrokerOnline(),
+ * and subStatus is "idle" when getBrokerRunStatus() equals
+ * TStatusConstants.STATUS_SERVICE_UNDEFINED, else "processing_event"
+ * with the raw value in stepOp. brokerVersion is always "-".
+ *
+ * NOTE(review): qryEntity is filled by getQueriedOperateInfo() but is not
+ * used afterwards; null is passed as the third argument of
+ * getBrokerConfInfo() - confirm that ignoring those query conditions
+ * is intended.
+ * NOTE(review): value types are not uniform - enableTLS is either the
+ * string "-" or a bare boolean, while isConfChanged, isConfLoaded,
+ * isBrokerOnline, acceptPublish and acceptSubscribe are emitted as
+ * quoted strings.
+ *
+ * @param req the HTTP request the query conditions are read from
+ * @param sBuffer caller-supplied buffer that the response text is written into
+ * @param result caller-supplied holder through which the parameter helpers
+ * hand back parsed values (getRetData()) and error text (errInfo)
+ * @return the buffer holding either the failure message of the first
+ * parameter check that failed, or the success response with the
+ * entry list and its count
+ */
+ public StringBuilder adminQueryBrokerRunStatusInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ BrokerConfEntity qryEntity = new BrokerConfEntity();
+ // get queried operation info, for createUser, modifyUser, dataVersionId
+ if (!WebParameterUtils.getQueriedOperateInfo(req, qryEntity, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // check and get brokerId field
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, false, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<Integer> brokerIds = (Set<Integer>) result.getRetData();
+ // get brokerIp info
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPBROKERIP, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> brokerIpSet = (Set<String>) result.getRetData();
+ // get withDetail info
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.WITHDETAIL, false, false, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ boolean withDetail = (Boolean) result.getRetData();
+ // get onlyAbnormal info
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.ONLYABNORMAL, false, false, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ boolean onlyAbnormal = (Boolean) result.getRetData();
+ // get onlyAutoForbidden info
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.ONLYAUTOFBD, false, false, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ boolean onlyAutoForbidden = (Boolean) result.getRetData();
+ // get onlyEnableTLS info
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.ONLYENABLETLS, false, false, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ boolean onlyEnableTLS = (Boolean) result.getRetData();
+ // query current broker configures
+ Map<Integer, BrokerConfEntity> brokerConfEntityMap =
+ metaDataManager.getBrokerConfInfo(brokerIds, brokerIpSet, null);
+ BrokerInfoHolder brokerInfoHolder = master.getBrokerHolder();
+ Map<Integer, BrokerInfoHolder.BrokerAbnInfo> brokerAbnInfoMap =
+ brokerInfoHolder.getBrokerAbnormalMap();
+ Map<Integer, BrokerInfoHolder.BrokerFbdInfo> brokerFbdInfoMap =
+ brokerInfoHolder.getAutoForbiddenBrokerMapInfo();
+ int totalCnt = 0;
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+ for (BrokerConfEntity entity : brokerConfEntityMap.values()) {
+ BrokerInfoHolder.BrokerAbnInfo brokerAbnInfo =
+ brokerAbnInfoMap.get(entity.getBrokerId());
+ if (onlyAbnormal && brokerAbnInfo == null) {
+ continue;
+ }
+ BrokerInfoHolder.BrokerFbdInfo brokerForbInfo =
+ brokerFbdInfoMap.get(entity.getBrokerId());
+ if (onlyAutoForbidden && brokerForbInfo == null) {
+ continue;
+ }
+ BrokerInfo brokerInfo = brokerInfoHolder.getBrokerInfo(entity.getBrokerId());
+ if (onlyEnableTLS && (brokerInfo == null || !brokerInfo.isEnableTLS())) {
+ continue;
+ }
+ if (totalCnt++ > 0) {
+ sBuffer.append(",");
+ }
+ sBuffer.append("{\"brokerId\":").append(entity.getBrokerId())
+ .append(",\"brokerIp\":\"").append(entity.getBrokerIp())
+ .append("\",\"brokerPort\":").append(entity.getBrokerPort())
+ .append(",\"manageStatus\":\"").append(entity.getManageStatusStr()).append("\"");
+ if (brokerInfo == null) {
+ sBuffer.append(",\"brokerTLSPort\":").append(entity.getBrokerTLSPort())
+ .append(",\"enableTLS\":\"-\"");
+ } else {
+ sBuffer.append(",\"brokerTLSPort\":").append(entity.getBrokerTLSPort())
+ .append(",\"enableTLS\":").append(brokerInfo.isEnableTLS());
+ }
+ if (brokerAbnInfo == null) {
+ sBuffer.append(",\"isRepAbnormal\":false");
+ } else {
+ sBuffer.append(",\"isRepAbnormal\":true,\"repStatus\":")
+ .append(brokerAbnInfo.getAbnStatus());
+ }
+ if (brokerForbInfo == null) {
+ sBuffer.append(",\"isAutoForbidden\":false");
+ } else {
+ sBuffer.append(",\"isAutoForbidden\":true");
+ }
+ if (entity.getManageStatus() == ManageStatus.STATUS_MANAGE_APPLY) {
+ sBuffer.append(",\"runStatus\":\"-\",\"subStatus\":\"-\"")
+ .append(",\"isConfChanged\":\"-\",\"isConfLoaded\":\"-\",\"isBrokerOnline\":\"-\"")
+ .append(",\"brokerVersion\":\"-\",\"acceptPublish\":\"-\",\"acceptSubscribe\":\"-\"");
+ } else {
+ Tuple2<Boolean, Boolean> pubSubTuple =
+ entity.getManageStatus().getPubSubStatus();
+ BrokerSyncStatusInfo brokerSyncStatusInfo =
+ brokerConfManager.getBrokerRunSyncStatusInfo(entity.getBrokerId());
+ if (brokerSyncStatusInfo == null) {
+ sBuffer.append(",\"runStatus\":\"unRegister\",\"subStatus\":\"-\"")
+ .append(",\"isConfChanged\":\"-\",\"isConfLoaded\":\"-\",\"isBrokerOnline\":\"-\"")
+ .append(",\"brokerVersion\":\"-\",\"acceptPublish\":\"-\",\"acceptSubscribe\":\"-\"");
+ } else {
+ int stepStatus = brokerSyncStatusInfo.getBrokerRunStatus();
+ if (brokerSyncStatusInfo.isBrokerOnline()) {
+ if (stepStatus == TStatusConstants.STATUS_SERVICE_UNDEFINED) {
+ sBuffer.append(",\"runStatus\":\"running\",\"subStatus\":\"idle\"");
+ } else {
+ sBuffer.append(",\"runStatus\":\"running\"")
+ .append(",\"subStatus\":\"processing_event\",\"stepOp\":")
+ .append(stepStatus);
+ }
+ } else {
+ if (stepStatus == TStatusConstants.STATUS_SERVICE_UNDEFINED) {
+ sBuffer.append(",\"runStatus\":\"notRegister\",\"subStatus\":\"idle\"");
+ } else {
+ sBuffer.append(",\"runStatus\":\"notRegister\"")
+ .append(",\"subStatus\":\"processing_event\",\"stepOp\":")
+ .append(stepStatus);
+ }
+ }
+ sBuffer.append(",\"isConfChanged\":\"").append(brokerSyncStatusInfo.isBrokerConfChanged())
+ .append("\",\"isConfLoaded\":\"").append(brokerSyncStatusInfo.isBrokerLoaded())
+ .append("\",\"isBrokerOnline\":\"").append(brokerSyncStatusInfo.isBrokerOnline())
+ .append("\"").append(",\"brokerVersion\":\"-\",\"acceptPublish\":\"")
+ .append(pubSubTuple.getF0()).append("\",\"acceptSubscribe\":\"")
+ .append(pubSubTuple.getF1()).append("\"");
+ if (withDetail) {
+ sBuffer = brokerSyncStatusInfo.toJsonString(sBuffer.append(","), false);
+ }
+ }
+ }
+ sBuffer.append("}");
+ }
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+ return sBuffer;
+ }
+
+
+ private StringBuilder innAddOrUpdBrokerConfInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result,
+ boolean isAddOp) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -363,12 +693,6 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
return sBuffer;
}
TopicPropGroup brokerProps = (TopicPropGroup) result.getRetData();
- // get and valid broker manage status info
- if (!getManageStatusParamValue(isAddOp, req, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- ManageStatus mngStatus = (ManageStatus) result.getRetData();
// add record and process result
List<BrokerProcessResult> retInfo = new ArrayList<>();
if (isAddOp) {
@@ -382,8 +706,14 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
retInfo.add(metaDataManager.addOrUpdBrokerConfig(isAddOp, opEntity,
brokerIdAndIpTuple.getF0(), brokerIdAndIpTuple.getF1(), brokerPort,
brokerTlsPort, brokerWebPort, regionId, groupId,
- mngStatus, brokerProps, sBuffer, result));
+ ManageStatus.STATUS_MANAGE_APPLY, brokerProps, sBuffer, result));
} else {
+ // get and valid broker manage status info
+ if (!getManageStatusParamValue(false, req, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ ManageStatus mngStatus = (ManageStatus) result.getRetData();
// check and get brokerId field
if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.COMPSBROKERID, true, sBuffer, result)) {
@@ -407,15 +737,9 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
* @return
*/
private StringBuilder innBatchAddOrUpdBrokerConfInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result,
boolean isAddOp) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -495,26 +819,26 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
* Private method to add topic info
*
* @param withTopic
- * @param sBuilder
+ * @param sBuffer
* @param topicConfEntityMap
* @return
*/
- private StringBuilder addTopicInfo(Boolean withTopic, StringBuilder sBuilder,
+ private StringBuilder addTopicInfo(Boolean withTopic, StringBuilder sBuffer,
Map<String, TopicDeployEntity> topicConfEntityMap) {
if (withTopic) {
- sBuilder.append(",\"topicSet\":[");
+ sBuffer.append(",\"topicSet\":[");
int topicCount = 0;
if (topicConfEntityMap != null) {
for (TopicDeployEntity topicEntity : topicConfEntityMap.values()) {
if (topicCount++ > 0) {
- sBuilder.append(",");
+ sBuffer.append(",");
}
- topicEntity.toWebJsonStr(sBuilder, true, true);
+ topicEntity.toWebJsonStr(sBuffer, true, true);
}
}
- sBuilder.append("]");
+ sBuffer.append("]");
}
- return sBuilder;
+ return sBuffer;
}
private boolean getBrokerJsonSetInfo(HttpServletRequest req, boolean isAddOp,
@@ -583,11 +907,6 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
return result.isSuccess();
}
TopicPropGroup brokerProps = (TopicPropGroup) result.getRetData();
- // get and valid broker manage status info
- if (!getManageStatusParamValue(isAddOp, brokerObject, sBuffer, result)) {
- return result.isSuccess();
- }
- ManageStatus mngStatus = (ManageStatus) result.getRetData();
if (isAddOp) {
// get brokerIp and brokerId field
if (!getBrokerIpAndIdParamValue(brokerObject, sBuffer, result)) {
@@ -598,10 +917,16 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
// buid new record
itemEntity = new BrokerConfEntity(itemOpEntity,
brokerIdAndIpTuple.getF0(), brokerIdAndIpTuple.getF1());
- itemEntity.updModifyInfo(itemOpEntity.getDataVerId(), brokerPort, brokerTlsPort,
- brokerWebPort, regionId, groupId, mngStatus, brokerProps);
+ itemEntity.updModifyInfo(itemOpEntity.getDataVerId(), brokerPort,
+ brokerTlsPort, brokerWebPort, regionId, groupId,
+ ManageStatus.STATUS_MANAGE_APPLY, brokerProps);
addedRecordMap.put(itemEntity.getBrokerId(), itemEntity);
} else {
+ // get and validate the manage status carried by the current JSON record
+ if (!getManageStatusParamValue(false, brokerObject, sBuffer, result)) {
+ return result.isSuccess();
+ }
+ ManageStatus mngStatus = (ManageStatus) result.getRetData();
// check and get brokerId field
if (!WebParameterUtils.getIntParamValue(brokerObject,
WebFieldDef.BROKERID, true, sBuffer, result)) {
@@ -628,21 +953,21 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
}
private StringBuilder buildRetInfo(List<BrokerProcessResult> retInfo,
- StringBuilder sBuilder) {
+ StringBuilder sBuffer) {
int totalCnt = 0;
- WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
for (BrokerProcessResult entry : retInfo) {
if (totalCnt++ > 0) {
- sBuilder.append(",");
+ sBuffer.append(",");
}
- sBuilder.append("{\"brokerId\":").append(entry.getBrokerId())
- .append("{\"brokerIp\":\"").append(entry.getBrokerIp()).append("\"")
+ sBuffer.append("{\"brokerId\":").append(entry.getBrokerId())
+ .append(",\"brokerIp\":\"").append(entry.getBrokerIp()).append("\"")
.append(",\"success\":").append(entry.isSuccess())
.append(",\"errCode\":").append(entry.getErrCode())
.append(",\"errInfo\":\"").append(entry.getErrInfo()).append("\"}");
}
- WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, totalCnt);
- return sBuilder;
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+ return sBuffer;
}
private <T> boolean getBrokerIpAndIdParamValue(T paramCntr,
@@ -706,18 +1031,58 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
: ManageStatus.STATUS_MANAGE_UNDEFINED.getCode()),
ManageStatus.STATUS_MANAGE_APPLY.getCode(),
ManageStatus.STATUS_MANAGE_OFFLINE.getCode(), sBuffer, result)) {
- return result.success;
+ return result.isSuccess();
}
+ ManageStatus mngStatus;
+ // parse manage status;
int manageStatusId = (int) result.getRetData();
try {
- ManageStatus mngStatus = ManageStatus.valueOf(manageStatusId);
- result.setSuccResult(mngStatus);
+ mngStatus = ManageStatus.valueOf(manageStatusId);
} catch (Throwable e) {
result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
sBuffer.append("Illegal ").append(WebFieldDef.MANAGESTATUS.name)
.append(" parameter value :").append(e.getMessage()).toString());
sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ if (mngStatus == ManageStatus.STATUS_MANAGE_UNDEFINED) {
+ // compatible with old version api
+ if (!WebParameterUtils.getBooleanParamValue(paramCntr,
+ WebFieldDef.ACCEPTPUBLISH, false, null, sBuffer, result)) {
+ return result.isSuccess();
+ }
+ Boolean publishParam = (Boolean) result.getRetData();
+ if (!WebParameterUtils.getBooleanParamValue(paramCntr,
+ WebFieldDef.ACCEPTSUBSCRIBE, false, null, sBuffer, result)) {
+ return result.isSuccess();
+ }
+ Boolean subscribeParam = (Boolean) result.getRetData();
+ if (publishParam == null && subscribeParam == null) {
+ mngStatus = ManageStatus.STATUS_MANAGE_UNDEFINED;
+ } else if (publishParam != null && subscribeParam != null) {
+ if (publishParam) {
+ if (subscribeParam) {
+ mngStatus = ManageStatus.STATUS_MANAGE_ONLINE;
+ } else {
+ mngStatus = ManageStatus.STATUS_MANAGE_ONLINE_NOT_READ;
+ }
+ } else {
+ if (subscribeParam) {
+ mngStatus = ManageStatus.STATUS_MANAGE_ONLINE_NOT_WRITE;
+ } else {
+ mngStatus = ManageStatus.STATUS_MANAGE_OFFLINE;
+ }
+ }
+ } else {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
+ sBuffer.append("Fields ").append(WebFieldDef.ACCEPTPUBLISH.name)
+ .append(" and ").append(WebFieldDef.ACCEPTSUBSCRIBE.name)
+ .append(" must exist at the same time!").toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
}
+ result.setSuccResult(mngStatus);
return result.isSuccess();
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
deleted file mode 100644
index 826e4a3..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
+++ /dev/null
@@ -1,1666 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.master.web.handler;
-
-import static java.lang.Math.abs;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import javax.servlet.http.HttpServletRequest;
-import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.corebase.TokenConstants;
-import org.apache.tubemq.corebase.cluster.BrokerInfo;
-import org.apache.tubemq.corebase.utils.AddressUtils;
-import org.apache.tubemq.corebase.utils.TStringUtils;
-import org.apache.tubemq.server.common.TServerConstants;
-import org.apache.tubemq.server.common.TStatusConstants;
-import org.apache.tubemq.server.common.utils.WebParameterUtils;
-import org.apache.tubemq.server.master.TMaster;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.TStoreConstants;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerInfoHolder;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <p>
- * The class to handle the default config of broker, including:
- * - Add config
- * - Update config
- * - Delete config
- * And manage the broker status.
- * <p>
- * Please note that one IP could only host one broker, and brokerId must be unique
- */
-public class WebBrokerDefConfHandler extends AbstractWebHandler {
-
- private static final Logger logger =
- LoggerFactory.getLogger(WebBrokerDefConfHandler.class);
-
- /**
- * Constructor
- *
- * @param master tube master
- */
- public WebBrokerDefConfHandler(TMaster master) {
- super(master);
- }
-
- @Override
- public void registerWebApiMethod() {
- // register query method
- registerQueryWebMethod("admin_query_broker_run_status",
- "adminQueryBrokerRunStatusInfo");
- registerQueryWebMethod("admin_query_broker_configure",
- "adminQueryBrokerDefConfEntityInfo");
- // register modify method
- registerModifyWebMethod("admin_add_broker_configure",
- "adminAddBrokerDefConfEntityInfo");
- registerModifyWebMethod("admin_bath_add_broker_configure",
- "adminBatchAddBrokerDefConfEntityInfo");
- registerModifyWebMethod("admin_online_broker_configure",
- "adminOnlineBrokerConf");
- registerModifyWebMethod("admin_update_broker_configure",
- "adminUpdateBrokerConf");
- registerModifyWebMethod("admin_reload_broker_configure",
- "adminReloadBrokerConf");
- registerModifyWebMethod("admin_set_broker_read_or_write",
- "adminSetReadOrWriteBrokerConf");
- registerModifyWebMethod("admin_release_broker_autoforbidden_status",
- "adminRelBrokerAutoForbiddenStatus");
- registerModifyWebMethod("admin_offline_broker_configure",
- "adminOfflineBrokerConf");
- registerModifyWebMethod("admin_delete_broker_configure",
- "adminDeleteBrokerConfEntityInfo");
- }
-
- // #lizard forgives
- /**
- * Fast start a broker?
- *
- * @param webMaster
- * @param brokeId
- * @param oldManageStatus
- * @param newManageStatus
- * @return
- */
- public static boolean isBrokerStartNeedFast(BrokerConfManager webMaster,
- int brokeId,
- int oldManageStatus,
- int newManageStatus) {
- ConcurrentHashMap<String, BdbTopicConfEntity> bdbTopicConfEntMap =
- webMaster.getBrokerTopicConfEntitySet(brokeId);
- if ((bdbTopicConfEntMap == null)
- || (bdbTopicConfEntMap.isEmpty())) {
- return true;
- }
- BrokerSyncStatusInfo brokerSyncStatusInfo =
- webMaster.getBrokerRunSyncStatusInfo(brokeId);
- if ((brokerSyncStatusInfo == null)
- || (!brokerSyncStatusInfo.isBrokerRegister())) {
- return true;
- }
- boolean isNeedFastStart =
- brokerSyncStatusInfo.isFastStart();
- if (isNeedFastStart) {
- switch (newManageStatus) {
- case TStatusConstants.STATUS_MANAGE_ONLINE: {
- if ((oldManageStatus == TStatusConstants.STATUS_MANAGE_APPLY)
- || (oldManageStatus == TStatusConstants.STATUS_MANAGE_OFFLINE)
- || (oldManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ)) {
- isNeedFastStart = false;
- }
- if (oldManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE) {
- if ((brokerSyncStatusInfo.isBrokerConfChanged())
- || (!brokerSyncStatusInfo.isBrokerLoaded())) {
- isNeedFastStart = false;
- }
- }
- }
- break;
- case TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE: {
- if ((oldManageStatus == TStatusConstants.STATUS_MANAGE_APPLY)
- || (oldManageStatus == TStatusConstants.STATUS_MANAGE_OFFLINE)) {
- isNeedFastStart = false;
- }
- if (oldManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE) {
- if ((brokerSyncStatusInfo.isBrokerConfChanged())
- || (!brokerSyncStatusInfo.isBrokerLoaded())) {
- isNeedFastStart = false;
- }
- }
- }
- break;
- case TStatusConstants.STATUS_MANAGE_OFFLINE: {
- if ((oldManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE)
- || (oldManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE)) {
- isNeedFastStart = false;
- }
- }
- break;
- default: {
- //
- }
- }
- }
- return isNeedFastStart;
- }
-
- /**
- * Add default config to a broker
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminAddBrokerDefConfEntityInfo(
- HttpServletRequest req) throws Exception {
- StringBuilder strBuffer = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String brokerIp =
- WebParameterUtils.checkParamCommonRequires("brokerIp",
- req.getParameter("brokerIp"), true);
- int brokerPort =
- WebParameterUtils.validIntDataParameter("brokerPort",
- req.getParameter("brokerPort"),
- false, 8123, 1);
- int brokerId =
- WebParameterUtils.validIntDataParameter("brokerId",
- req.getParameter("brokerId"),
- false, 0, 0);
- if (brokerId <= 0) {
- try {
- brokerId = abs(AddressUtils.ipToInt(brokerIp));
- } catch (Exception e) {
- throw new Exception(strBuffer
- .append("Get brokerId by brokerIp error !, exception is :")
- .append(e.toString()).toString());
- }
- }
- BdbBrokerConfEntity oldEntity =
- brokerConfManager.getBrokerDefaultConfigStoreInfo(brokerId);
- if (oldEntity != null) {
- throw new Exception(strBuffer
- .append("Duplicated broker default configure record by (brokerId or brokerIp), " +
- "query index is :")
- .append("brokerId=").append(brokerId)
- .append(",brokerIp=").append(brokerIp).toString());
- }
- ConcurrentHashMap<Integer, BdbBrokerConfEntity> bdbBrokerConfEntityMap =
- brokerConfManager.getBrokerConfStoreMap();
- for (BdbBrokerConfEntity brokerConfEntity : bdbBrokerConfEntityMap.values()) {
- if (brokerConfEntity.getBrokerIp().equals(brokerIp)
- && brokerConfEntity.getBrokerPort() == brokerPort) {
- strBuffer.append("Duplicated broker default configure record by (brokerIp and brokerPort), " +
- "query index is :")
- .append("brokerIp=").append(brokerIp)
- .append(",brokerPort=").append(brokerPort)
- .append(",existed record is ").append(brokerConfEntity);
- throw new Exception(strBuffer.toString());
- }
- }
- String createUser =
- WebParameterUtils.validStringParameter("createUser",
- req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- String deleteWhen =
- WebParameterUtils.validDecodeStringParameter("deleteWhen",
- req.getParameter("deleteWhen"),
- TServerConstants.CFG_DELETEWHEN_MAX_LENGTH, false,
- "0 0 6,18 * * ?");
- String deletePolicy =
- WebParameterUtils.validDeletePolicyParameter("deletePolicy",
- req.getParameter("deletePolicy"), false, "delete,168h");
- String modifyUser =
- WebParameterUtils.validStringParameter("modifyUser",
- req.getParameter("modifyUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- false, createUser);
- Date createDate =
- WebParameterUtils.validDateParameter("createDate",
- req.getParameter("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- Date modifyDate =
- WebParameterUtils.validDateParameter("modifyDate",
- req.getParameter("modifyDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, createDate);
- int numPartitions =
- WebParameterUtils.validIntDataParameter("numPartitions",
- req.getParameter("numPartitions"),
- false, 1, 1);
- int unflushThreshold =
- WebParameterUtils.validIntDataParameter("unflushThreshold",
- req.getParameter("unflushThreshold"),
- false, 1000, 0);
- int unflushInterval =
- WebParameterUtils.validIntDataParameter("unflushInterval",
- req.getParameter("unflushInterval"),
- false, 10000, 1);
- int unFlushDataHold =
- WebParameterUtils.validIntDataParameter("unflushDataHold",
- req.getParameter("unflushDataHold"),
- false, 0, 0);
- int memCacheMsgCntInK =
- WebParameterUtils.validIntDataParameter("memCacheMsgCntInK",
- req.getParameter("memCacheMsgCntInK"),
- false, 10, 1);
- int memCacheMsgSizeInMB =
- WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB",
- req.getParameter("memCacheMsgSizeInMB"),
- false, 3, 2);
- memCacheMsgSizeInMB = memCacheMsgSizeInMB >= 2048 ? 2048 : memCacheMsgSizeInMB;
- int memCacheFlushIntvl =
- WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
- req.getParameter("memCacheFlushIntvl"),
- false, 20000, 4000);
- boolean acceptPublish =
- WebParameterUtils.validBooleanDataParameter("acceptPublish",
- req.getParameter("acceptPublish"),
- false, true);
- boolean acceptSubscribe =
- WebParameterUtils.validBooleanDataParameter("acceptSubscribe",
- req.getParameter("acceptSubscribe"),
- false, true);
- int manageStatus = TStatusConstants.STATUS_MANAGE_APPLY;
- int numTopicStores =
- WebParameterUtils.validIntDataParameter("numTopicStores",
- req.getParameter("numTopicStores"), false, 1, 1);
- int brokerTlsPort =
- WebParameterUtils.validIntDataParameter("brokerTLSPort",
- req.getParameter("brokerTLSPort"), false,
- TBaseConstants.META_DEFAULT_BROKER_TLS_PORT, 0);
- String attributes =
- strBuffer.append(TStoreConstants.TOKEN_STORE_NUM).append(TokenConstants.EQ).append(numTopicStores)
- .append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_DATA_UNFLUSHHOLD)
- .append(TokenConstants.EQ).append(unFlushDataHold)
- .append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MCACHE_MSG_CNT)
- .append(TokenConstants.EQ).append(memCacheMsgCntInK)
- .append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MCACHE_MSG_SIZE)
- .append(TokenConstants.EQ).append(memCacheMsgSizeInMB).append(TokenConstants.SEGMENT_SEP)
- .append(TStoreConstants.TOKEN_MCACHE_FLUSH_INTVL)
- .append(TokenConstants.EQ).append(memCacheFlushIntvl)
- .append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_TLS_PORT)
- .append(TokenConstants.EQ).append(brokerTlsPort).toString();
- strBuffer.delete(0, strBuffer.length());
- BdbBrokerConfEntity brokerConfEntity =
- new BdbBrokerConfEntity(brokerId, brokerIp, brokerPort,
- numPartitions, unflushThreshold, unflushInterval,
- deleteWhen, deletePolicy, manageStatus, acceptPublish,
- acceptSubscribe, attributes, true, false, createUser,
- createDate, modifyUser, modifyDate);
- brokerConfManager.confAddBrokerDefaultConfig(brokerConfEntity);
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- strBuffer.delete(0, strBuffer.length());
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return strBuffer;
- }
-
- /**
- * Add default config to brokers in batch
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminBatchAddBrokerDefConfEntityInfo(HttpServletRequest req) throws Exception {
- // #lizard forgives
- StringBuilder strBuffer = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String createUser =
- WebParameterUtils.validStringParameter("createUser", req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH, true, "");
- Date createDate =
- WebParameterUtils.validDateParameter("createDate", req.getParameter("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH, false, new Date());
- List<Map<String, String>> brokerJsonArray =
- WebParameterUtils.checkAndGetJsonArray("brokerJsonSet",
- req.getParameter("brokerJsonSet"), TBaseConstants.META_VALUE_UNDEFINED, true);
- if ((brokerJsonArray == null) || (brokerJsonArray.isEmpty())) {
- throw new Exception("Null value of brokerJsonSet, please set the value first!");
- }
- HashMap<String, BdbBrokerConfEntity> inBrokerConfEntityMap = new HashMap<>();
- ConcurrentHashMap<Integer, BdbBrokerConfEntity> bdbBrokerConfEntityMap =
- brokerConfManager.getBrokerConfStoreMap();
- for (int count = 0; count < brokerJsonArray.size(); count++) {
- Map<String, String> jsonObject = brokerJsonArray.get(count);
- try {
- String brokerIp =
- WebParameterUtils.checkParamCommonRequires("brokerIp",
- jsonObject.get("brokerIp"), false);
- int brokerPort =
- WebParameterUtils.validIntDataParameter("brokerPort",
- jsonObject.get("brokerPort"), false, 8123, 0);
- int brokerId =
- WebParameterUtils.validIntDataParameter("brokerId",
- jsonObject.get("brokerId"), false, 0, 0);
- if (brokerId <= 0) {
- brokerIp =
- WebParameterUtils.checkParamCommonRequires("brokerIp",
- jsonObject.get("brokerIp"), true);
- try {
- brokerId = abs(AddressUtils.ipToInt(brokerIp));
- } catch (Exception e) {
- throw new Exception(strBuffer
- .append("Get brokerId by brokerIp error !, record is : ")
- .append(jsonObject.toString()).append("exception is :").append(e.toString())
- .toString());
- }
- }
- BdbBrokerConfEntity oldEntity =
- bdbBrokerConfEntityMap.get(brokerId);
- if (oldEntity != null) {
- throw new Exception(strBuffer
- .append("Duplicated broker default configure record by (brokerId or brokerIp), " +
- "query index is :")
- .append("brokerId=").append(brokerId).append(",brokerIp=").append(brokerIp).toString());
- }
- for (BdbBrokerConfEntity brokerConfEntity : bdbBrokerConfEntityMap.values()) {
- if (brokerConfEntity.getBrokerIp().equals(brokerIp)
- && brokerConfEntity.getBrokerPort() == brokerPort) {
- strBuffer.append(
- "Duplicate add broker default configure record by (brokerIp and brokerPort), " +
- "query index is :")
- .append("brokerIp=").append(brokerIp).append(",brokerPort=").append(brokerPort)
- .append(",existed record is ").append(brokerConfEntity);
- throw new Exception(strBuffer.toString());
- }
- }
- for (BdbBrokerConfEntity brokerConfEntity : inBrokerConfEntityMap.values()) {
- if (brokerConfEntity.getBrokerIp().equals(brokerIp)
- && brokerConfEntity.getBrokerPort() == brokerPort) {
- throw new Exception(strBuffer
- .append("Duplicate (brokerIp and brokerPort) in request records, " +
- "duplicated key is : ")
- .append("brokerIp=").append(brokerIp).append(",brokerPort=").append(brokerPort)
- .toString());
- }
- if (brokerConfEntity.getBrokerId() == brokerId) {
- throw new Exception(strBuffer
- .append("Duplicate brokerId in request records, duplicated brokerId is : brokerId=")
- .append(brokerId).toString());
- }
- }
- final String inputKey = strBuffer.append(brokerId).append("-")
- .append(brokerIp).append("-").append(brokerPort).toString();
- strBuffer.delete(0, strBuffer.length());
- final String deleteWhen =
- WebParameterUtils.validDecodeStringParameter("deleteWhen", jsonObject.get("deleteWhen"),
- TServerConstants.CFG_DELETEWHEN_MAX_LENGTH, false, "0 0 6,18 * * ?");
- final String deletePolicy =
- WebParameterUtils.validDeletePolicyParameter("deletePolicy",
- jsonObject.get("deletePolicy"), false, "delete,168h");
- final int numPartitions =
- WebParameterUtils.validIntDataParameter("numPartitions",
- jsonObject.get("numPartitions"), false, 1, 1);
- final int unflushThreshold =
- WebParameterUtils.validIntDataParameter("unflushThreshold",
- jsonObject.get("unflushThreshold"), false, 1000, 0);
- final int unflushInterval =
- WebParameterUtils.validIntDataParameter("unflushInterval",
- jsonObject.get("unflushInterval"), false, 10000, 1);
- final int unFlushDataHold =
- WebParameterUtils.validIntDataParameter("unflushDataHold",
- jsonObject.get("unflushDataHold"), false, 0, 0);
- final boolean acceptPublish =
- WebParameterUtils.validBooleanDataParameter("acceptPublish",
- jsonObject.get("acceptPublish"), false, true);
- final boolean acceptSubscribe =
- WebParameterUtils.validBooleanDataParameter("acceptSubscribe",
- jsonObject.get("acceptSubscribe"), false, true);
- String itemCreateUser =
- WebParameterUtils.validStringParameter("createUser", jsonObject.get("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH, false, null);
- Date itemCreateDate =
- WebParameterUtils.validDateParameter("createDate", jsonObject.get("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH, false, null);
- if (TStringUtils.isBlank(itemCreateUser) || itemCreateDate == null) {
- itemCreateUser = createUser;
- itemCreateDate = createDate;
- }
- int brokerTlsPort =
- WebParameterUtils.validIntDataParameter("brokerTLSPort",
- jsonObject.get("brokerTLSPort"), false,
- TBaseConstants.META_DEFAULT_BROKER_TLS_PORT, 0);
- int manageStatus = TStatusConstants.STATUS_MANAGE_APPLY;
- int numTopicStores =
- WebParameterUtils.validIntDataParameter("numTopicStores",
- jsonObject.get("numTopicStores"), false, 1, 1);
- int memCacheMsgCntInK =
- WebParameterUtils.validIntDataParameter("memCacheMsgCntInK",
- jsonObject.get("memCacheMsgCntInK"), false, 10, 1);
- int memCacheMsgSizeInMB =
- WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB",
- jsonObject.get("memCacheMsgSizeInMB"), false, 3, 2);
- memCacheMsgSizeInMB = memCacheMsgSizeInMB >= 2048 ? 2048 : memCacheMsgSizeInMB;
- int memCacheFlushIntvl =
- WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
- jsonObject.get("memCacheFlushIntvl"), false, 20000, 4000);
- String attributes = strBuffer
- .append(TStoreConstants.TOKEN_STORE_NUM).append(TokenConstants.EQ).append(numTopicStores)
- .append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_DATA_UNFLUSHHOLD)
- .append(TokenConstants.EQ).append(unFlushDataHold).append(TokenConstants.SEGMENT_SEP)
- .append(TStoreConstants.TOKEN_MCACHE_MSG_CNT).append(TokenConstants.EQ)
- .append(memCacheMsgCntInK).append(TokenConstants.SEGMENT_SEP)
- .append(TStoreConstants.TOKEN_MCACHE_MSG_SIZE).append(TokenConstants.EQ)
- .append(memCacheMsgSizeInMB).append(TokenConstants.SEGMENT_SEP)
- .append(TStoreConstants.TOKEN_MCACHE_FLUSH_INTVL).append(TokenConstants.EQ)
- .append(memCacheFlushIntvl).append(TokenConstants.SEGMENT_SEP)
- .append(TStoreConstants.TOKEN_TLS_PORT).append(TokenConstants.EQ)
- .append(brokerTlsPort).toString();
- strBuffer.delete(0, strBuffer.length());
- inBrokerConfEntityMap.put(inputKey, new BdbBrokerConfEntity(brokerId, brokerIp,
- brokerPort, numPartitions, unflushThreshold, unflushInterval, deleteWhen,
- deletePolicy, manageStatus, acceptPublish, acceptSubscribe, attributes,
- true, false, itemCreateUser, itemCreateDate, itemCreateUser, itemCreateDate));
- } catch (Exception ee) {
- strBuffer.delete(0, strBuffer.length());
- throw new Exception(strBuffer.append("Process data exception, data is :")
- .append(jsonObject.toString()).append(", exception is : ")
- .append(ee.getMessage()).toString());
- }
- }
- for (BdbBrokerConfEntity brokerConfEntity : inBrokerConfEntityMap.values()) {
- brokerConfManager.confAddBrokerDefaultConfig(brokerConfEntity);
- }
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- strBuffer.delete(0, strBuffer.length());
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return strBuffer;
- }
-
- /**
- * Make broker config online
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminOnlineBrokerConf(
- HttpServletRequest req) throws Exception {
- StringBuilder strBuffer = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String modifyUser =
- WebParameterUtils.validStringParameter("modifyUser",
- req.getParameter("modifyUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date modifyDate =
- WebParameterUtils.validDateParameter("modifyDate",
- req.getParameter("modifyDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- Set<BdbBrokerConfEntity> batchBrokerEntities =
- WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"),
- brokerConfManager, true, strBuffer);
- int manageStatus = TStatusConstants.STATUS_MANAGE_ONLINE;
- Map<Integer, BrokerInfo> oldBrokerInfoMap =
- master.getBrokerHolder().getBrokerInfoMap();
- Set<BdbBrokerConfEntity> newBrokerEntitySet =
- new HashSet<>();
- for (BdbBrokerConfEntity oldEntity : batchBrokerEntities) {
- if (oldEntity == null) {
- continue;
- }
- if (oldEntity.getManageStatus() == manageStatus) {
- continue;
- }
- checkBrokerDuplicateRecord(oldEntity, strBuffer, oldBrokerInfoMap);
- if (WebParameterUtils.checkBrokerInProcessing(oldEntity.getBrokerId(), brokerConfManager, strBuffer)) {
- throw new Exception(strBuffer.toString());
- }
- newBrokerEntitySet.add(new BdbBrokerConfEntity(oldEntity.getBrokerId(),
- oldEntity.getBrokerIp(), oldEntity.getBrokerPort(),
- oldEntity.getDftNumPartitions(), oldEntity.getDftUnflushThreshold(),
- oldEntity.getDftUnflushInterval(), oldEntity.getDftDeleteWhen(),
- oldEntity.getDftDeletePolicy(), manageStatus,
- oldEntity.isAcceptPublish(), oldEntity.isAcceptSubscribe(),
- oldEntity.getAttributes(), oldEntity.isConfDataUpdated(),
- oldEntity.isBrokerLoaded(), oldEntity.getRecordCreateUser(),
- oldEntity.getRecordCreateDate(), modifyUser, modifyDate));
- }
- for (BdbBrokerConfEntity newEntity : newBrokerEntitySet) {
- BdbBrokerConfEntity oldEntity =
- brokerConfManager.getBrokerDefaultConfigStoreInfo(newEntity.getBrokerId());
- if (oldEntity == null
- || oldEntity.getManageStatus() == newEntity.getManageStatus()
- || WebParameterUtils.checkBrokerInProcessing(newEntity.getBrokerId(), brokerConfManager,
- null)) {
- continue;
- }
- try {
- boolean isNeedFastStart =
- isBrokerStartNeedFast(brokerConfManager, newEntity.getBrokerId(),
- oldEntity.getManageStatus(), manageStatus);
- brokerConfManager.confModBrokerDefaultConfig(newEntity);
- brokerConfManager.triggerBrokerConfDataSync(newEntity,
- oldEntity.getManageStatus(), isNeedFastStart);
- } catch (Exception e2) {
- //
- }
- }
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- strBuffer.delete(0, strBuffer.length());
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return strBuffer;
- }
-
- /**
- * Set read/write status of a broker.
- * The same operations could be made by changing broker's config,
- * but those are extracted here to simplify the code.
- *
- * @param req
- * @return
- * @throws Exception
- */
- // #lizard forgives
- public StringBuilder adminSetReadOrWriteBrokerConf(HttpServletRequest req) throws Exception {
- StringBuilder strBuffer = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String modifyUser =
- WebParameterUtils.validStringParameter("modifyUser",
- req.getParameter("modifyUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date modifyDate =
- WebParameterUtils.validDateParameter("modifyDate",
- req.getParameter("modifyDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- String strIsAcceptPublish = req.getParameter("isAcceptPublish");
- String strIsAcceptSubscribe = req.getParameter("isAcceptSubscribe");
- if ((TStringUtils.isBlank(strIsAcceptPublish))
- && (TStringUtils.isBlank(strIsAcceptSubscribe))) {
- throw new Exception("Required isAcceptPublish or isAcceptSubscribe parameter");
- }
- Set<BdbBrokerConfEntity> batchBrokerEntitySet = WebParameterUtils.getBatchBrokerIdSet(
- req.getParameter("brokerId"), brokerConfManager, true, strBuffer);
- Map<Integer, BrokerInfo> oldBrokerInfoMap =
- master.getBrokerHolder().getBrokerInfoMap();
-
- // Check the current status and status after the change, to see if there are changes.
- // If yes, check if the current status complies with the change.
- // If it complies, record the change.
- Set<BdbBrokerConfEntity> newBrokerEntitySet = new HashSet<>();
- for (BdbBrokerConfEntity oldEntity : batchBrokerEntitySet) {
- if (oldEntity == null) {
- continue;
- }
- checkBrokerDuplicateRecord(oldEntity, strBuffer, oldBrokerInfoMap);
- boolean acceptPublish = false;
- boolean acceptSubscribe = false;
- if (oldEntity.getManageStatus() >= TStatusConstants.STATUS_MANAGE_ONLINE) {
- if (oldEntity.getManageStatus() == TStatusConstants.STATUS_MANAGE_ONLINE) {
- acceptPublish = true;
- acceptSubscribe = true;
- } else if (oldEntity.getManageStatus() == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE) {
- acceptPublish = false;
- acceptSubscribe = true;
- } else if (oldEntity.getManageStatus() == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ) {
- acceptPublish = true;
- acceptSubscribe = false;
- }
- }
- if (TStringUtils.isNotBlank(strIsAcceptPublish)) {
- acceptPublish =
- WebParameterUtils.validBooleanDataParameter("isAcceptPublish",
- req.getParameter("isAcceptPublish"), false, true);
- }
- if (TStringUtils.isNotBlank(strIsAcceptSubscribe)) {
- acceptSubscribe =
- WebParameterUtils.validBooleanDataParameter("isAcceptSubscribe",
- req.getParameter("isAcceptSubscribe"), false, true);
- }
- int manageStatus = TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ;
- if ((acceptPublish) && (acceptSubscribe)) {
- manageStatus = TStatusConstants.STATUS_MANAGE_ONLINE;
- } else if (!acceptPublish && !acceptSubscribe) {
- if (oldEntity.getManageStatus() < TStatusConstants.STATUS_MANAGE_ONLINE) {
- throw new Exception(strBuffer.append("Broker by brokerId=")
- .append(oldEntity.getBrokerId())
- .append(" on draft status, not need offline operate!").toString());
- }
- manageStatus = TStatusConstants.STATUS_MANAGE_OFFLINE;
- } else if (acceptSubscribe) {
- manageStatus = TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE;
- }
- if ((manageStatus == TStatusConstants.STATUS_MANAGE_OFFLINE)
- && (oldEntity.getManageStatus() < TStatusConstants.STATUS_MANAGE_ONLINE)) {
- continue;
- }
- if (oldEntity.getManageStatus() == manageStatus) {
- continue;
- }
- if (WebParameterUtils.checkBrokerInProcessing(oldEntity.getBrokerId(), brokerConfManager, strBuffer)) {
- throw new Exception(strBuffer.toString());
- }
- newBrokerEntitySet.add(new BdbBrokerConfEntity(oldEntity.getBrokerId(),
- oldEntity.getBrokerIp(), oldEntity.getBrokerPort(),
- oldEntity.getDftNumPartitions(), oldEntity.getDftUnflushThreshold(),
- oldEntity.getDftUnflushInterval(), oldEntity.getDftDeleteWhen(),
- oldEntity.getDftDeletePolicy(), manageStatus,
- oldEntity.isAcceptPublish(), oldEntity.isAcceptSubscribe(),
- oldEntity.getAttributes(), oldEntity.isConfDataUpdated(),
- oldEntity.isBrokerLoaded(), oldEntity.getRecordCreateUser(),
- oldEntity.getRecordCreateDate(), modifyUser, modifyDate));
- }
-
- // Perform the change on status
- for (BdbBrokerConfEntity newEntity : newBrokerEntitySet) {
- BdbBrokerConfEntity oldEntity =
- brokerConfManager.getBrokerDefaultConfigStoreInfo(newEntity.getBrokerId());
- if (oldEntity == null
- || oldEntity.getManageStatus() == newEntity.getManageStatus()
- || WebParameterUtils.checkBrokerInProcessing(newEntity.getBrokerId(), brokerConfManager,
- null)) {
- continue;
- }
- try {
- boolean isNeedFastStart =
- isBrokerStartNeedFast(brokerConfManager, newEntity.getBrokerId(),
- oldEntity.getManageStatus(), newEntity.getManageStatus());
- brokerConfManager.confModBrokerDefaultConfig(newEntity);
- brokerConfManager.triggerBrokerConfDataSync(newEntity,
- oldEntity.getManageStatus(), isNeedFastStart);
- } catch (Exception e2) {
- //
- }
- }
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- strBuffer.delete(0, strBuffer.length());
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return strBuffer;
- }
-
- /**
- * @param oldEntity
- * @param strBuffer
- * @param oldBrokerInfoMap
- * @throws Exception
- */
- private void checkBrokerDuplicateRecord(BdbBrokerConfEntity oldEntity, StringBuilder strBuffer,
- Map<Integer, BrokerInfo> oldBrokerInfoMap) throws Exception {
- if (oldEntity.getManageStatus() == TStatusConstants.STATUS_MANAGE_APPLY) {
- BrokerInfo tmpBrokerInfo = oldBrokerInfoMap.get(oldEntity.getBrokerId());
- if (tmpBrokerInfo != null) {
- throw new Exception(strBuffer
- .append("Illegal value: the brokerId (")
- .append(oldEntity.getBrokerId())
- .append(") is used by another broker, please quit the broker first! " +
- "Using the brokerId's brokerIp=")
- .append(tmpBrokerInfo.getHost()).toString());
- }
- for (BrokerInfo oldBrokerInfo : oldBrokerInfoMap.values()) {
- if (oldBrokerInfo.getHost().equals(oldEntity.getBrokerIp())) {
- throw new Exception(strBuffer
- .append("Illegal value: the brokerId's Ip is used by another broker, " +
- "please quit the broker first! BrokerIp=")
- .append(oldEntity.getBrokerIp())
- .append(",current brokerId=")
- .append(oldEntity.getBrokerId())
- .append(",using the brokerIp's brokerId=")
- .append(oldBrokerInfo.getBrokerId()).toString());
- }
- }
- }
- }
-
- /**
- * Release broker auto forbidden status
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminRelBrokerAutoForbiddenStatus(
- HttpServletRequest req) throws Exception {
- StringBuilder strBuffer = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String modifyUser =
- WebParameterUtils.validStringParameter("modifyUser",
- req.getParameter("modifyUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date modifyDate =
- WebParameterUtils.validDateParameter("modifyDate",
- req.getParameter("modifyDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- Set<Integer> batchBrokerIds = new HashSet<>();
- Set<BdbBrokerConfEntity> batchBrokerEntitySet = WebParameterUtils.getBatchBrokerIdSet(
- req.getParameter("brokerId"), brokerConfManager, true, strBuffer);
- for (BdbBrokerConfEntity entity : batchBrokerEntitySet) {
- batchBrokerIds.add(entity.getBrokerId());
- }
- String relReason =
- WebParameterUtils.validStringParameter("relReason",
- req.getParameter("relReason"),
- TBaseConstants.META_MAX_OPREASON_LENGTH,
- false, "API call to release auto-forbidden brokers");
- BrokerInfoHolder brokerInfoHolder = master.getBrokerHolder();
- brokerInfoHolder.relAutoForbiddenBrokerInfo(batchBrokerIds, relReason);
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- strBuffer.delete(0, strBuffer.length());
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return strBuffer;
- }
-
- /**
- * Update broker default config.
- * The current record will be checked firstly. The update will be performed only when there are changes.
- *
- * @param req
- * @return
- * @throws Throwable
- */
- // #lizard forgives
- public StringBuilder adminUpdateBrokerConf(HttpServletRequest req) throws Throwable {
- StringBuilder strBuffer = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String modifyUser = WebParameterUtils.validStringParameter("modifyUser",
- req.getParameter("modifyUser"), TBaseConstants.META_MAX_USERNAME_LENGTH, true, "");
- Date modifyDate = WebParameterUtils.validDateParameter("modifyDate",
- req.getParameter("modifyDate"), TBaseConstants.META_MAX_DATEVALUE_LENGTH, false, new Date());
- Set<BdbBrokerConfEntity> batchBrokerEntitySet = WebParameterUtils.getBatchBrokerIdSet(
- req.getParameter("brokerId"), brokerConfManager, true, strBuffer);
- Set<BdbBrokerConfEntity> modifyBdbEntitySet = new HashSet<>();
-
- // Check the entities one by one, to see if there are changes.
- for (BdbBrokerConfEntity oldEntity : batchBrokerEntitySet) {
- if (oldEntity == null) {
- continue;
- }
- boolean foundChange = false;
- BdbBrokerConfEntity newEntity =
- new BdbBrokerConfEntity(oldEntity.getBrokerId(), oldEntity.getBrokerIp(),
- oldEntity.getBrokerPort(), oldEntity.getDftNumPartitions(),
- oldEntity.getDftUnflushThreshold(), oldEntity.getDftUnflushInterval(),
- oldEntity.getDftDeleteWhen(), oldEntity.getDftDeletePolicy(),
- oldEntity.getManageStatus(), oldEntity.isAcceptPublish(),
- oldEntity.isAcceptSubscribe(), oldEntity.getAttributes(), oldEntity.isConfDataUpdated(),
- oldEntity.isBrokerLoaded(),
- oldEntity.getRecordCreateUser(), oldEntity.getRecordCreateDate(),
- modifyUser, modifyDate);
- String deleteWhen = WebParameterUtils.validDecodeStringParameter("deleteWhen",
- req.getParameter("deleteWhen"), TServerConstants.CFG_DELETEWHEN_MAX_LENGTH, false, null);
- if ((!TStringUtils.isBlank(deleteWhen)) && (!deleteWhen.equals(oldEntity.getDftDeleteWhen()))) {
- foundChange = true;
- newEntity.setDftDeleteWhen(deleteWhen);
- }
- int brokerPort = WebParameterUtils.validIntDataParameter("brokerPort",
- req.getParameter("brokerPort"), false, TBaseConstants.META_VALUE_UNDEFINED, 1);
- if ((brokerPort != TBaseConstants.META_VALUE_UNDEFINED)
- && (oldEntity.getBrokerPort() != brokerPort)) {
- foundChange = true;
- newEntity.setBrokerIpAndPort(oldEntity.getBrokerIp(), brokerPort);
- }
- String deletePolicy =
- WebParameterUtils.validDeletePolicyParameter("deletePolicy",
- req.getParameter("deletePolicy"), false, null);
- if ((!TStringUtils.isBlank(deletePolicy)) && (!deletePolicy.equals(oldEntity.getDftDeletePolicy()))) {
- foundChange = true;
- newEntity.setDftDeletePolicy(deletePolicy);
- }
- int numPartitions = WebParameterUtils.validIntDataParameter("numPartitions",
- req.getParameter("numPartitions"), false, TBaseConstants.META_VALUE_UNDEFINED, 1);
- if ((numPartitions > 0) && (numPartitions != oldEntity.getDftNumPartitions())) {
- foundChange = true;
- newEntity.setDftNumPartitions(numPartitions);
- }
- int unflushThreshold = WebParameterUtils.validIntDataParameter("unflushThreshold",
- req.getParameter("unflushThreshold"), false, TBaseConstants.META_VALUE_UNDEFINED, 0);
- if ((unflushThreshold >= 0) && (unflushThreshold != oldEntity.getDftUnflushThreshold())) {
- foundChange = true;
- newEntity.setDftUnflushThreshold(unflushThreshold);
- }
- int unflushInterval = WebParameterUtils.validIntDataParameter("unflushInterval",
- req.getParameter("unflushInterval"), false, TBaseConstants.META_VALUE_UNDEFINED, 1);
- if ((unflushInterval > 0) && (unflushInterval != oldEntity.getDftUnflushInterval())) {
- foundChange = true;
- newEntity.setDftUnflushInterval(unflushInterval);
- }
- int numTopicStores = WebParameterUtils.validIntDataParameter("numTopicStores",
- req.getParameter("numTopicStores"), false, TBaseConstants.META_VALUE_UNDEFINED, 1);
- if ((numTopicStores > 0) && (numTopicStores != oldEntity.getNumTopicStores())) {
- foundChange = true;
- newEntity.appendAttributes(TStoreConstants.TOKEN_STORE_NUM, String.valueOf(numTopicStores));
- }
- int unFlushDataHold = WebParameterUtils.validIntDataParameter("unflushDataHold",
- req.getParameter("unflushDataHold"), false, TBaseConstants.META_VALUE_UNDEFINED, 0);
- if ((unFlushDataHold >= 0) && (unFlushDataHold != oldEntity.getDftUnFlushDataHold())) {
- foundChange = true;
- newEntity.setDftUnFlushDataHold(unFlushDataHold);
- }
- int brokerTlsPort = WebParameterUtils.validIntDataParameter("brokerTLSPort",
- req.getParameter("brokerTLSPort"), false, TBaseConstants.META_VALUE_UNDEFINED, 0);
- if (brokerTlsPort >= 0 && brokerTlsPort != oldEntity.getBrokerTLSPort()) {
- foundChange = true;
- newEntity.setBrokerTLSPort(brokerTlsPort);
- }
- if (unFlushDataHold >= 0 && unFlushDataHold != oldEntity.getDftUnFlushDataHold()) {
- foundChange = true;
- newEntity.setDftUnFlushDataHold(unFlushDataHold);
- }
- int memCacheMsgCntInK = WebParameterUtils.validIntDataParameter("memCacheMsgCntInK",
- req.getParameter("memCacheMsgCntInK"), false, TBaseConstants.META_VALUE_UNDEFINED, 1);
- if ((memCacheMsgCntInK > 0) && (memCacheMsgCntInK != oldEntity.getDftMemCacheMsgCntInK())) {
- foundChange = true;
- newEntity.setDftMemCacheMsgCntInK(memCacheMsgCntInK);
- }
- int memCacheMsgSizeInMB = WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB",
- req.getParameter("memCacheMsgSizeInMB"), false, TBaseConstants.META_VALUE_UNDEFINED, 2);
- memCacheMsgSizeInMB = memCacheMsgSizeInMB >= 2048 ? 2048 : memCacheMsgSizeInMB;
- if ((memCacheMsgSizeInMB > 0) && (memCacheMsgSizeInMB != oldEntity.getDftMemCacheMsgSizeInMB())) {
- foundChange = true;
- newEntity.setDftMemCacheMsgSizeInMB(memCacheMsgSizeInMB);
- }
- int memCacheFlushIntvl = WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
- req.getParameter("memCacheFlushIntvl"), false, TBaseConstants.META_VALUE_UNDEFINED, 4000);
- if ((memCacheFlushIntvl > 0) && (memCacheFlushIntvl != oldEntity.getDftMemCacheFlushIntvl())) {
- foundChange = true;
- newEntity.setDftMemCacheFlushIntvl(memCacheFlushIntvl);
- newEntity.appendAttributes(TStoreConstants.TOKEN_MCACHE_FLUSH_INTVL,
- String.valueOf(memCacheFlushIntvl));
- }
- String publishParaStr = req.getParameter("acceptPublish");
- if (!TStringUtils.isBlank(publishParaStr)) {
- boolean acceptPublish = WebParameterUtils.validBooleanDataParameter("acceptPublish",
- req.getParameter("acceptPublish"), true, true);
- if (acceptPublish != oldEntity.isAcceptPublish()) {
- foundChange = true;
- newEntity.setDftAcceptPublish(acceptPublish);
- }
- }
- String subscribeParaStr = req.getParameter("acceptSubscribe");
- if (!TStringUtils.isBlank(subscribeParaStr)) {
- boolean acceptSubscribe = WebParameterUtils.validBooleanDataParameter("acceptSubscribe",
- req.getParameter("acceptSubscribe"), true, true);
- if (acceptSubscribe != oldEntity.isAcceptSubscribe()) {
- foundChange = true;
- newEntity.setDftAcceptSubscribe(acceptSubscribe);
- }
- }
- if (!foundChange) {
- continue;
- }
- newEntity.setConfDataUpdated();
- modifyBdbEntitySet.add(newEntity);
- }
- try {
- // Perform the updates only on those which are changed
- boolean result = false;
- for (BdbBrokerConfEntity itemEntity : modifyBdbEntitySet) {
- result = brokerConfManager.confModBrokerDefaultConfig(itemEntity);
- BrokerSyncStatusInfo brokerSyncStatusInfo =
- brokerConfManager.getBrokerRunSyncStatusInfo(itemEntity.getBrokerId());
- if (result) {
- if (brokerSyncStatusInfo != null) {
- brokerConfManager.updateBrokerConfChanged(itemEntity.getBrokerId(), true, true);
- }
- }
- }
- } catch (Exception ee) {
- //
- }
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- strBuffer.delete(0, strBuffer.length());
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return strBuffer;
- }
-
- /**
- * Reload broker config
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminReloadBrokerConf(HttpServletRequest req) throws Exception {
- StringBuilder strBuffer = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String modifyUser =
- WebParameterUtils.validStringParameter("modifyUser",
- req.getParameter("modifyUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date modifyDate =
- WebParameterUtils.validDateParameter("modifyDate",
- req.getParameter("modifyDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- Set<BdbBrokerConfEntity> batchBrokerEntities =
- WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"),
- brokerConfManager, true, strBuffer);
- for (BdbBrokerConfEntity oldEntity : batchBrokerEntities) {
- if (oldEntity == null) {
- continue;
- }
- if (!WebParameterUtils.checkBrokerInOnlineStatus(oldEntity)) {
- strBuffer.append("The broker manage status by brokerId=")
- .append(oldEntity.getBrokerId())
- .append(" not in online status, can't reload this configure! ");
- throw new Exception(strBuffer.toString());
- }
- if (WebParameterUtils.checkBrokerInProcessing(oldEntity.getBrokerId(), brokerConfManager, strBuffer)) {
- throw new Exception(strBuffer.toString());
- }
- }
- for (BdbBrokerConfEntity oldEntity : batchBrokerEntities) {
- if (!WebParameterUtils.checkBrokerInOnlineStatus(oldEntity) || WebParameterUtils
- .checkBrokerInProcessing(oldEntity.getBrokerId(), brokerConfManager, null)) {
- continue;
- }
- try {
- boolean isNeedFastStart =
- isBrokerStartNeedFast(brokerConfManager, oldEntity.getBrokerId(),
- oldEntity.getManageStatus(), oldEntity.getManageStatus());
- brokerConfManager.triggerBrokerConfDataSync(oldEntity,
- oldEntity.getManageStatus(), isNeedFastStart);
- } catch (Exception ee) {
- //
- }
- }
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- strBuffer.delete(0, strBuffer.length());
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return strBuffer;
- }
-
- /**
- * Make broker config offline
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminOfflineBrokerConf(HttpServletRequest req) throws Exception {
- StringBuilder strBuffer = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String modifyUser =
- WebParameterUtils.validStringParameter("modifyUser",
- req.getParameter("modifyUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date modifyDate =
- WebParameterUtils.validDateParameter("modifyDate",
- req.getParameter("modifyDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- int manageStatus = TStatusConstants.STATUS_MANAGE_OFFLINE;
- Set<BdbBrokerConfEntity> batchBrokerEntities =
- WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"),
- brokerConfManager, true, strBuffer);
- Set<BdbBrokerConfEntity> newBrokerEntitys =
- new HashSet<>();
- for (BdbBrokerConfEntity oldEntity : batchBrokerEntities) {
- if (oldEntity == null) {
- continue;
- }
- if (oldEntity.getManageStatus() < TStatusConstants.STATUS_MANAGE_ONLINE) {
- throw new Exception(strBuffer.append("Broker by brokerId=")
- .append(oldEntity.getBrokerId())
- .append(" on draft status, not need offline operate!").toString());
- }
- if (oldEntity.getManageStatus() == manageStatus) {
- continue;
- }
- if (WebParameterUtils.checkBrokerInProcessing(oldEntity.getBrokerId(), brokerConfManager, strBuffer)) {
- throw new Exception(strBuffer.toString());
- }
- newBrokerEntitys.add(new BdbBrokerConfEntity(oldEntity.getBrokerId(),
- oldEntity.getBrokerIp(), oldEntity.getBrokerPort(),
- oldEntity.getDftNumPartitions(), oldEntity.getDftUnflushThreshold(),
- oldEntity.getDftUnflushInterval(), oldEntity.getDftDeleteWhen(),
- oldEntity.getDftDeletePolicy(), manageStatus,
- oldEntity.isAcceptPublish(), oldEntity.isAcceptSubscribe(),
- oldEntity.getAttributes(), oldEntity.isConfDataUpdated(),
- oldEntity.isBrokerLoaded(), oldEntity.getRecordCreateUser(),
- oldEntity.getRecordCreateDate(), modifyUser, modifyDate));
- }
- for (BdbBrokerConfEntity newEntity : newBrokerEntitys) {
- BdbBrokerConfEntity oldEntity =
- brokerConfManager.getBrokerDefaultConfigStoreInfo(newEntity.getBrokerId());
- if (oldEntity == null
- || oldEntity.getManageStatus() == manageStatus
- || oldEntity.getManageStatus() < TStatusConstants.STATUS_MANAGE_ONLINE
- || WebParameterUtils.checkBrokerInProcessing(oldEntity.getBrokerId(), brokerConfManager,
- null)) {
- continue;
- }
- try {
- boolean isNeedFastStart =
- isBrokerStartNeedFast(brokerConfManager, oldEntity.getBrokerId(),
- oldEntity.getManageStatus(), oldEntity.getManageStatus());
- brokerConfManager.confModBrokerDefaultConfig(newEntity);
- brokerConfManager.triggerBrokerConfDataSync(newEntity, oldEntity.getManageStatus(),
- isNeedFastStart);
- } catch (Exception ee) {
- //
- }
- }
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- strBuffer.delete(0, strBuffer.length());
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return strBuffer;
- }
-
- /**
- * Delete broker config
- *
- * @param req
- * @return
- * @throws Exception
- */
- // #lizard forgives
- public StringBuilder adminDeleteBrokerConfEntityInfo(HttpServletRequest req) throws Exception {
- StringBuilder strBuffer = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String modifyUser =
- WebParameterUtils.validStringParameter("modifyUser",
- req.getParameter("modifyUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date modifyDate =
- WebParameterUtils.validDateParameter("modifyDate",
- req.getParameter("modifyDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- boolean isReservedData =
- WebParameterUtils.validBooleanDataParameter("isReservedData",
- req.getParameter("isReservedData"),
- false, false);
- Set<BdbBrokerConfEntity> batchBrokerEntities =
- WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"),
- brokerConfManager, true, strBuffer);
- for (BdbBrokerConfEntity oldEntity : batchBrokerEntities) {
- if (oldEntity == null) {
- continue;
- }
- ConcurrentHashMap<String, BdbTopicConfEntity> brokerTopicEntityMap =
- brokerConfManager.getBrokerTopicConfEntitySet(oldEntity.getBrokerId());
- if ((brokerTopicEntityMap != null)
- && (!brokerTopicEntityMap.isEmpty())) {
- if (isReservedData) {
- for (Map.Entry<String, BdbTopicConfEntity> entry : brokerTopicEntityMap.entrySet()) {
- if (entry.getValue() == null) {
- continue;
- }
- if (entry.getValue().getAcceptPublish()
- || entry.getValue().getAcceptSubscribe()) {
- throw new Exception(strBuffer.append("The topic ")
- .append(entry.getKey())
- .append("'s acceptPublish and acceptSubscribe parameters must be false " +
- "in broker=")
- .append(oldEntity.getBrokerId())
- .append(" before broker delete by reserve data method!").toString());
- }
- }
- } else {
- throw new Exception(strBuffer.append("Topic configure of broker by brokerId=")
- .append(oldEntity.getBrokerId())
- .append(" not deleted, please delete broker's topic configure first!").toString());
- }
- }
- if (WebParameterUtils.checkBrokerInOnlineStatus(oldEntity)) {
- throw new Exception(strBuffer.append("Broker's manage status is online by brokerId=")
- .append(oldEntity.getBrokerId())
- .append(", please offline first!").toString());
- }
- if (WebParameterUtils.checkBrokerInOfflining(oldEntity.getBrokerId(),
- oldEntity.getManageStatus(), brokerConfManager, strBuffer)) {
- throw new Exception(strBuffer.toString());
- }
- }
- for (BdbBrokerConfEntity oldEntity : batchBrokerEntities) {
- if (oldEntity == null
- || WebParameterUtils.checkBrokerInOnlineStatus(oldEntity)
- || WebParameterUtils.checkBrokerInOfflining(oldEntity.getBrokerId(),
- oldEntity.getManageStatus(), brokerConfManager, null)) {
- continue;
- }
- ConcurrentHashMap<String, BdbTopicConfEntity> brokerTopicEntityMap =
- brokerConfManager.getBrokerTopicConfEntitySet(oldEntity.getBrokerId());
- if ((brokerTopicEntityMap != null)
- && (!brokerTopicEntityMap.isEmpty())) {
- if (isReservedData) {
- boolean needCancel = false;
- for (Map.Entry<String, BdbTopicConfEntity> btEntity : brokerTopicEntityMap.entrySet()) {
- if (btEntity.getValue() == null) {
- continue;
- }
- if ((btEntity.getValue().getAcceptPublish())
- && (btEntity.getValue().getAcceptSubscribe())) {
- needCancel = true;
- break;
- }
- }
- if (needCancel) {
- continue;
- }
- } else {
- continue;
- }
- }
- try {
- if (isReservedData) {
- ConcurrentHashMap<String, BdbTopicConfEntity> brokerTopicConfMap =
- brokerConfManager.getBrokerTopicConfEntitySet(oldEntity.getBrokerId());
- if (brokerTopicConfMap != null) {
- brokerConfManager.clearConfigureTopicEntityInfo(oldEntity.getBrokerId());
- }
- }
- brokerConfManager.confDelBrokerConfig(oldEntity);
- } catch (Exception ee) {
- //
- }
- }
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- strBuffer.delete(0, strBuffer.length());
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return strBuffer;
- }
-
- /**
- * Query run status of broker
- *
- * @param req
- * @return
- * @throws Exception
- */
- // #lizard forgives
- public StringBuilder adminQueryBrokerRunStatusInfo(HttpServletRequest req) throws Exception {
- StringBuilder strBuffer = new StringBuilder(512);
- try {
- BdbBrokerConfEntity brokerConfEntity = new BdbBrokerConfEntity();
- brokerConfEntity.setDftUnFlushDataHold(TBaseConstants.META_VALUE_UNDEFINED);
- boolean withDetail =
- WebParameterUtils.validBooleanDataParameter("withDetail",
- req.getParameter("withDetail"), false, false);
- Set<String> batchBrokerIps =
- WebParameterUtils.getBatchBrokerIpSet(req.getParameter("brokerIp"), false);
- Set<Integer> batchBrokerIds =
- WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"), false);
- boolean onlyAbnormal =
- WebParameterUtils.validBooleanDataParameter("onlyAbnormal",
- req.getParameter("onlyAbnormal"), false, false);
- boolean onlyAutoForbidden =
- WebParameterUtils.validBooleanDataParameter("onlyAutoForbidden",
- req.getParameter("onlyAutoForbidden"), false, false);
- boolean onlyEnableTLS =
- WebParameterUtils.validBooleanDataParameter("onlyEnableTLS",
- req.getParameter("onlyEnableTLS"), false, false);
- int count = 0;
- List<BdbBrokerConfEntity> brokerConfEntityList =
- brokerConfManager.confGetBdbBrokerEntitySet(brokerConfEntity);
- BrokerInfoHolder brokerInfoHolder = master.getBrokerHolder();
- Map<Integer, BrokerInfoHolder.BrokerAbnInfo> brokerAbnInfoMap =
- brokerInfoHolder.getBrokerAbnormalMap();
- Map<Integer, BrokerInfoHolder.BrokerFbdInfo> brokerFbdInfoMap =
- brokerInfoHolder.getAutoForbiddenBrokerMapInfo();
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
- for (BdbBrokerConfEntity entity : brokerConfEntityList) {
- if (((!batchBrokerIds.isEmpty()) && (!batchBrokerIds.contains(entity.getBrokerId())))
- || ((!batchBrokerIps.isEmpty()) && (!batchBrokerIps.contains(entity.getBrokerIp())))) {
- continue;
- }
- BrokerInfoHolder.BrokerAbnInfo brokerAbnInfo =
- brokerAbnInfoMap.get(entity.getBrokerId());
- if (onlyAbnormal && brokerAbnInfo == null) {
- continue;
- }
- BrokerInfoHolder.BrokerFbdInfo brokerForbInfo =
- brokerFbdInfoMap.get(entity.getBrokerId());
- if (onlyAutoForbidden && brokerForbInfo == null) {
- continue;
- }
- BrokerInfo brokerInfo = brokerInfoHolder.getBrokerInfo(entity.getBrokerId());
- if (onlyEnableTLS && (brokerInfo == null || !brokerInfo.isEnableTLS())) {
- continue;
- }
- if (count++ > 0) {
- strBuffer.append(",");
- }
- int brokerManageStatus = entity.getManageStatus();
- String strManageStatus = WebParameterUtils.getBrokerManageStatusStr(brokerManageStatus);
- strBuffer.append("{\"brokerId\":").append(entity.getBrokerId())
- .append(",\"brokerIp\":\"").append(entity.getBrokerIp())
- .append("\",\"brokerPort\":").append(entity.getBrokerPort())
- .append(",\"manageStatus\":\"").append(strManageStatus).append("\"");
- if (brokerInfo == null) {
- strBuffer.append(",\"brokerTLSPort\":").append(entity.getBrokerTLSPort())
- .append(",\"enableTLS\":\"-\"");
- } else {
- strBuffer.append(",\"brokerTLSPort\":").append(entity.getBrokerTLSPort())
- .append(",\"enableTLS\":").append(brokerInfo.isEnableTLS());
- }
- if (brokerAbnInfo == null) {
- strBuffer.append(",\"isRepAbnormal\":false");
- } else {
- strBuffer.append(",\"isRepAbnormal\":true,\"repStatus\":")
- .append(brokerAbnInfo.getAbnStatus());
- }
- if (brokerForbInfo == null) {
- strBuffer.append(",\"isAutoForbidden\":false");
- } else {
- strBuffer.append(",\"isAutoForbidden\":true");
- }
- if (brokerManageStatus == TStatusConstants.STATUS_MANAGE_APPLY) {
- strBuffer.append(",\"runStatus\":\"-\",\"subStatus\":\"-\"")
- .append(",\"isConfChanged\":\"-\",\"isConfLoaded\":\"-\",\"isBrokerOnline\":\"-\"")
- .append(",\"brokerVersion\":\"-\",\"acceptPublish\":\"-\",\"acceptSubscribe\":\"-\"");
- } else {
- BrokerSyncStatusInfo brokerSyncStatusInfo =
- brokerConfManager.getBrokerRunSyncStatusInfo(entity.getBrokerId());
- if (brokerSyncStatusInfo == null) {
- strBuffer.append(",\"runStatus\":\"unRegister\",\"subStatus\":\"-\"")
- .append(",\"isConfChanged\":\"-\",\"isConfLoaded\":\"-\",\"isBrokerOnline\":\"-\"")
- .append(",\"brokerVersion\":\"-\",\"acceptPublish\":\"-\",\"acceptSubscribe\":\"-\"");
- } else {
- boolean isAcceptPublish = false;
- boolean isAcceptSubscribe = false;
- int stepStatus = brokerSyncStatusInfo.getBrokerRunStatus();
- if (brokerSyncStatusInfo.isBrokerOnline()) {
- if (stepStatus == TStatusConstants.STATUS_SERVICE_UNDEFINED) {
- strBuffer.append(",\"runStatus\":\"running\",\"subStatus\":\"idle\"");
- } else {
- strBuffer.append(",\"runStatus\":\"running\",\"subStatus\":\"processing_event\"," +
- "\"stepOp\":")
- .append(stepStatus);
- }
- } else {
- if (stepStatus == TStatusConstants.STATUS_SERVICE_UNDEFINED) {
- strBuffer.append(",\"runStatus\":\"notRegister\",\"subStatus\":\"idle\"");
- } else {
- strBuffer.append(",\"runStatus\":\"notRegister\",\"subStatus\":\"processing_event\"," +
- "\"stepOp\":")
- .append(stepStatus);
- }
- }
- strBuffer.append(",\"isConfChanged\":\"").append(brokerSyncStatusInfo.isBrokerConfChanged())
- .append("\",\"isConfLoaded\":\"").append(brokerSyncStatusInfo.isBrokerLoaded())
- .append("\",\"isBrokerOnline\":\"").append(brokerSyncStatusInfo.isBrokerOnline())
- .append("\"");
- switch (brokerManageStatus) {
- case TStatusConstants.STATUS_MANAGE_ONLINE: {
- isAcceptPublish = false;
- isAcceptSubscribe = false;
- if (brokerSyncStatusInfo.isBrokerRegister()) {
- if (brokerSyncStatusInfo.isBrokerOnline()) {
- if ((stepStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_WAIT_REGISTER)
- || (stepStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_WAIT_ONLINE)
- || (stepStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_READ)) {
- isAcceptPublish = false;
- isAcceptSubscribe = true;
- } else {
- isAcceptPublish = true;
- isAcceptSubscribe = true;
- }
- }
- }
- break;
- }
- case TStatusConstants.STATUS_MANAGE_OFFLINE: {
- isAcceptPublish = false;
- isAcceptSubscribe = false;
- if (brokerSyncStatusInfo.isBrokerRegister()) {
- if (brokerSyncStatusInfo.isBrokerOnline()) {
- if (stepStatus == TStatusConstants.STATUS_SERVICE_TOOFFLINE_NOT_WRITE) {
- isAcceptPublish = false;
- isAcceptSubscribe = true;
- }
- }
- }
- break;
- }
- case TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE: {
- isAcceptPublish = false;
- isAcceptSubscribe = true;
- break;
- }
-
- case TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ: {
- isAcceptPublish = true;
- isAcceptSubscribe = false;
- break;
- }
- default: {
- //
- }
- }
- strBuffer.append(",\"brokerVersion\":\"-\",\"acceptPublish\":\"")
- .append(isAcceptPublish).append("\",\"acceptSubscribe\":\"")
- .append(isAcceptSubscribe).append("\"");
- if (withDetail) {
- strBuffer = brokerSyncStatusInfo.toJsonString(strBuffer.append(","), false);
- }
- }
- }
- strBuffer.append("}");
- }
- strBuffer.append("],\"count\":").append(count).append("}");
- } catch (Exception e) {
- strBuffer.delete(0, strBuffer.length());
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\",\"count\":0,\"data\":[]}");
- }
- return strBuffer;
- }
-
- /**
- * Query broker config
- *
- * @param req
- * @return
- * @throws Exception
- */
- // #lizard forgives
- public StringBuilder adminQueryBrokerDefConfEntityInfo(HttpServletRequest req) throws Exception {
- StringBuilder strBuffer = new StringBuilder(512);
- BdbBrokerConfEntity brokerConfEntity = new BdbBrokerConfEntity();
- brokerConfEntity.setDftUnFlushDataHold(TBaseConstants.META_VALUE_UNDEFINED);
- try {
- brokerConfEntity
- .setRecordCreateUser(WebParameterUtils.validStringParameter("createUser",
- req.getParameter("createUser"), TBaseConstants.META_MAX_USERNAME_LENGTH, false, null));
- brokerConfEntity
- .setRecordModifyUser(WebParameterUtils.validStringParameter("modifyUser",
- req.getParameter("modifyUser"), TBaseConstants.META_MAX_USERNAME_LENGTH, false, null));
- brokerConfEntity
- .setDftDeleteWhen(WebParameterUtils.validDecodeStringParameter("deleteWhen",
- req.getParameter("deleteWhen"), TServerConstants.CFG_DELETEWHEN_MAX_LENGTH, false, null));
- brokerConfEntity
- .setDftDeletePolicy(WebParameterUtils.validDeletePolicyParameter("deletePolicy",
- req.getParameter("deletePolicy"), false, null));
- brokerConfEntity
- .setDftNumPartitions(WebParameterUtils.validIntDataParameter("numPartitions",
- req.getParameter("numPartitions"), false, TBaseConstants.META_VALUE_UNDEFINED, 1));
- brokerConfEntity
- .setDftUnflushInterval(WebParameterUtils.validIntDataParameter("unflushInterval",
- req.getParameter("unflushInterval"), false, TBaseConstants.META_VALUE_UNDEFINED, 1));
- brokerConfEntity
- .setDftUnflushThreshold(WebParameterUtils.validIntDataParameter("unflushThreshold",
- req.getParameter("unflushThreshold"), false, TBaseConstants.META_VALUE_UNDEFINED, 0));
- brokerConfEntity
- .setDftUnFlushDataHold(WebParameterUtils.validIntDataParameter("unflushDataHold",
- req.getParameter("unflushDataHold"), false, TBaseConstants.META_VALUE_UNDEFINED, 0));
- brokerConfEntity
- .setBrokerIp(WebParameterUtils.checkParamCommonRequires("brokerIp",
- req.getParameter("brokerIp"), false));
- boolean withTopic =
- WebParameterUtils.validBooleanDataParameter("withTopic",
- req.getParameter("withTopic"), false, false);
- int topicStatusId =
- WebParameterUtils.validIntDataParameter("topicStatusId",
- req.getParameter("topicStatusId"), false,
- TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED);
- int numTopicStores =
- WebParameterUtils.validIntDataParameter("numTopicStores",
- req.getParameter("numTopicStores"), false, TBaseConstants.META_VALUE_UNDEFINED, 1);
- int memCacheMsgCntInK =
- WebParameterUtils.validIntDataParameter("memCacheMsgCntInK",
- req.getParameter("memCacheMsgCntInK"), false, TBaseConstants.META_VALUE_UNDEFINED, 1);
- int memCacheMsgSizeInMB =
- WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB",
- req.getParameter("memCacheMsgSizeInMB"), false, TBaseConstants.META_VALUE_UNDEFINED, 2);
- int memCacheFlushIntvl =
- WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
- req.getParameter("memCacheFlushIntvl"), false, TBaseConstants.META_VALUE_UNDEFINED, 4000);
- int brokerTlsPort =
- WebParameterUtils.validIntDataParameter("brokerTLSPort",
- req.getParameter("brokerTLSPort"), false, TBaseConstants.META_VALUE_UNDEFINED, 0);
- Boolean isInclude = null;
- Set<String> batchTopicNames =
- WebParameterUtils.getBatchTopicNames(req.getParameter("topicName"), false, false, null, strBuffer);
- if (!batchTopicNames.isEmpty()) {
- isInclude =
- WebParameterUtils.validBooleanDataParameter("isInclude",
- req.getParameter("isInclude"), false, true);
- }
- Set<Integer> batchBrokerIds = WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"), false);
- if (batchBrokerIds.size() == 1) {
- for (Integer brokerId : batchBrokerIds) {
- brokerConfEntity.setBrokerId(brokerId);
- }
- }
- int count = 0;
- SimpleDateFormat formatter =
- new SimpleDateFormat(TBaseConstants.META_TMP_DATE_VALUE);
- List<BdbBrokerConfEntity> brokerConfEntityList =
- brokerConfManager.confGetBdbBrokerEntitySet(brokerConfEntity);
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
- for (BdbBrokerConfEntity entity : brokerConfEntityList) {
- int recordNumTopicStores = entity.getNumTopicStores();
- int recordMemCacheMsgCntInK = entity.getDftMemCacheMsgCntInK();
- int recordMemCacheMsgSizeInMB = entity.getDftMemCacheMsgSizeInMB();
- int recordMemCacheFlushIntvl = entity.getDftMemCacheFlushIntvl();
- int recordTLSPort = entity.getBrokerTLSPort();
- if (((!batchBrokerIds.isEmpty()) && (!batchBrokerIds.contains(entity.getBrokerId())))
- || ((numTopicStores >= 0) && (numTopicStores != recordNumTopicStores))
- || ((memCacheMsgCntInK >= 0) && (memCacheMsgCntInK != recordMemCacheMsgCntInK))
- || ((memCacheMsgSizeInMB >= 0) && (memCacheMsgSizeInMB != recordMemCacheMsgSizeInMB))
- || ((memCacheFlushIntvl >= 0) && (memCacheFlushIntvl != recordMemCacheFlushIntvl))
- || ((brokerTlsPort >= 0) && (brokerTlsPort != recordTLSPort))) {
- continue;
- }
- ConcurrentHashMap<String, BdbTopicConfEntity> bdbTopicConfEntityMap =
- brokerConfManager.getBrokerTopicConfEntitySet(entity.getBrokerId());
- if (!isValidRecord(batchTopicNames, topicStatusId, isInclude, bdbTopicConfEntityMap)) {
- continue;
- }
- if (count++ > 0) {
- strBuffer.append(",");
- }
- strBuffer.append("{\"brokerId\":").append(entity.getBrokerId())
- .append(",\"brokerIp\":\"").append(entity.getBrokerIp())
- .append("\",\"brokerPort\":").append(entity.getBrokerPort())
- .append(",\"numPartitions\":").append(entity.getDftNumPartitions())
- .append(",\"numTopicStores\":").append(recordNumTopicStores)
- .append(",\"unflushThreshold\":").append(entity.getDftUnflushThreshold())
- .append(",\"unflushInterval\":").append(entity.getDftUnflushInterval())
- .append(",\"unflushDataHold\":").append(entity.getDftUnFlushDataHold())
- .append(",\"memCacheMsgCntInK\":").append(recordMemCacheMsgCntInK)
- .append(",\"memCacheMsgSizeInMB\":").append(recordMemCacheMsgSizeInMB)
- .append(",\"memCacheFlushIntvl\":").append(recordMemCacheFlushIntvl)
- .append(",\"deleteWhen\":\"").append(entity.getDftDeleteWhen())
- .append("\",\"deletePolicy\":\"").append(entity.getDftDeletePolicy())
- .append("\",\"acceptPublish\":").append(entity.isAcceptPublish())
- .append(",\"acceptSubscribe\":").append(entity.isAcceptSubscribe())
- .append(",\"createUser\":\"").append(entity.getRecordCreateUser())
- .append("\",\"createDate\":\"").append(formatter.format(entity.getRecordCreateDate()))
- .append("\",\"modifyUser\":\"").append(entity.getRecordModifyUser())
- .append("\",\"modifyDate\":\"").append(formatter.format(entity.getRecordModifyDate()))
- .append("\"");
- if (recordTLSPort >= 0) {
- strBuffer.append(",\"hasTLSPort\":true,\"brokerTLSPort\":").append(recordTLSPort);
- } else {
- strBuffer.append(",\"hasTLSPort\":false");
- }
- strBuffer = addTopicInfo(withTopic, strBuffer, formatter, bdbTopicConfEntityMap);
- strBuffer.append("}");
- }
- strBuffer.append("],\"count\":").append(count).append("}");
- } catch (Exception e) {
- strBuffer.delete(0, strBuffer.length());
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\",\"count\":0,\"data\":[]}");
- }
- return strBuffer;
- }
-
- /**
- * Check if the record is valid
- *
- * @param batchTopicNames
- * @param topicStatusId
- * @param isInclude
- * @param bdbTopicConfEntityMap
- * @return
- */
- private boolean isValidRecord(final Set<String> batchTopicNames, int topicStatusId, Boolean isInclude,
- ConcurrentHashMap<String, BdbTopicConfEntity> bdbTopicConfEntityMap) {
- // First check the difference between specified topic and request topic, and then when the broker
- // has a topic record, filter according to the topic requirements specified by the business
- if (!batchTopicNames.isEmpty() && isInclude != null) {
- if ((bdbTopicConfEntityMap == null) || (bdbTopicConfEntityMap.isEmpty())) {
- if (isInclude) {
- return false;
- }
- } else {
- boolean filterInclude = false;
- Set<String> curTopics = bdbTopicConfEntityMap.keySet();
- if (isInclude) {
- for (String inTopic : batchTopicNames) {
- if (curTopics.contains(inTopic)) {
- filterInclude = true;
- break;
- }
- }
- } else {
- filterInclude = true;
- for (String inTopic : batchTopicNames) {
- if (curTopics.contains(inTopic)) {
- filterInclude = false;
- break;
- }
- }
- }
- if (!filterInclude) {
- return false;
- }
- }
- }
- // Filter according to the topic status
- if (topicStatusId == TBaseConstants.META_VALUE_UNDEFINED) {
- return true;
- } else {
- if ((bdbTopicConfEntityMap == null) || (bdbTopicConfEntityMap.isEmpty())) {
- return false;
- }
- for (BdbTopicConfEntity bdbTopicConfEntity : bdbTopicConfEntityMap.values()) {
- if (bdbTopicConfEntity.getTopicStatusId() == topicStatusId) {
- return true;
- }
- }
- return false;
- }
- }
-
- /**
- * Private method to add topic info
- *
- * @param withTopic
- * @param sb
- * @param formatter
- * @param bdbTopicConfEntityMap
- * @return
- */
- private StringBuilder addTopicInfo(boolean withTopic, StringBuilder sb, final SimpleDateFormat formatter,
- ConcurrentHashMap<String, BdbTopicConfEntity> bdbTopicConfEntityMap) {
- if (withTopic) {
- sb.append(",\"topicSet\":[");
- int topicCount = 0;
- if (bdbTopicConfEntityMap != null) {
- for (BdbTopicConfEntity topicEntity : bdbTopicConfEntityMap.values()) {
- if (topicCount++ > 0) {
- sb.append(",");
- }
- sb.append("{\"topicName\":\"").append(topicEntity.getTopicName())
- .append("\",\"topicStatusId\":").append(topicEntity.getTopicStatusId())
- .append(",\"brokerId\":").append(topicEntity.getBrokerId())
- .append(",\"brokerIp\":\"").append(topicEntity.getBrokerIp())
- .append("\",\"brokerPort\":").append(topicEntity.getBrokerPort())
- .append(",\"numTopicStores\":").append(topicEntity.getNumTopicStores())
- .append(",\"numPartitions\":").append(topicEntity.getNumPartitions())
- .append(",\"unflushThreshold\":").append(topicEntity.getUnflushThreshold())
- .append(",\"unflushInterval\":").append(topicEntity.getUnflushInterval())
- .append(",\"unflushDataHold\":").append(topicEntity.getUnflushDataHold())
- .append(",\"memCacheMsgCntInK\":").append(topicEntity.getMemCacheMsgCntInK())
- .append(",\"memCacheMsgSizeInMB\":").append(topicEntity.getMemCacheMsgSizeInMB())
- .append(",\"memCacheFlushIntvl\":").append(topicEntity.getMemCacheFlushIntvl())
- .append(",\"deleteWhen\":\"").append(topicEntity.getDeleteWhen())
- .append("\",\"deletePolicy\":\"").append(topicEntity.getDeletePolicy())
- .append("\",\"acceptPublish\":").append(topicEntity.getAcceptPublish())
- .append(",\"acceptSubscribe\":").append(topicEntity.getAcceptSubscribe())
- .append(",\"createUser\":\"").append(topicEntity.getCreateUser())
- .append("\",\"createDate\":\"").append(formatter.format(topicEntity.getCreateDate()))
- .append("\",\"modifyUser\":\"").append(topicEntity.getModifyUser())
- .append("\",\"modifyDate\":\"").append(formatter.format(topicEntity.getModifyDate()))
- .append("\"}");
- }
- }
- sb.append("]");
- }
- return sb;
- }
-}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
index 06daeb0..05c2f13 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
@@ -65,9 +65,9 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminQueryGroupConsumeCtrlInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
+ public StringBuilder adminQueryGroupConsumeCtrlInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// build query entity
GroupConsumeCtrlEntity qryEntity = new GroupConsumeCtrlEntity();
// get queried operation info, for createUser, modifyUser, dataVersionId
@@ -142,8 +142,10 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminAddGroupConsumeCtrlInfo(HttpServletRequest req) {
- return innAddOrUpdGroupConsumeCtrlInfo(req, true);
+ public StringBuilder adminAddGroupConsumeCtrlInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrUpdGroupConsumeCtrlInfo(req, sBuffer, result, true);
}
/**
@@ -152,8 +154,10 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminBatchAddGroupConsumeCtrlInfo(HttpServletRequest req) {
- return innBatchAddOrUpdGroupConsumeCtrlInfo(req, true);
+ public StringBuilder adminBatchAddGroupConsumeCtrlInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innBatchAddOrUpdGroupConsumeCtrlInfo(req, sBuffer, result, true);
}
/**
@@ -162,8 +166,10 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminModGroupConsumeCtrlInfo(HttpServletRequest req) {
- return innAddOrUpdGroupConsumeCtrlInfo(req, false);
+ public StringBuilder adminModGroupConsumeCtrlInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrUpdGroupConsumeCtrlInfo(req, sBuffer, result, false);
}
/**
@@ -172,8 +178,10 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminBatchModGroupConsumeCtrlInfo(HttpServletRequest req) {
- return innBatchAddOrUpdGroupConsumeCtrlInfo(req, false);
+ public StringBuilder adminBatchModGroupConsumeCtrlInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innBatchAddOrUpdGroupConsumeCtrlInfo(req, sBuffer, result, false);
}
/**
@@ -182,15 +190,9 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminDelGroupConsumeCtrlInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminDelGroupConsumeCtrlInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -220,15 +222,9 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
}
private StringBuilder innAddOrUpdGroupConsumeCtrlInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result,
boolean isAddOp) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -292,15 +288,9 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
}
private StringBuilder innBatchAddOrUpdGroupConsumeCtrlInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result,
boolean isAddOp) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
index 20cce00..00c6dcb 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
@@ -65,9 +65,9 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminQueryGroupResCtrlConf(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
+ public StringBuilder adminQueryGroupResCtrlConf(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// build query entity
GroupResCtrlEntity entity = new GroupResCtrlEntity();
// get queried operation info, for createUser, modifyUser, dataVersionId
@@ -131,8 +131,10 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminAddGroupResCtrlConf(HttpServletRequest req) {
- return innAddOrUpdGroupResCtrlConf(req, true);
+ public StringBuilder adminAddGroupResCtrlConf(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrUpdGroupResCtrlConf(req, sBuffer, result, true);
}
/**
@@ -141,8 +143,10 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminBatchAddGroupResCtrlConf(HttpServletRequest req) {
- return innBatchAddOrUpdGroupResCtrlConf(req, true);
+ public StringBuilder adminBatchAddGroupResCtrlConf(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innBatchAddOrUpdGroupResCtrlConf(req, sBuffer, result, true);
}
/**
@@ -151,8 +155,10 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminModGroupResCtrlConf(HttpServletRequest req) {
- return innAddOrUpdGroupResCtrlConf(req, false);
+ public StringBuilder adminModGroupResCtrlConf(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrUpdGroupResCtrlConf(req, sBuffer, result, false);
}
/**
@@ -161,8 +167,10 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminBatchUpdGroupResCtrlConf(HttpServletRequest req) {
- return innBatchAddOrUpdGroupResCtrlConf(req, false);
+ public StringBuilder adminBatchUpdGroupResCtrlConf(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innBatchAddOrUpdGroupResCtrlConf(req, sBuffer, result, false);
}
/**
@@ -171,15 +179,9 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminDelGroupResCtrlConf(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminDelGroupResCtrlConf(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -201,15 +203,9 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
}
private StringBuilder innAddOrUpdGroupResCtrlConf(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result,
boolean isAddOp) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -277,15 +273,9 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
}
private StringBuilder innBatchAddOrUpdGroupResCtrlConf(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result,
boolean isAddOp) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index aba5cd7..c8f28ba 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -94,14 +94,15 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
* @param req HttpServletRequest
* @return
*/
- public StringBuilder getGroupAddressStrInfo(HttpServletRequest req) {
- StringBuilder strBuffer = new StringBuilder(512);
+ public StringBuilder getGroupAddressStrInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
ClusterGroupVO clusterGroupVO = metaDataManager.getGroupAddressStrInfo();
if (clusterGroupVO == null) {
WebParameterUtils.buildFailResultWithBlankData(
- 500, "GetBrokerGroup info error", strBuffer);
+ 500, "GetBrokerGroup info error", sBuffer);
} else {
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Ok\",\"groupName\":\"")
+ sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Ok\",\"groupName\":\"")
.append(clusterGroupVO.getGroupName()).append("\",\"isPrimaryNodeActive\":")
.append(clusterGroupVO.isPrimaryNodeActive()).append(",\"data\":[");
int count = 0;
@@ -112,9 +113,10 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
continue;
}
if (count++ > 0) {
- strBuffer.append(",");
+ sBuffer.append(",");
}
- strBuffer.append("{\"index\":").append(count).append(",\"name\":\"").append(node.getNodeName())
+ sBuffer.append("{\"index\":").append(count)
+ .append(",\"name\":\"").append(node.getNodeName())
.append("\",\"hostName\":\"").append(node.getHostName())
.append("\",\"port\":\"").append(node.getPort())
.append("\",\"statusInfo\":{").append("\"nodeStatus\":\"")
@@ -122,10 +124,10 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
.append(node.getJoinTime()).append("\"}}");
}
}
- strBuffer.append("],\"count\":").append(count).append(",\"groupStatus\":\"")
+ sBuffer.append("],\"count\":").append(count).append(",\"groupStatus\":\"")
.append(clusterGroupVO.getGroupStatus()).append("\"}");
}
- return strBuffer;
+ return sBuffer;
}
/**
@@ -134,15 +136,9 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
* @param req HttpServletRequest
* @return
*/
- public StringBuilder transferCurrentMaster(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder transferCurrentMaster(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
try {
metaDataManager.transferMaster();
WebParameterUtils.buildSuccessResult(sBuffer,
@@ -159,8 +155,9 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminQueryClusterDefSetting(HttpServletRequest req) {
- StringBuilder sBuffer = new StringBuilder(512);
+ public StringBuilder adminQueryClusterDefSetting(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
return buildRetInfo(sBuffer, true);
}
@@ -170,8 +167,9 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminQueryDefFlowCtrlRule(HttpServletRequest req) {
- StringBuilder sBuffer = new StringBuilder(512);
+ public StringBuilder adminQueryDefFlowCtrlRule(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
return buildRetInfo(sBuffer, false);
}
@@ -181,8 +179,10 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminSetClusterDefSetting(HttpServletRequest req) {
- return innAddOrUpdDefFlowControlRule(req, true, true);
+ public StringBuilder adminSetClusterDefSetting(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrUpdDefFlowControlRule(req, sBuffer, result, true, true);
}
/**
@@ -191,8 +191,10 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminUpdClusterDefSetting(HttpServletRequest req) {
- return innAddOrUpdDefFlowControlRule(req, false, true);
+ public StringBuilder adminUpdClusterDefSetting(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrUpdDefFlowControlRule(req, sBuffer, result, false, true);
}
/**
@@ -201,8 +203,10 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminSetDefFlowControlRule(HttpServletRequest req) {
- return innAddOrUpdDefFlowControlRule(req, true, false);
+ public StringBuilder adminSetDefFlowControlRule(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrUpdDefFlowControlRule(req, sBuffer, result, true, false);
}
/**
@@ -211,8 +215,10 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminModDefFlowCtrlRule(HttpServletRequest req) {
- return innAddOrUpdDefFlowControlRule(req, false, false);
+ public StringBuilder adminModDefFlowCtrlRule(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrUpdDefFlowControlRule(req, sBuffer, result, false, false);
}
@@ -222,9 +228,9 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminQueryClusterTopicView(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
+ public StringBuilder adminQueryClusterTopicView(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get brokerId field
if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.COMPSBROKERID, false, sBuffer, result)) {
@@ -319,15 +325,9 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminDelDefFlowControlRule(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminDelDefFlowControlRule(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -356,15 +356,10 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
* @return
*/
private StringBuilder innAddOrUpdDefFlowControlRule(HttpServletRequest req,
- boolean isAddOp, boolean isNewVer) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ StringBuilder sBuffer,
+ ProcessResult result,
+ boolean isAddOp,
+ boolean isNewVer) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebOtherInfoHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebOtherInfoHandler.java
index 74e1a6f..a2333e5 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebOtherInfoHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebOtherInfoHandler.java
@@ -67,9 +67,9 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder getSubscribeInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(1024);
+ public StringBuilder getSubscribeInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// get group list
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSGROUPNAME, false, null, sBuffer, result)) {
@@ -127,9 +127,9 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
* @return output as JSON
*/
// #lizard forgives
- public StringBuilder getConsumeGroupDetailInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(1024);
+ public StringBuilder getConsumeGroupDetailInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// get group name
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.GROUPNAME, true, null, sBuffer, result)) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
index fd1318f..83052e3 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
@@ -73,9 +73,9 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminQueryTopicCtrlInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
+ public StringBuilder adminQueryTopicCtrlInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
TopicCtrlEntity qryEntity = new TopicCtrlEntity();
// get queried operation info, for createUser, modifyUser, dataVersionId
if (!WebParameterUtils.getQueriedOperateInfo(req, qryEntity, sBuffer, result)) {
@@ -114,8 +114,10 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminAddTopicCtrlInfo(HttpServletRequest req) {
- return innAddOrUpdTopicCtrlInfo(req, true);
+ public StringBuilder adminAddTopicCtrlInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrUpdTopicCtrlInfo(req, sBuffer, result, true);
}
@@ -125,8 +127,10 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminBatchAddTopicCtrlInfo(HttpServletRequest req) {
- return innBatchAddOrUpdTopicCtrlInfo(req, true);
+ public StringBuilder adminBatchAddTopicCtrlInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innBatchAddOrUpdTopicCtrlInfo(req, sBuffer, result, true);
}
/**
@@ -135,8 +139,10 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminModTopicCtrlInfo(HttpServletRequest req) {
- return innAddOrUpdTopicCtrlInfo(req, false);
+ public StringBuilder adminModTopicCtrlInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrUpdTopicCtrlInfo(req, sBuffer, result, false);
}
/**
@@ -145,8 +151,10 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminBatchModTopicCtrlInfo(HttpServletRequest req) {
- return innBatchAddOrUpdTopicCtrlInfo(req, false);
+ public StringBuilder adminBatchModTopicCtrlInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innBatchAddOrUpdTopicCtrlInfo(req, sBuffer, result, false);
}
/**
@@ -155,15 +163,9 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminDeleteTopicCtrlInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ public StringBuilder adminDeleteTopicCtrlInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -187,15 +189,10 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
return buildRetInfo(retInfo, sBuffer);
}
- private StringBuilder innAddOrUpdTopicCtrlInfo(HttpServletRequest req, boolean isAddOp) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ private StringBuilder innAddOrUpdTopicCtrlInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result,
+ boolean isAddOp) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -249,15 +246,10 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
return buildRetInfo(retInfo, sBuffer);
}
- private StringBuilder innBatchAddOrUpdTopicCtrlInfo(HttpServletRequest req, boolean isAddOp) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ private StringBuilder innBatchAddOrUpdTopicCtrlInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result,
+ boolean isAddOp) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index 642b568..aba06c8 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -117,8 +117,10 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminNewQueryTopicCfgAndRunInfo(HttpServletRequest req) {
- return innQueryTopicConfAndRunInfo(req, true);
+ public StringBuilder adminNewQueryTopicCfgAndRunInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innQueryTopicConfAndRunInfo(req, sBuffer, result, true);
}
/**
@@ -127,8 +129,10 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminOldQueryTopicCfgAndRunInfo(HttpServletRequest req) {
- return innQueryTopicConfAndRunInfo(req, false);
+ public StringBuilder adminOldQueryTopicCfgAndRunInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innQueryTopicConfAndRunInfo(req, sBuffer, result, false);
}
/**
@@ -137,8 +141,10 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminAddTopicDeployInfo(HttpServletRequest req) {
- return innAddOrUpdTopicDeployInfo(req, true);
+ public StringBuilder adminAddTopicDeployInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrUpdTopicDeployInfo(req, sBuffer, result, true);
}
/**
@@ -147,8 +153,10 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminModifyTopicDeployInfo(HttpServletRequest req) {
- return innAddOrUpdTopicDeployInfo(req, false);
+ public StringBuilder adminModifyTopicDeployInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innAddOrUpdTopicDeployInfo(req, sBuffer, result, false);
}
/**
@@ -157,8 +165,10 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminBatchAddTopicDeployInfo(HttpServletRequest req) {
- return innBatchAddOrUpdTopicDeployInfo(req, true);
+ public StringBuilder adminBatchAddTopicDeployInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innBatchAddOrUpdTopicDeployInfo(req, sBuffer, result, true);
}
/**
@@ -167,8 +177,10 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminBatchUpdTopicDeployInfo(HttpServletRequest req) {
- return innBatchAddOrUpdTopicDeployInfo(req, false);
+ public StringBuilder adminBatchUpdTopicDeployInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innBatchAddOrUpdTopicDeployInfo(req, sBuffer, result, false);
}
/**
@@ -177,8 +189,11 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminDelTopicDeployInfo(HttpServletRequest req) {
- return innModifyTopicDeployStatusInfo(req, TopicStsChgType.STATUS_CHANGE_SOFT_DELETE);
+ public StringBuilder adminDelTopicDeployInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innModifyTopicDeployStatusInfo(req,
+ sBuffer, result, TopicStsChgType.STATUS_CHANGE_SOFT_DELETE);
}
/**
@@ -187,8 +202,11 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminRmvTopicDeployInfo(HttpServletRequest req) {
- return innModifyTopicDeployStatusInfo(req, TopicStsChgType.STATUS_CHANGE_REMOVE);
+ public StringBuilder adminRmvTopicDeployInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innModifyTopicDeployStatusInfo(req,
+ sBuffer, result, TopicStsChgType.STATUS_CHANGE_REMOVE);
}
/**
@@ -197,8 +215,11 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminRedoDeletedTopicDeployInfo(HttpServletRequest req) {
- return innModifyTopicDeployStatusInfo(req, TopicStsChgType.STATUS_CHANGE_REDO_SFDEL);
+ public StringBuilder adminRedoDeletedTopicDeployInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ return innModifyTopicDeployStatusInfo(req,
+ sBuffer, result, TopicStsChgType.STATUS_CHANGE_REDO_SFDEL);
}
/**
@@ -207,9 +228,9 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminQueryBrokerTopicCfgAndRunInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
+ public StringBuilder adminQueryBrokerTopicCfgAndRunInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -301,9 +322,9 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminQuerySimpleTopicName(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
+ public StringBuilder adminQuerySimpleTopicName(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.COMPSBROKERID, false, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -340,9 +361,9 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
* @param req
* @return
*/
- public StringBuilder adminQuerySimpleBrokerId(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
+ public StringBuilder adminQuerySimpleBrokerId(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -395,9 +416,10 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
* @param req
* @return
*/
- private StringBuilder innQueryTopicConfAndRunInfo(HttpServletRequest req, boolean isNewVer) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
+ private StringBuilder innQueryTopicConfAndRunInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result,
+ boolean isNewVer) {
TopicDeployEntity qryEntity = new TopicDeployEntity();
// get queried operation info, for createUser, modifyUser, dataVersionId
if (!WebParameterUtils.getQueriedOperateInfo(req, qryEntity, sBuffer, result)) {
@@ -698,15 +720,10 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
return sBuffer;
}
- private StringBuilder innAddOrUpdTopicDeployInfo(HttpServletRequest req, boolean isAddOp) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
+ private StringBuilder innAddOrUpdTopicDeployInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result,
+ boolean isAddOp) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -751,15 +768,9 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
}
private StringBuilder innBatchAddOrUpdTopicDeployInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result,
boolean isAddOp) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -899,15 +910,9 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
* @return
*/
private StringBuilder innModifyTopicDeployStatusInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result,
TopicStsChgType chgType) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);