You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/04 02:16:22 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-430]Optimizing
the implementation of HTTP API for broker (#338)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new 2e9e814 [TUBEMQ-430]Optimizing the implementation of HTTP API for broker (#338)
2e9e814 is described below
commit 2e9e814ec82f915bc62e7edd27aa989bdb71920b
Author: gosonzhang <46...@qq.com>
AuthorDate: Fri Dec 4 10:16:15 2020 +0800
[TUBEMQ-430]Optimizing the implementation of HTTP API for broker (#338)
---
.../tubemq/client/config/ConsumerConfig.java | 12 +
.../client/consumer/PullMessageConsumer.java | 7 +
.../org/apache/tubemq/corebase/utils/RegexDef.java | 60 ++++
.../server/broker/web/AbstractWebHandler.java | 83 +++++
.../server/broker/web/BrokerAdminServlet.java | 333 ++++++++++-----------
.../tubemq/server/common/fielddef/CliArgDef.java | 111 +++++++
.../tubemq/server/common/fielddef/WebFieldDef.java | 159 ++++++++++
.../tubemq/server/common/utils/ProcessResult.java | 53 ++++
.../server/common/utils/WebParameterUtils.java | 320 +++++++++++++++++++-
.../tubemq/server/common/webbase/WebFieldType.java | 60 ++++
.../webbase/WebMethodMapper.java} | 12 +-
.../server/master/web/action/screen/Webapi.java | 6 +-
.../master/web/handler/AbstractWebHandler.java | 2 +-
13 files changed, 1037 insertions(+), 181 deletions(-)
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java b/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
index d8b63fb..184da79 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
@@ -124,6 +124,18 @@ public class ConsumerConfig extends TubeClientConfig {
return pullConsumeReadyWaitPeriodMs;
}
+ // setPullConsumeReadyWaitPeriodMs() use note:
+ // The value may be negative, 0, or positive, and it directly determines
+ // the behavior of the PullMessageConsumer.getMessage() function:
+ // 1. if it is set to a negative value, the getMessage() calling thread will
+ // be blocked forever and will not return until the consumption conditions are met;
+ // 2. if it is set to 0, the getMessage() calling thread will only block for
+ // the ConsumerConfig.getPullConsumeReadyChkSliceMs() interval when the consumption
+ // conditions are not met, and then return;
+ // 3. if it is set to a positive number and the consumption conditions are not met
+ // (including no partitions allocated, or allocated partitions that do not meet the
+ // usage conditions), the getMessage() calling thread will be blocked until the total
+ // time of ConsumerConfig.getPullConsumeReadyWaitPeriodMs expires
public void setPullConsumeReadyWaitPeriodMs(long pullConsumeReadyWaitPeriodMs) {
this.pullConsumeReadyWaitPeriodMs = pullConsumeReadyWaitPeriodMs;
}
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/PullMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/PullMessageConsumer.java
index af5d50f..d9c3baf 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/PullMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/PullMessageConsumer.java
@@ -28,6 +28,13 @@ public interface PullMessageConsumer extends MessageConsumer {
PullMessageConsumer subscribe(String topic,
TreeSet<String> filterConds) throws TubeClientException;
+ // getMessage() use note:
+ // This getMessage() call may block: when the current
+ // consumer's consumption conditions are not satisfied (including
+ // no partitions allocated for consumption, or allocated partitions
+ // that do not meet the consumption conditions),
+ // the call will sleep at intervals of ConsumerConfig.getPullConsumeReadyChkSliceMs(),
+ // until the total time of ConsumerConfig.getPullConsumeReadyWaitPeriodMs expires
ConsumerResult getMessage() throws TubeClientException;
ConsumerResult confirmConsume(final String confirmContext,
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/RegexDef.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/RegexDef.java
new file mode 100644
index 0000000..842c6a9
--- /dev/null
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/RegexDef.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.corebase.utils;
+
+
+/** Regular-expression templates: each constant pairs an id and a pattern with an error-message fragment describing the rule. */
+public enum RegexDef {
+ // NOTE: TMP_STRING and TMP_GROUP require at least two characters (a leading letter plus one or more from the class).
+ TMP_FILTER(0, "^[_A-Za-z0-9]+$",
+ "must only contain characters,numbers,and underscores"),
+ TMP_STRING(1, "^[a-zA-Z]\\w+$",
+ "must begin with a letter,can only contain characters,numbers,and underscores"),
+ TMP_NUMBER(2, "^-?[0-9]\\d*$", "must only contain numbers"), // the pattern also accepts one leading '-'
+ TMP_GROUP(3, "^[a-zA-Z][\\w-]+$",
+ "must begin with a letter,can only contain characters,numbers,hyphen,and underscores"),
+ TMP_CONSUMERID(4, "^[_A-Za-z0-9\\.\\-]+$",
+ "must only contain characters,numbers,dot,hyphen,and underscores"),
+ TMP_IPV4ADDRESS(5,
+ "((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))",
+ "must matches the IP V4 address regulation");
+ // TMP_IPV4ADDRESS has no ^/$ anchors, so it only enforces a whole-string match via String.matches()/Matcher.matches().
+
+ private final int id;
+ private final String pattern;
+ private final String errMsgTemp;
+
+
+ RegexDef(int id, String pattern, String errMsgTemp) {
+ this.id = id;
+ this.pattern = pattern;
+ this.errMsgTemp = errMsgTemp;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getPattern() {
+ return pattern;
+ }
+
+ public String getErrMsgTemp() {
+ return errMsgTemp;
+ }
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/AbstractWebHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/AbstractWebHandler.java
new file mode 100644
index 0000000..b44d88c
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/AbstractWebHandler.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.broker.web;
+
+import static org.apache.tubemq.server.common.webbase.WebMethodMapper.getWebApiRegInfo;
+import static org.apache.tubemq.server.common.webbase.WebMethodMapper.registerWebMethod;
+import java.io.IOException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.tubemq.server.broker.TubeBroker;
+import org.apache.tubemq.server.common.webbase.WebMethodMapper.WebApiRegInfo;
+
+
+/** Base servlet for broker web APIs: dispatches each request to the handler method registered under its "method" parameter. */
+public abstract class AbstractWebHandler extends HttpServlet {
+
+ protected final TubeBroker broker;
+
+ public AbstractWebHandler(TubeBroker broker) {
+ this.broker = broker;
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest req,
+ HttpServletResponse resp) throws IOException {
+ doPost(req, resp); // GET is served exactly like POST
+ }
+ /** Looks up the handler registered for the "method" parameter, invokes it, and writes the JSON text it returns. */
+ @Override
+ protected void doPost(HttpServletRequest req,
+ HttpServletResponse resp) throws IOException {
+ StringBuilder strBuffer = new StringBuilder(1024);
+ // Every failure path below answers with an errCode 400 JSON object; the HTTP status itself stays 200.
+ try {
+ String method = req.getParameter("method");
+ if (method == null) {
+ strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
+ .append("Please take with method parameter! \"}");
+ } else {
+ WebApiRegInfo webApiRegInfo = getWebApiRegInfo(true, method);
+ if (webApiRegInfo == null) {
+ strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
+ .append("Unsupported method ").append(method).append("\"}"); // close the errMsg string
+ } else {
+ strBuffer = (StringBuilder) webApiRegInfo.method.invoke(webApiRegInfo.webHandler, req);
+ }
+ }
+ } catch (Throwable e) {
+ strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
+ .append("Bad request from server: ")
+ .append(e.getCause() != null ? e.getCause().getMessage() : e.getMessage()) // unwrap reflective wrapper
+ .append("\"}");
+ }
+ resp.setCharacterEncoding(req.getCharacterEncoding()); // must precede getWriter() to take effect
+ resp.setStatus(HttpServletResponse.SC_OK);
+ resp.getWriter().write(strBuffer.toString());
+ resp.flushBuffer();
+ }
+
+ public abstract void registerWebApiMethod(); // subclasses register their web methods here
+
+ protected void innRegisterWebMethod(String webMethodName,
+ String clsMethodName) {
+ registerWebMethod(true, webMethodName, clsMethodName, this); // same "true" flag as the lookup in doPost
+ }
+
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index e91ae79..ab43c2e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -17,18 +17,12 @@
package org.apache.tubemq.server.broker.web;
-import java.io.IOException;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.broker.TubeBroker;
@@ -36,83 +30,44 @@ import org.apache.tubemq.server.broker.msgstore.MessageStore;
import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
import org.apache.tubemq.server.broker.offset.OffsetService;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
/***
* Broker's web servlet. Used for admin operation, like query consumer's status etc.
*/
-public class BrokerAdminServlet extends HttpServlet {
- private final TubeBroker broker;
+public class BrokerAdminServlet extends AbstractWebHandler {
- public BrokerAdminServlet(TubeBroker broker) {
- this.broker = broker;
- }
- @Override
- protected void doGet(HttpServletRequest req,
- HttpServletResponse resp) throws ServletException, IOException {
- doPost(req, resp);
+ public BrokerAdminServlet(TubeBroker broker) {
+ super(broker);
+ registerWebApiMethod();
}
@Override
- protected void doPost(HttpServletRequest req,
- HttpServletResponse resp) throws ServletException, IOException {
- StringBuilder sBuilder = new StringBuilder(1024);
- try {
- String method = req.getParameter("method");
- if ("admin_manual_set_current_offset".equals(method)) {
- // manual set offset
- sBuilder = this.adminManualSetCurrentOffSet(req);
- } else if ("admin_query_group_offset".equals(method)) {
- // query consumer group's offset
- sBuilder = this.adminQueryCurrentGroupOffSet(req);
- } else if ("admin_snapshot_message".equals(method)) {
- // query snapshot message
- sBuilder = this.adminQuerySnapshotMessageSet(req);
- } else if ("admin_query_broker_all_consumer_info".equals(method)) {
- // query broker's all consumer info
- sBuilder = this.adminQueryBrokerAllConsumerInfo(req);
- } else if ("admin_query_broker_memstore_info".equals(method)) {
- // get memory store status info
- sBuilder = this.adminGetMemStoreStatisInfo(req);
- } else if ("admin_query_broker_all_store_info".equals(method)) {
- // query broker's all message store info
- sBuilder = this.adminQueryBrokerAllMessageStoreInfo(req);
- } else if ("admin_query_consumer_regmap".equals(method)) {
- Map<String, ConsumerNodeInfo> map =
- broker.getBrokerServiceServer().getConsumerRegisterMap();
- int totalCnt = 0;
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",")
- .append(",\"dataSet\":[");
- for (Entry<String, ConsumerNodeInfo> entry : map.entrySet()) {
- if (entry.getKey() == null || entry.getValue() == null) {
- continue;
- }
- if (totalCnt > 0) {
- sBuilder.append(",");
- }
- sBuilder.append("{\"Partition\":\"").append(entry.getKey())
- .append("\",\"Consumer\":\"")
- .append(entry.getValue().getConsumerId())
- .append("\",\"index\":").append(++totalCnt).append("}");
- }
- sBuilder.append("],\"totalCnt\":").append(totalCnt).append("}");
- } else {
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append("Invalid request: Unsupported method!")
- .append("\"}");
- }
-
- } catch (Exception e) {
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append("Bad request from server: ")
- .append(e.getMessage())
- .append("\"}");
- }
- resp.getWriter().write(sBuilder.toString());
- resp.setCharacterEncoding(req.getCharacterEncoding());
- resp.setStatus(HttpServletResponse.SC_OK);
- resp.flushBuffer();
+ public void registerWebApiMethod() {
+ // query consumer group's offset
+ innRegisterWebMethod("admin_query_group_offset",
+ "adminQueryCurrentGroupOffSet");
+ // query snapshot message
+ innRegisterWebMethod("admin_snapshot_message",
+ "adminQuerySnapshotMessageSet");
+ // query broker's all consumer info
+ innRegisterWebMethod("admin_query_broker_all_consumer_info",
+ "adminQueryBrokerAllConsumerInfo");
+ // get memory store status info
+ innRegisterWebMethod("admin_query_broker_memstore_info",
+ "adminGetMemStoreStatisInfo");
+ // query broker's all message store info
+ innRegisterWebMethod("admin_query_broker_all_store_info",
+ "adminQueryBrokerAllMessageStoreInfo");
+ // query consumer register info
+ innRegisterWebMethod("admin_query_consumer_regmap",
+ "adminQueryConsumerRegisterInfo");
+ // manual set offset
+ innRegisterWebMethod("admin_manual_set_current_offset",
+ "adminManualSetCurrentOffSet");
}
/***
@@ -122,13 +77,16 @@ public class BrokerAdminServlet extends HttpServlet {
* @return
* @throws Exception
*/
- private StringBuilder adminQueryBrokerAllConsumerInfo(HttpServletRequest req) throws Exception {
+ public StringBuilder adminQueryBrokerAllConsumerInfo(HttpServletRequest req) throws Exception {
int index = 0;
StringBuilder sBuilder = new StringBuilder(1024);
- String groupNameInput =
- WebParameterUtils.validGroupParameter("groupName",
- req.getParameter("groupName"),
- TBaseConstants.META_MAX_GROUPNAME_LENGTH, false, null);
+ ProcessResult result = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, false, null);
+ if (!result.success) {
+ return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ }
+ Set<String> groupNameSet = (Set<String>) result.retData1;
+
sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
Map<String, ConsumerNodeInfo> map =
broker.getBrokerServiceServer().getConsumerRegisterMap();
@@ -139,7 +97,7 @@ public class BrokerAdminServlet extends HttpServlet {
String[] partitionIdArr =
entry.getKey().split(TokenConstants.ATTR_SEP);
String groupName = partitionIdArr[0];
- if (!TStringUtils.isBlank(groupNameInput) && (!groupNameInput.equals(groupName))) {
+ if (!groupNameSet.isEmpty() && !groupNameSet.contains(groupName)) {
continue;
}
String topicName = partitionIdArr[1];
@@ -209,22 +167,23 @@ public class BrokerAdminServlet extends HttpServlet {
* @return
* @throws Exception
*/
- private StringBuilder adminQueryBrokerAllMessageStoreInfo(HttpServletRequest req)
+ public StringBuilder adminQueryBrokerAllMessageStoreInfo(HttpServletRequest req)
throws Exception {
StringBuilder sBuilder = new StringBuilder(1024);
+ ProcessResult result = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null);
+ if (!result.success) {
+ return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
- String topicNameInput =
- WebParameterUtils.validStringParameter("topicName",
- req.getParameter("topicName"),
- TBaseConstants.META_MAX_TOPICNAME_LENGTH, false, null);
Map<String, ConcurrentHashMap<Integer, MessageStore>> messageTopicStores =
broker.getStoreManager().getMessageStores();
int index = 0;
int recordId = 0;
for (Map.Entry<String, ConcurrentHashMap<Integer, MessageStore>> entry : messageTopicStores.entrySet()) {
- if (TStringUtils.isBlank(entry.getKey()) ||
- (TStringUtils.isNotBlank(topicNameInput)
- && !topicNameInput.equals(entry.getKey()))) {
+ if (TStringUtils.isBlank(entry.getKey())
+ || (!topicNameSet.isEmpty() && !topicNameSet.contains(entry.getKey()))) {
continue;
}
if (recordId > 0) {
@@ -276,47 +235,27 @@ public class BrokerAdminServlet extends HttpServlet {
* @return
* @throws Exception
*/
- private StringBuilder adminGetMemStoreStatisInfo(HttpServletRequest req) throws Exception {
+ public StringBuilder adminGetMemStoreStatisInfo(HttpServletRequest req) throws Exception {
StringBuilder sBuilder = new StringBuilder(1024);
- Set<String> batchTopicNames = new HashSet<>();
- String inputTopicName = req.getParameter("topicName");
- if (TStringUtils.isNotBlank(inputTopicName)) {
- inputTopicName = inputTopicName.trim();
- String[] strTopicNames =
- inputTopicName.split(TokenConstants.ARRAY_SEP);
- for (int i = 0; i < strTopicNames.length; i++) {
- if (TStringUtils.isBlank(strTopicNames[i])) {
- continue;
- }
- String topicName = strTopicNames[i].trim();
- if (topicName.length() > TBaseConstants.META_MAX_TOPICNAME_LENGTH) {
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append("Invalid parameter: the max length of ")
- .append(topicName).append(" in topicName parameter over ")
- .append(TBaseConstants.META_MAX_TOPICNAME_LENGTH)
- .append(" characters\"}");
- return sBuilder;
- }
- if (!topicName.matches(TBaseConstants.META_TMP_STRING_VALUE)) {
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append("Invalid parameter: the value of ").append(topicName)
- .append(" in topicName parameter must begin with a letter,")
- .append(" can only contain characters,numbers,and underscores!\"}");
- return sBuilder;
- }
- batchTopicNames.add(topicName);
- }
+ ProcessResult result = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null);
+ if (!result.success) {
+ return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ result = WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.NEEDREFRESH, false, false);
+ if (!result.success) {
+ return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
}
- boolean requireRefresh =
- WebParameterUtils.validBooleanDataParameter("needRefresh",
- req.getParameter("needRefresh"), false, false);
+ boolean requireRefresh = (boolean) result.retData1;
sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"detail\":[");
Map<String, ConcurrentHashMap<Integer, MessageStore>> messageTopicStores =
broker.getStoreManager().getMessageStores();
int recordId = 0, index = 0;
for (Map.Entry<String, ConcurrentHashMap<Integer, MessageStore>> entry : messageTopicStores.entrySet()) {
if (TStringUtils.isBlank(entry.getKey())
- || (!batchTopicNames.isEmpty() && !batchTopicNames.contains(entry.getKey()))) {
+ || (!topicNameSet.isEmpty() && !topicNameSet.contains(entry.getKey()))) {
continue;
}
String topicName = entry.getKey();
@@ -354,25 +293,38 @@ public class BrokerAdminServlet extends HttpServlet {
* @return
* @throws Exception
*/
- private StringBuilder adminManualSetCurrentOffSet(HttpServletRequest req) throws Exception {
+ public StringBuilder adminManualSetCurrentOffSet(HttpServletRequest req) throws Exception {
StringBuilder sBuilder = new StringBuilder(512);
- final String topicName =
- WebParameterUtils.validStringParameter("topicName",
- req.getParameter("topicName"),
- TBaseConstants.META_MAX_TOPICNAME_LENGTH, true, "");
- final String groupName =
- WebParameterUtils.validGroupParameter("groupName",
- req.getParameter("groupName"),
- TBaseConstants.META_MAX_GROUPNAME_LENGTH, true, "");
- final String modifyUser =
- WebParameterUtils.validStringParameter("modifyUser",
- req.getParameter("modifyUser"), 64, true, "");
- int partitionId =
- WebParameterUtils.validIntDataParameter("partitionId",
- req.getParameter("partitionId"), true, -1, 0);
- long manualOffset =
- WebParameterUtils.validLongDataParameter("manualOffset",
- req.getParameter("manualOffset"), true, -1);
+ ProcessResult result = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.TOPICNAME, true, null);
+ if (!result.success) {
+ return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ }
+ final String topicName = (String) result.retData1;
+ result = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.GROUPNAME, true, null);
+ if (!result.success) {
+ return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ }
+ final String groupName = (String) result.retData1;
+ result = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.MODIFYUSER, true, null);
+ if (!result.success) {
+ return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ }
+ final String modifyUser = (String) result.retData1;
+ result = WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.PARTITIONID, true, -1, 0);
+ if (!result.success) {
+ return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ }
+ int partitionId = (Integer) result.retData1;
+ result = WebParameterUtils.getLongParamValue(req,
+ WebFieldDef.MANUALOFFSET, true, -1);
+ if (!result.success) {
+ return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ }
+ long manualOffset = (Long) result.retData1;
List<String> topicList = broker.getMetadataManager().getTopics();
if (!topicList.contains(topicName)) {
sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
@@ -430,28 +382,39 @@ public class BrokerAdminServlet extends HttpServlet {
* @return
* @throws Exception
*/
- private StringBuilder adminQuerySnapshotMessageSet(HttpServletRequest req) throws Exception {
+ public StringBuilder adminQuerySnapshotMessageSet(HttpServletRequest req) throws Exception {
StringBuilder sBuilder = new StringBuilder(1024);
- final String topicName =
- WebParameterUtils.validStringParameter("topicName",
- req.getParameter("topicName"),
- TBaseConstants.META_MAX_TOPICNAME_LENGTH, true, "");
- final int partitionId =
- WebParameterUtils.validIntDataParameter("partitionId",
- req.getParameter("partitionId"), false, -1, 0);
- int msgCount =
- WebParameterUtils.validIntDataParameter("msgCount",
- req.getParameter("msgCount"), false, 3, 3);
- msgCount = msgCount < 1 ? 1 : msgCount;
+ ProcessResult result = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.TOPICNAME, true, null);
+ if (!result.success) {
+ return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ }
+ final String topicName = (String) result.retData1;
+ result = WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.PARTITIONID, true, -1, 0);
+ if (!result.success) {
+ return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ }
+ int partitionId = (Integer) result.retData1;
+ result = WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.MSGCOUNT, false, 3, 3);
+ if (!result.success) {
+ return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ }
+ int msgCount = (Integer) result.retData1;
+ msgCount = Math.max(msgCount, 1);
if (msgCount > 50) {
sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
.append("Over max allowed msgCount value, allowed count is 50!")
.append("\"}");
return sBuilder;
}
- Set<String> filterCondStrSet =
- WebParameterUtils.checkAndGetFilterCondSet(req.getParameter("filterConds"),
- false, true, sBuilder);
+ result = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.FILTERCONDS, false, null);
+ if (!result.success) {
+ return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ }
+ Set<String> filterCondStrSet = (Set<String>) result.retData1;
sBuilder = broker.getBrokerServiceServer()
.getMessageSnapshot(topicName, partitionId, msgCount, filterCondStrSet, sBuilder);
return sBuilder;
@@ -464,20 +427,34 @@ public class BrokerAdminServlet extends HttpServlet {
* @return
* @throws Exception
*/
- private StringBuilder adminQueryCurrentGroupOffSet(HttpServletRequest req)
+ public StringBuilder adminQueryCurrentGroupOffSet(HttpServletRequest req)
throws Exception {
StringBuilder sBuilder = new StringBuilder(1024);
- String topicName =
- WebParameterUtils.validStringParameter("topicName",
- req.getParameter("topicName"),
- TBaseConstants.META_MAX_TOPICNAME_LENGTH, true, "");
- String groupName =
- WebParameterUtils.validGroupParameter("groupName",
- req.getParameter("groupName"),
- TBaseConstants.META_MAX_GROUPNAME_LENGTH, true, "");
- int partitionId =
- WebParameterUtils.validIntDataParameter("partitionId",
- req.getParameter("partitionId"), true, -1, 0);
+ ProcessResult result = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.TOPICNAME, true, null);
+ if (!result.success) {
+ return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ }
+ final String topicName = (String) result.retData1;
+ result = WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.GROUPNAME, true, null);
+ if (!result.success) {
+ return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ }
+ final String groupName = (String) result.retData1;
+ result = WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.PARTITIONID, true, -1, 0);
+ if (!result.success) {
+ return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ }
+ int partitionId = (Integer) result.retData1;
+
+ result = WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.REQUIREREALOFFSET, false, false);
+ if (!result.success) {
+ return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ }
+ boolean requireRealOffset = (Boolean) result.retData1;
List<String> topicList = broker.getMetadataManager().getTopics();
if (!topicList.contains(topicName)) {
sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
@@ -499,9 +476,6 @@ public class BrokerAdminServlet extends HttpServlet {
.append("\"}");
return sBuilder;
}
- boolean requireRealOffset =
- WebParameterUtils.validBooleanDataParameter("requireRealOffset",
- req.getParameter("requireRealOffset"), false, false);
long tmpOffset = offsetService.getTmpOffset(groupName, topicName, partitionId);
long minDataOffset = store.getDataMinOffset();
long maxDataOffset = store.getDataMaxOffset();
@@ -538,5 +512,28 @@ public class BrokerAdminServlet extends HttpServlet {
return sBuilder;
}
+ public StringBuilder adminQueryConsumerRegisterInfo(HttpServletRequest req) {
+ // Dumps the consumer register map as JSON: one {Partition, Consumer, index} item per non-null entry; req is not read.
+ StringBuilder sBuilder = new StringBuilder(1024);
+ Map<String, ConsumerNodeInfo> map =
+ broker.getBrokerServiceServer().getConsumerRegisterMap();
+ int totalCnt = 0;
+ sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
+ for (Entry<String, ConsumerNodeInfo> entry : map.entrySet()) {
+ if (entry.getKey() == null || entry.getValue() == null) {
+ continue;
+ }
+ if (totalCnt > 0) {
+ sBuilder.append(",");
+ }
+ sBuilder.append("{\"Partition\":\"").append(entry.getKey())
+ .append("\",\"Consumer\":\"")
+ .append(entry.getValue().getConsumerId())
+ .append("\",\"index\":").append(++totalCnt).append("}");
+ }
+ sBuilder.append("],\"totalCnt\":").append(totalCnt).append("}");
+ return sBuilder;
+ }
+
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java
new file mode 100644
index 0000000..b2d9327
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.common.fielddef;
+
+
+
+/**
+ * Catalogue of command line options, one enum constant per option.
+ *
+ * Each constant carries five attributes: opt, longOpt, hasArg, argDesc and optDesc.
+ * opt and longOpt are presumably the short and long option names accepted on the
+ * command line (TODO confirm against the code that builds the parser options).
+ */
+public enum CliArgDef {
+
+ // Note: Due to compatibility considerations,
+ // the defined fields in the scheme are forbidden to be modified,
+ // only new fields can be added
+
+ // Constants declared with three values take no argument (hasArg = false);
+ // constants declared with four values take one, described by the third value.
+ HELP("h", "help", "Print usage information."),
+ VERSION("v", "version", "Display TubeMQ version."),
+ MASTERSERVER("master-servers", "master-servers",
+ "String: format is master1_ip:port[,master2_ip:port]",
+ "The master address(es) to connect to."),
+ MASTERURL("master-url", "master-url",
+ "String: format is http://master_ip:master_webport/",
+ "Master Service URL to which to connect.(default: http://localhost:8080/)"),
+ BROKERURL("broker-url", "broker-url",
+ "String: format is http://broker_ip:broker_webport/",
+ "Broker Service URL to which to connect.(default: http://localhost:8081/)"),
+ MESSAGES("messages", "messages",
+ "Long: count",
+ "The number of messages to send or consume, If not set, production or consumption is continual."),
+ // NOTE(review): opt and longOpt differ here ("msg-data-size" vs "message-data-size"),
+ // while the help texts refer to --msg-data-size; confirm which spelling users must type.
+ MSGDATASIZE("msg-data-size", "message-data-size",
+ "Int: message size",
+ "message's data size in bytes. Note that you must provide exactly"
+ + " one of --msg-data-size or --payload-file."),
+ PAYLOADFILE("payload-file", "payload-file",
+ "String: payload file path",
+ "file to read the message payloads from. This works only for"
+ + " UTF-8 encoded text files. Payloads will be read from this"
+ + " file and a payload will be randomly selected when sending"
+ + " messages. Note that you must provide exactly one"
+ + " of --msg-data-size or --payload-file."),
+ PAYLOADDELIM("payload-delimiter", "payload-delimiter",
+ "String: payload data's delimiter",
+ "provides delimiter to be used when --payload-file is provided."
+ + " Defaults to new line. Note that this parameter will be"
+ + " ignored if --payload-file is not provided. (default: \\n)"),
+ // PRDTOPIC and CNSTOPIC share the same opt/longOpt pair and differ only in their
+ // descriptions; presumably they are never registered in the same option set.
+ PRDTOPIC("topic", "topicName",
+ "String: topic, format is topic_1[,topic_2[:filterCond_2.1[;filterCond_2.2]]]",
+ "The topic(s) to produce messages to."),
+ CNSTOPIC("topic", "topicName",
+ "String: topic, format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]",
+ "The topic(s) to consume on."),
+ RPCTIMEOUT("timeout", "timeout",
+ "Long: milliseconds",
+ "The maximum duration between request and response in milliseconds. (default: 10000)"),
+ GROUP("group", "groupName",
+ "String: consumer group",
+ "The consumer group name of the consumer."),
+ CLIENTCOUNT("client-num", "client-num",
+ "Int: client count",
+ "Number of consumers to started."),
+ PULLMODEL("pull-model", "pull-model",
+ "Pull consumption model."),
+ PUSHMODEL("push-model", "push-model",
+ "Push consumption model."),
+ FETCHTHREADS("num-fetch-threads", "num-fetch-threads",
+ "Integer: count",
+ "Number of fetch threads, default: num of cpu count."),
+ FROMLATEST("from-latest", "from-latest",
+ "Start to consume from the latest message present in the log."),
+ FROMBEGINNING("from-beginning", "from-beginning",
+ "If the consumer does not already have an established offset to consume from,"
+ + " start with the earliest message present in the log rather than the latest message."),
+ OUTPUTINTERVAL("output-interval", "output-interval",
+ "Integer: interval_ms",
+ "Interval in milliseconds at which to print progress info. (default: 5000)");
+
+
+ // Option without an argument: hasArg is false and argDesc is the empty string.
+ CliArgDef(String opt, String longOpt, String optDesc) {
+ this(opt, longOpt, false, "", optDesc);
+ }
+
+ // Option with an argument: hasArg is true and argDesc describes the argument.
+ CliArgDef(String opt, String longOpt, String argDesc, String optDesc) {
+ this(opt, longOpt, true, argDesc, optDesc);
+ }
+
+ // Full constructor; the two constructors above delegate to it.
+ CliArgDef(String opt, String longOpt, boolean hasArg, String argDesc, String optDesc) {
+ this.opt = opt;
+ this.longOpt = longOpt;
+ this.hasArg = hasArg;
+ this.argDesc = argDesc;
+ this.optDesc = optDesc;
+ }
+
+ // First name of the option
+ public final String opt;
+ // Second (long) name of the option
+ public final String longOpt;
+ // Whether the option is declared with an argument
+ public final boolean hasArg;
+ // Description of the argument; empty when hasArg is false
+ public final String argDesc;
+ // Description of the option itself
+ public final String optDesc;
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
new file mode 100644
index 0000000..1025ba0
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.common.fielddef;
+
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.corebase.utils.RegexDef;
+import org.apache.tubemq.server.common.TServerConstants;
+import org.apache.tubemq.server.common.webbase.WebFieldType;
+
+
+/**
+ * Definitions of the fields accepted by the web API, one enum constant per field.
+ *
+ * Each constant records a numeric id, a name and a short name, the value type, a
+ * description, and the constraints applied to the value: maximum length, maximum
+ * item count for compound types, and the regular expression definition to check.
+ */
+public enum WebFieldDef {
+
+ // Note: Due to compatibility considerations,
+ // the defined fields in the scheme are forbidden to be modified,
+ // only new fields can be added
+
+ TOPICNAME(0, "topicName", "topic", WebFieldType.STRING,
+ "Topic name", TBaseConstants.META_MAX_TOPICNAME_LENGTH,
+ RegexDef.TMP_STRING),
+ GROUPNAME(1, "groupName", "group", WebFieldType.STRING,
+ "Group name", TBaseConstants.META_MAX_GROUPNAME_LENGTH,
+ RegexDef.TMP_GROUP),
+ PARTITIONID(2, "partitionId", "pid", WebFieldType.INT,
+ "Partition id", RegexDef.TMP_NUMBER),
+ CREATEUSER(3, "createUser", "cur", WebFieldType.STRING,
+ "Record creator", TBaseConstants.META_MAX_USERNAME_LENGTH,
+ RegexDef.TMP_STRING),
+ MODIFYUSER(4, "modifyUser", "mur", WebFieldType.STRING,
+ "Record modifier", TBaseConstants.META_MAX_USERNAME_LENGTH,
+ RegexDef.TMP_STRING),
+ MANUALOFFSET(5, "manualOffset", "offset", WebFieldType.LONG,
+ "Reset offset value", RegexDef.TMP_NUMBER),
+ MSGCOUNT(6, "msgCount", "cnt", WebFieldType.INT,
+ "Number of returned messages", RegexDef.TMP_NUMBER),
+ FILTERCONDS(7, "filterConds", "flts", WebFieldType.COMPSTRING,
+ "Filter condition items", TBaseConstants.CFG_FLT_MAX_FILTER_ITEM_LENGTH,
+ TBaseConstants.CFG_FLT_MAX_FILTER_ITEM_COUNT, RegexDef.TMP_FILTER),
+ REQUIREREALOFFSET(8, "requireRealOffset", "dko", WebFieldType.BOOLEAN,
+ "Require return disk offset details"),
+ NEEDREFRESH(9, "needRefresh", "nrf", WebFieldType.BOOLEAN,
+ "Require refresh data"),
+ // The COMPS* constants reuse the name and short name of GROUPNAME, TOPICNAME and
+ // PARTITIONID but declare a compound type, so the same request parameter can be
+ // read either as a single value or as a set of values.
+ COMPSGROUPNAME(10, "groupName", "group", WebFieldType.COMPSTRING,
+ "Group name", TBaseConstants.META_MAX_GROUPNAME_LENGTH,
+ RegexDef.TMP_GROUP),
+ COMPSTOPICNAME(11, "topicName", "topic", WebFieldType.COMPSTRING,
+ "Topic name", TBaseConstants.META_MAX_TOPICNAME_LENGTH,
+ RegexDef.TMP_STRING),
+ COMPSPARTITIONID(12, "partitionId", "pid", WebFieldType.COMPINT,
+ "Partition id", RegexDef.TMP_NUMBER);
+
+
+
+
+ // Numeric id of the field; also the index into the id lookup table below
+ public final int id;
+ // Name of the field
+ public final String name;
+ // Short name of the field
+ public final String shortName;
+ // Declared value type
+ public final WebFieldType type;
+ // Description of the field
+ public final String desc;
+ // True when type is COMPINT or COMPSTRING
+ public final boolean compVal;
+ // TokenConstants.ARRAY_SEP for compound types, empty string otherwise
+ public final String splitToken;
+ // Item count limit for compound types; META_VALUE_UNDEFINED for non-compound types
+ public final int itemMaxCnt;
+ // Value length limit; META_VALUE_UNDEFINED when no limit is declared
+ public final int valMaxLen;
+ // Whether regexDef is to be applied to the value
+ public final boolean regexCheck;
+ // Regular expression definition; null for constants declared without one
+ public final RegexDef regexDef;
+
+
+ // No constraints: length and item count undefined, no regex check, regexDef null.
+ WebFieldDef(int id, String name, String shortName, WebFieldType type, String desc) {
+ this(id, name, shortName, type, desc, TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED, false, null);
+ }
+
+ // Regex check only: the value length is left undefined.
+ WebFieldDef(int id, String name, String shortName, WebFieldType type,
+ String desc, RegexDef regexDef) {
+ this(id, name, shortName, type, desc,
+ TBaseConstants.META_VALUE_UNDEFINED, regexDef);
+ }
+
+ // Length limit plus regex check; the item count limit defaults to
+ // TServerConstants.CFG_BATCH_RECORD_OPERATE_MAX_COUNT.
+ WebFieldDef(int id, String name, String shortName, WebFieldType type,
+ String desc, int valMaxLen, RegexDef regexDef) {
+ this(id, name, shortName, type, desc, valMaxLen,
+ TServerConstants.CFG_BATCH_RECORD_OPERATE_MAX_COUNT,
+ true, regexDef);
+ }
+
+ // Length limit, explicit item count limit and regex check.
+ WebFieldDef(int id, String name, String shortName, WebFieldType type,
+ String desc, int valMaxLen, int itemMaxCnt, RegexDef regexDef) {
+ this(id, name, shortName, type, desc, valMaxLen,
+ itemMaxCnt, true, regexDef);
+ }
+
+ // Full constructor; every other constructor ends up here.
+ WebFieldDef(int id, String name, String shortName, WebFieldType type,
+ String desc, int valMaxLen, int itemMaxCnt,
+ boolean regexChk, RegexDef regexDef) {
+ this.id = id;
+ this.name = name;
+ this.shortName = shortName;
+ this.type = type;
+ this.desc = desc;
+ // this.type is assigned above, so isCompFieldType() already sees the final type
+ if (isCompFieldType()) {
+ this.compVal = true;
+ this.splitToken = TokenConstants.ARRAY_SEP;
+ this.itemMaxCnt = itemMaxCnt;
+ } else {
+ // The itemMaxCnt argument is ignored for non-compound types
+ this.compVal = false;
+ this.splitToken = "";
+ this.itemMaxCnt = TBaseConstants.META_VALUE_UNDEFINED;
+ }
+ this.valMaxLen = valMaxLen;
+ this.regexCheck = regexChk;
+ this.regexDef = regexDef;
+ }
+
+ // Returns true when the declared type is COMPINT or COMPSTRING.
+ public boolean isCompFieldType() {
+ return (this.type == WebFieldType.COMPINT
+ || this.type == WebFieldType.COMPSTRING);
+ }
+
+ // Lookup table indexed by field id, filled by the static initializer below
+ private static final WebFieldDef[] WEB_FIELD_DEFS;
+ private static final int MIN_FIELD_ID = 0;
+ // Largest id found among the constants
+ public static final int MAX_FIELD_ID;
+
+ static {
+ // First pass finds the largest id, second pass fills the table
+ int maxId = -1;
+ for (WebFieldDef fieldDef : WebFieldDef.values()) {
+ maxId = Math.max(maxId, fieldDef.id);
+ }
+ WebFieldDef[] idToType = new WebFieldDef[maxId + 1];
+ for (WebFieldDef fieldDef : WebFieldDef.values()) {
+ idToType[fieldDef.id] = fieldDef;
+ }
+ WEB_FIELD_DEFS = idToType;
+ MAX_FIELD_ID = maxId;
+ }
+
+ // Returns the constant stored for fieldId, or throws IllegalArgumentException when
+ // fieldId lies outside [MIN_FIELD_ID, MAX_FIELD_ID].
+ // NOTE(review): an in-range id that no constant uses would yield null here; the ids
+ // declared above are contiguous (0..12), so this does not happen today.
+ public static WebFieldDef valueOf(int fieldId) {
+ if (fieldId >= MIN_FIELD_ID && fieldId <= MAX_FIELD_ID) {
+ return WEB_FIELD_DEFS[fieldId];
+ }
+ throw new IllegalArgumentException(
+ String.format("Unexpected WebFieldDef id `%s`, it should be between `%s` " +
+ "and `%s` (inclusive)", fieldId, MIN_FIELD_ID, MAX_FIELD_ID));
+ }
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java
new file mode 100644
index 0000000..3688b1c
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.common.utils;
+
+import org.apache.tubemq.corebase.TErrCodeConstants;
+
+/**
+ * Mutable carrier for the outcome of a processing step: a success flag, an error
+ * code and message describing a failure, and an optional payload for a success.
+ *
+ * <p>An instance may be reused for several consecutive steps, so recording a success
+ * also clears the error fields left behind by an earlier failure.
+ */
+public class ProcessResult {
+    /** Whether the last recorded outcome is a success. */
+    public boolean success = true;
+    /** Error code of the last failure; TErrCodeConstants.SUCCESS when successful. */
+    public int errCode = TErrCodeConstants.SUCCESS;
+    /** Error message of the last failure; empty when successful. */
+    public String errInfo = "";
+    /** Payload of the last success; may be null. */
+    public Object retData1 = null;
+
+    /** Create a successful result without payload. */
+    public ProcessResult() {
+
+    }
+
+    /**
+     * Create a successful result.
+     *
+     * @param retData  the payload to carry
+     */
+    public ProcessResult(Object retData) {
+        this.success = true;
+        this.retData1 = retData;
+    }
+
+    /**
+     * Create a failed result.
+     *
+     * @param errCode  the error code
+     * @param errInfo  the error message
+     */
+    public ProcessResult(int errCode, String errInfo) {
+        this.success = false;
+        this.errCode = errCode;
+        this.errInfo = errInfo;
+    }
+
+    /**
+     * Record a failure.
+     *
+     * @param errCode  the error code
+     * @param errMsg   the error message
+     */
+    public void setFailResult(int errCode, final String errMsg) {
+        this.success = false;
+        this.errCode = errCode;
+        this.errInfo = errMsg;
+    }
+
+    /**
+     * Record a success and reset the error fields to their initial values.
+     *
+     * @param retData  the payload to carry
+     */
+    public void setSuccResult(Object retData) {
+        this.success = true;
+        // Without this reset a reused object reports success=true together with the
+        // code and message of the previous failure.
+        this.errCode = TErrCodeConstants.SUCCESS;
+        this.errInfo = "";
+        this.retData1 = retData;
+    }
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index f4ce5bf..b7d5d41 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -30,18 +30,21 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.TStatusConstants;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager;
import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
+
public class WebParameterUtils {
private static final List<String> allowedDelUnits = Arrays.asList("s", "m", "h");
@@ -153,6 +156,24 @@ public class WebParameterUtils {
/**
* Parse the parameter value from an object value to string value
*
+ * @param req http servlet request
+ * @param paramName the parameter name
+ * @param paramMaxLen the max length of string to return
+ * @param required a boolean value represent whether the parameter is must required
+ * @param defaultValue a default value returned if failed to parse value from the given object
+ * @return a string value of parameter
+ * @throws Exception if failed to parse the object
+ */
+    public static String validStringParameter(HttpServletRequest req, String paramName,
+                                              int paramMaxLen, boolean required,
+                                              String defaultValue) throws Exception {
+        // Fetch the raw value from the request, then reuse the value based overload
+        final String rawValue = req.getParameter(paramName);
+        return validStringParameter(paramName, rawValue,
+                paramMaxLen, required, defaultValue);
+    }
+
+ /**
+ * Parse the parameter value from an object value to string value
+ *
* @param paramName the parameter name
* @param paramValue the parameter value which is an object for parsing
* @param paramMaxLen the max length of string to return
@@ -161,9 +182,11 @@ public class WebParameterUtils {
* @return a string value of parameter
* @throws Exception if failed to parse the object
*/
- public static String validStringParameter(String paramName, String paramValue, int paramMaxLen,
- boolean required, String defaultValue) throws Exception {
- String tmpParamValue = checkParamCommonRequires(paramName, paramValue, required);
+ public static String validStringParameter(String paramName, String paramValue,
+ int paramMaxLen, boolean required,
+ String defaultValue) throws Exception {
+ String tmpParamValue =
+ checkParamCommonRequires(paramName, paramValue, required);
if (TStringUtils.isBlank(tmpParamValue)) {
return defaultValue;
}
@@ -214,6 +237,297 @@ public class WebParameterUtils {
return tmpParamValue;
}
+    /**
+     * Append a failure response in JSON format, with errCode 400, to the buffer.
+     *
+     * <p>The message is escaped before it is embedded: double quotes and backslashes
+     * are prefixed with a backslash and control characters are written as \\uXXXX, so
+     * messages such as a NumberFormatException text, which quotes the offending input,
+     * cannot break the structure of the response.
+     *
+     * @param strBuffer  the buffer to append to
+     * @param errMsg     the error message; a null message is rendered as "null"
+     * @return the same buffer, for call chaining
+     */
+    public static StringBuilder buildFailResult(StringBuilder strBuffer, String errMsg) {
+        strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"");
+        if (errMsg == null) {
+            // Keep the rendering StringBuilder gives to a null String
+            strBuffer.append((String) null);
+        } else {
+            for (int i = 0; i < errMsg.length(); i++) {
+                char ch = errMsg.charAt(i);
+                if (ch == '"' || ch == '\\') {
+                    strBuffer.append('\\').append(ch);
+                } else if (ch < 0x20) {
+                    strBuffer.append(String.format("\\u%04x", (int) ch));
+                } else {
+                    strBuffer.append(ch);
+                }
+            }
+        }
+        return strBuffer.append("\"}");
+    }
+
+    /**
+     * Read a request parameter and convert it to a long value.
+     *
+     * @param req       Http Servlet Request
+     * @param fieldDef  the parameter field definition
+     * @param required  whether the parameter must be present
+     * @param defValue  the value returned when the parameter is not required and no
+     *                  value was supplied; a supplied value that cannot be parsed
+     *                  yields a failure result, not this default
+     * @return the process result; on success retData1 holds the Long value
+     */
+    public static ProcessResult getLongParamValue(HttpServletRequest req,
+                                                  WebFieldDef fieldDef,
+                                                  boolean required,
+                                                  long defValue) {
+        ProcessResult procResult =
+                getStringParamValue(req, fieldDef, required, null);
+        if (!procResult.success) {
+            return procResult;
+        }
+        String paramValue = (String) procResult.retData1;
+        if (paramValue == null) {
+            procResult.setSuccResult(defValue);
+            return procResult;
+        }
+        try {
+            long paramLongVal = Long.parseLong(paramValue);
+            procResult.setSuccResult(paramLongVal);
+        } catch (NumberFormatException e) {
+            // Long.parseLong reports bad input with NumberFormatException only; other
+            // Throwables (for example OutOfMemoryError) must not be turned into a
+            // "parse error" response.
+            procResult.setFailResult(400,
+                    new StringBuilder(512).append("Parameter ")
+                            .append(fieldDef.name).append(" parse error: ")
+                            .append(e.getMessage()).toString());
+        }
+        return procResult;
+    }
+
+    /**
+     * Read a request parameter and convert it to an integer value, or to a set of
+     * integer values when the field is of a compound type.
+     *
+     * @param req       Http Servlet Request
+     * @param fieldDef  the parameter field definition
+     * @param required  whether the parameter must be present
+     * @param defValue  the value used when no value was supplied
+     * @param minValue  the smallest value accepted
+     * @return the process result; on success retData1 holds an Integer, or a
+     *         Set of Integer for compound fields
+     */
+    public static ProcessResult getIntParamValue(HttpServletRequest req,
+                                                 WebFieldDef fieldDef,
+                                                 boolean required,
+                                                 int defValue,
+                                                 int minValue) {
+        ProcessResult procResult =
+                getStringParamValue(req, fieldDef, required, null);
+        if (!procResult.success) {
+            return procResult;
+        }
+        // Single value: check it in place and hand the same result object back
+        if (!fieldDef.isCompFieldType()) {
+            String rawValue = (String) procResult.retData1;
+            if (rawValue == null) {
+                procResult.setSuccResult(defValue);
+            } else {
+                checkIntValueNorms(procResult, fieldDef, rawValue, minValue);
+            }
+            return procResult;
+        }
+        // Compound value: convert item by item, stopping at the first bad one
+        Set<Integer> parsedItems = new HashSet<Integer>();
+        Set<String> rawItems = (Set<String>) procResult.retData1;
+        if (rawItems.isEmpty()) {
+            parsedItems.add(defValue);
+            procResult.setSuccResult(parsedItems);
+            return procResult;
+        }
+        ProcessResult itemResult = new ProcessResult();
+        for (String rawItem : rawItems) {
+            boolean itemOk =
+                    checkIntValueNorms(itemResult, fieldDef, rawItem, minValue);
+            if (!itemOk) {
+                return itemResult;
+            }
+            parsedItems.add((Integer) itemResult.retData1);
+        }
+        procResult.setSuccResult(parsedItems);
+        return procResult;
+    }
+
+    /**
+     * Read a request parameter and convert it to a boolean value.
+     *
+     * @param req       Http Servlet Request
+     * @param fieldDef  the parameter field definition
+     * @param required  whether the parameter must be present
+     * @param defValue  the value used when no value was supplied
+     * @return the process result; on success retData1 holds the Boolean value
+     */
+    public static ProcessResult getBooleanParamValue(HttpServletRequest req,
+                                                     WebFieldDef fieldDef,
+                                                     boolean required,
+                                                     boolean defValue) {
+        ProcessResult procResult =
+                getStringParamValue(req, fieldDef, required, null);
+        if (procResult.success) {
+            String rawValue = (String) procResult.retData1;
+            // Boolean.parseBoolean yields true only for "true" (ignoring case)
+            boolean flag = (rawValue == null)
+                    ? defValue : Boolean.parseBoolean(rawValue);
+            procResult.setSuccResult(flag);
+        }
+        return procResult;
+    }
+
+    /**
+     * Read a request parameter as a string, or as a set of strings when the field is
+     * of a compound type.
+     *
+     * <p>The value is looked up under the field's name first and under its short name
+     * second. Each value, or each item of a compound value, is checked against the
+     * length and pattern constraints of the field definition.
+     *
+     * @param req       Http Servlet Request
+     * @param fieldDef  the parameter field definition
+     * @param required  whether the parameter must be present
+     * @param defValue  the value used when the parameter is not required and no value
+     *                  was supplied
+     * @return the process result; on success retData1 holds a String, or a
+     *         Set of String for compound fields
+     */
+    public static ProcessResult getStringParamValue(HttpServletRequest req,
+                                                    WebFieldDef fieldDef,
+                                                    boolean required,
+                                                    String defValue) {
+        ProcessResult procResult = new ProcessResult();
+        // get parameter value
+        String paramValue = req.getParameter(fieldDef.name);
+        if (paramValue == null) {
+            paramValue = req.getParameter(fieldDef.shortName);
+        }
+        if (TStringUtils.isNotBlank(paramValue)) {
+            // Cleanup value extra characters
+            paramValue = escDoubleQuotes(paramValue.trim());
+        }
+        // Check if the parameter exists
+        // NOTE(review): failures raised here carry fieldDef.id as the error code,
+        // whereas the integer checks in this class use 400 - confirm which is intended.
+        if (TStringUtils.isBlank(paramValue)) {
+            if (required) {
+                procResult.setFailResult(fieldDef.id,
+                        new StringBuilder(512).append("Parameter ")
+                                .append(fieldDef.name)
+                                .append(" is missing or value is null or blank!").toString());
+            } else {
+                procStringDefValue(procResult, fieldDef.isCompFieldType(), defValue);
+            }
+            return procResult;
+        }
+        // check if value is norm;
+        if (fieldDef.isCompFieldType()) {
+            // split original value to items
+            Set<String> valItemSet = new HashSet<>();
+            String[] strParamValueItems = paramValue.split(fieldDef.splitToken);
+            for (String strParamValueItem : strParamValueItems) {
+                if (TStringUtils.isBlank(strParamValueItem)) {
+                    continue;
+                }
+                if (!checkStrValueNorms(procResult, fieldDef, strParamValueItem)) {
+                    return procResult;
+                }
+                valItemSet.add((String) procResult.retData1);
+            }
+            // check if is empty result
+            if (valItemSet.isEmpty()) {
+                if (required) {
+                    procResult.setFailResult(fieldDef.id,
+                            new StringBuilder(512).append("Parameter ")
+                                    .append(fieldDef.name)
+                                    .append(" is missing or value is null or blank!").toString());
+                } else {
+                    procStringDefValue(procResult, fieldDef.isCompFieldType(), defValue);
+                }
+                return procResult;
+            }
+            // check max item count
+            if (fieldDef.itemMaxCnt != TBaseConstants.META_VALUE_UNDEFINED) {
+                if (valItemSet.size() > fieldDef.itemMaxCnt) {
+                    procResult.setFailResult(fieldDef.id,
+                            new StringBuilder(512).append("Parameter ")
+                                    .append(fieldDef.name)
+                                    .append("'s item count over max allowed count (")
+                                    .append(fieldDef.itemMaxCnt).append(")!").toString());
+                    // Stop here: falling through to setSuccResult below would turn
+                    // the rejection back into a success and accept the oversized list.
+                    return procResult;
+                }
+            }
+            procResult.setSuccResult(valItemSet);
+        } else {
+            if (!checkStrValueNorms(procResult, fieldDef, paramValue)) {
+                return procResult;
+            }
+            procResult.setSuccResult(paramValue);
+        }
+        return procResult;
+    }
+
+    /**
+     * Store the default value of a string parameter into the process result.
+     *
+     * @param procResult       the process result to fill
+     * @param isCompFieldType  whether the parameter is of a compound field type
+     * @param defValue         the default value of the parameter
+     * @return the given process result, marked as success; retData1 holds the default
+     *         itself, or for compound types a set that contains the default unless it
+     *         is blank
+     */
+    private static ProcessResult procStringDefValue(ProcessResult procResult,
+                                                    boolean isCompFieldType,
+                                                    String defValue) {
+        if (!isCompFieldType) {
+            procResult.setSuccResult(defValue);
+            return procResult;
+        }
+        // Compound types always report a set, possibly an empty one
+        Set<String> defItems = new HashSet<>();
+        boolean hasDefault = TStringUtils.isNotBlank(defValue);
+        if (hasDefault) {
+            defItems.add(defValue);
+        }
+        procResult.setSuccResult(defItems);
+        return procResult;
+    }
+
+    /**
+     * Check a string value against the length limit and the pattern declared by the
+     * field definition.
+     *
+     * @param procResult  receives the trimmed value on success (null when the value
+     *                    is blank) or the failure details
+     * @param fieldDef    the parameter field definition
+     * @param paramVal    the parameter value
+     * @return true if the value is accepted, false otherwise
+     */
+    private static boolean checkStrValueNorms(ProcessResult procResult,
+                                              WebFieldDef fieldDef,
+                                              String paramVal) {
+        final String trimmedVal = paramVal.trim();
+        // A blank value passes the check and is reported as null
+        if (TStringUtils.isBlank(trimmedVal)) {
+            procResult.setSuccResult(null);
+            return true;
+        }
+        // Length limit, applied only when the definition declares one
+        boolean lengthLimited =
+                fieldDef.valMaxLen != TBaseConstants.META_VALUE_UNDEFINED;
+        if (lengthLimited && trimmedVal.length() > fieldDef.valMaxLen) {
+            String lengthErr = "over max length for " + fieldDef.name
+                    + ", only allow " + fieldDef.valMaxLen + " length";
+            procResult.setFailResult(fieldDef.id, lengthErr);
+            return false;
+        }
+        // Pattern, applied only when the definition asks for it
+        if (fieldDef.regexCheck
+                && !trimmedVal.matches(fieldDef.regexDef.getPattern())) {
+            String patternErr = "illegal value for " + fieldDef.name
+                    + ", value " + fieldDef.regexDef.getErrMsgTemp();
+            procResult.setFailResult(fieldDef.id, patternErr);
+            return false;
+        }
+        procResult.setSuccResult(trimmedVal);
+        return true;
+    }
+
+    /**
+     * Parse a string value as an integer and check it against the minimum value.
+     *
+     * @param procResult  receives the parsed Integer on success or the failure details
+     * @param fieldDef    the parameter field definition
+     * @param paramValue  the parameter value
+     * @param minValue    the smallest value accepted
+     * @return true if the value is an integer not smaller than minValue, false otherwise
+     */
+    private static boolean checkIntValueNorms(ProcessResult procResult,
+                                              WebFieldDef fieldDef,
+                                              String paramValue,
+                                              int minValue) {
+        try {
+            int paramIntVal = Integer.parseInt(paramValue);
+            if (paramIntVal < minValue) {
+                procResult.setFailResult(400,
+                        new StringBuilder(512).append("Parameter ")
+                                .append(fieldDef.name).append(" value must >= ")
+                                .append(minValue).toString());
+                return false;
+            }
+            procResult.setSuccResult(paramIntVal);
+        } catch (NumberFormatException e) {
+            // Integer.parseInt signals bad input with NumberFormatException only;
+            // unrelated Throwables must not be reported as a parse error.
+            procResult.setFailResult(400,
+                    new StringBuilder(512).append("Parameter ")
+                            .append(fieldDef.name).append(" parse error: ")
+                            .append(e.getMessage()).toString());
+            return false;
+        }
+        return true;
+    }
+
/**
* Parse the parameter value from an object value to ip address of string value
*
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java
new file mode 100644
index 0000000..b83a966
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.common.webbase;
+
+
+
+/**
+ * Value types that a web API field can be declared with.
+ *
+ * <p>Each constant carries a numeric value and a description; {@link #valueOf(int)}
+ * maps a numeric value back to its constant.
+ */
+public enum WebFieldType {
+
+    UNKNOWN(-1, "Unknown field type"),
+    STRING(1, "String"),
+    INT(2, "int"),
+    LONG(3, "long"),
+    BOOLEAN(4, "Boolean"),
+    DATE(5, "Date"),
+    COMPSTRING(6, "Compound string"),
+    COMPINT(7, "Compound integer");
+
+
+    // Enum constants are shared by the whole JVM, so their attributes are final:
+    // a reassignment anywhere would silently change the mapping for every user.
+    public final int value;
+    public final String desc;
+
+    WebFieldType(int value, String desc) {
+        this.value = value;
+        this.desc = desc;
+    }
+
+    /**
+     * Find the field type that carries the given numeric value.
+     *
+     * @param value  the numeric value to look up
+     * @return the matching field type, or UNKNOWN if no constant carries the value
+     */
+    public static WebFieldType valueOf(int value) {
+        for (WebFieldType fieldType : WebFieldType.values()) {
+            if (fieldType.getValue() == value) {
+                return fieldType;
+            }
+        }
+
+        return UNKNOWN;
+    }
+
+    /**
+     * @return the numeric value of this field type
+     */
+    public int getValue() {
+        return value;
+    }
+
+    /**
+     * @return the description of this field type
+     */
+    public String getDesc() {
+        return desc;
+    }
+
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebApiMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebMethodMapper.java
similarity index 87%
rename from tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebApiMapper.java
rename to tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebMethodMapper.java
index a71ba03..a856014 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebApiMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebMethodMapper.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.tubemq.server.master.web.handler;
+package org.apache.tubemq.server.common.webbase;
import java.lang.reflect.Method;
import java.util.HashMap;
@@ -24,10 +24,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class WebApiMapper {
+public class WebMethodMapper {
// log printer
private static final Logger logger =
- LoggerFactory.getLogger(WebApiMapper.class);
+ LoggerFactory.getLogger(WebMethodMapper.class);
// The query methods map
public static final Map<String, WebApiRegInfo> WEB_QRY_METHOD_MAP =
new HashMap<>();
@@ -47,7 +47,7 @@ public class WebApiMapper {
public static void registerWebMethod(boolean isQryApi,
String webMethodName,
String clsMethodName,
- AbstractWebHandler webHandler) {
+ Object webHandler) {
Method[] methods = webHandler.getClass().getMethods();
for (Method item : methods) {
if (item.getName().equals(clsMethodName)) {
@@ -71,10 +71,10 @@ public class WebApiMapper {
public static class WebApiRegInfo {
public Method method;
- public AbstractWebHandler webHandler;
+ public Object webHandler;
public WebApiRegInfo(Method method,
- AbstractWebHandler webHandler) {
+ Object webHandler) {
this.method = method;
this.webHandler = webHandler;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
index 326f690..5d4de04 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
@@ -17,7 +17,7 @@
package org.apache.tubemq.server.master.web.action.screen;
-import static org.apache.tubemq.server.master.web.handler.WebApiMapper.getWebApiRegInfo;
+import static org.apache.tubemq.server.common.webbase.WebMethodMapper.getWebApiRegInfo;
import java.util.Arrays;
import java.util.List;
@@ -25,13 +25,13 @@ import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corerpc.exception.StandbyException;
+import org.apache.tubemq.server.common.webbase.WebMethodMapper;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager;
import org.apache.tubemq.server.master.web.handler.AbstractWebHandler;
import org.apache.tubemq.server.master.web.handler.WebAdminFlowRuleHandler;
import org.apache.tubemq.server.master.web.handler.WebAdminGroupCtrlHandler;
import org.apache.tubemq.server.master.web.handler.WebAdminTopicAuthHandler;
-import org.apache.tubemq.server.master.web.handler.WebApiMapper;
import org.apache.tubemq.server.master.web.handler.WebBrokerDefConfHandler;
import org.apache.tubemq.server.master.web.handler.WebBrokerTopicConfHandler;
import org.apache.tubemq.server.master.web.handler.WebMasterInfoHandler;
@@ -102,7 +102,7 @@ public class Webapi implements Action {
"DesignatedPrimary happened...please check if the other member is down");
}
}
- WebApiMapper.WebApiRegInfo webApiRegInfo = getWebApiRegInfo(isQuery, method);
+ WebMethodMapper.WebApiRegInfo webApiRegInfo = getWebApiRegInfo(isQuery, method);
if (webApiRegInfo == null) {
strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"Unsupported method: ")
.append(method).append("}");
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/AbstractWebHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/AbstractWebHandler.java
index 09b11eb..1b1bfdc 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/AbstractWebHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/AbstractWebHandler.java
@@ -17,7 +17,7 @@
package org.apache.tubemq.server.master.web.handler;
-import static org.apache.tubemq.server.master.web.handler.WebApiMapper.registerWebMethod;
+import static org.apache.tubemq.server.common.webbase.WebMethodMapper.registerWebMethod;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager;