You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by al...@apache.org on 2022/02/17 08:57:12 UTC
[incubator-inlong] branch master updated: [INLONG-2552][TubeMQ] Add Master metric operation APIs (#2553)
This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cab7dcb [INLONG-2552][TubeMQ] Add Master metric operation APIs (#2553)
cab7dcb is described below
commit cab7dcb168ef992d6e8546b324e13211b2438c95
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Feb 17 16:57:04 2022 +0800
[INLONG-2552][TubeMQ] Add Master metric operation APIs (#2553)
---
.../server/broker/web/BrokerAdminServlet.java | 22 ++-
.../tubemq/server/common/TServerConstants.java | 2 +-
.../master/web/handler/WebOtherInfoHandler.java | 176 +++++++++++++++++++++
3 files changed, 186 insertions(+), 14 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
index bcdaa5b..0057eb6 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -1101,7 +1101,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
broker.getOffsetManager().deleteGroupOffset(
onlyMemory, groupTopicPartMap, modifier);
// builder return result
- sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
+ WebParameterUtils.buildSuccessResult(sBuffer);
}
/**
@@ -1120,10 +1120,12 @@ public class BrokerAdminServlet extends AbstractWebHandler {
return;
}
final boolean needRefresh = (Boolean) result.getRetData();
- sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"probeTime\":\"")
+ // build return result
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+ sBuffer.append("{\"probeTime\":\"")
.append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(System.currentTimeMillis()))
.append("\",\"nodeName\":\"").append(broker.getTubeConfig().getHostName())
- .append("\",\"role\":\"Broker\",\"metrics\":{\"serviceStatus\":");
+ .append("\",\"nodeRole\":\"Broker\",\"metrics\":{\"serviceStatus\":");
if (needRefresh) {
BrokerSrvStatsHolder.snapShort(sBuffer);
sBuffer.append(",\"webAPI\":");
@@ -1134,6 +1136,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
WebCallStatsHolder.getValue(sBuffer);
}
sBuffer.append("},\"count\":2}");
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, 1);
}
/**
@@ -1160,7 +1163,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
// query data
int index = 0;
int recordId = 0;
- sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Ok\",\"dataSet\":[");
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
Map<String, ConcurrentHashMap<Integer, MessageStore>> messageTopicStores =
broker.getStoreManager().getMessageStores();
if (topicNameSet.isEmpty()) {
@@ -1222,7 +1225,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
sBuffer.append("]}");
}
}
- sBuffer.append("],\"totalCount\":").append(recordId).append("}");
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, recordId);
}
/**
@@ -1269,13 +1272,6 @@ public class BrokerAdminServlet extends AbstractWebHandler {
*/
public void adminDisableAllStats(HttpServletRequest req,
StringBuilder sBuffer) {
- ProcessResult result = new ProcessResult();
- if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.STATSTYPE, true, null, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
- return;
- }
- String statsType = (String) result.getRetData();
innEnableOrDisableMetricsStats(false,
BrokerStatsType.ALL.getName(), req, sBuffer);
}
@@ -1365,7 +1361,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
}
}
// builder return result
- sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
+ WebParameterUtils.buildSuccessResult(sBuffer);
}
// build reset offset info
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
index 8f49962..a336ae8 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
@@ -100,6 +100,6 @@ public final class TServerConstants {
DataStoreUtils.STORE_INDEX_HEAD_LEN * 1000000L;
// Minimum snapshot period
- public static final long MIN_SNAPSHOT_PERIOD_MS = 5000L;
+ public static final long MIN_SNAPSHOT_PERIOD_MS = 2000L;
public static final int META_MAX_STATSTYPE_LENGTH = 256;
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
index 10791a0..0cc1fc6 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
@@ -30,8 +30,12 @@ import javax.servlet.http.HttpServletRequest;
import org.apache.inlong.tubemq.corebase.cluster.Partition;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
+import org.apache.inlong.tubemq.server.broker.stats.BrokerStatsType;
+import org.apache.inlong.tubemq.server.common.TubeServerVersion;
import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.inlong.tubemq.server.common.webbase.WebCallStatsHolder;
import org.apache.inlong.tubemq.server.master.TMaster;
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeGroupInfo;
@@ -39,6 +43,8 @@ import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeTyp
import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfo;
import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.NodeRebInfo;
+import org.apache.inlong.tubemq.server.master.stats.MasterSrvStatsHolder;
+import org.apache.inlong.tubemq.server.master.stats.MasterStatsType;
public class WebOtherInfoHandler extends AbstractWebHandler {
@@ -58,6 +64,21 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
"getSubscribeInfo");
registerQueryWebMethod("admin_query_consume_group_detail",
"getConsumeGroupDetailInfo");
+ // query master's version
+ registerQueryWebMethod("admin_query_server_version",
+ "adminQueryMasterVersion");
+ // register query method
+ registerQueryWebMethod("admin_get_metrics_info",
+ "adminGetMetricsInfo");
+ // Enable metrics statistics
+ registerModifyWebMethod("admin_enable_stats",
+ "adminEnableMetricsStats");
+ // Disable metrics statistics
+ registerModifyWebMethod("admin_disable_stats",
+ "adminDisableMetricsStats");
+ // Disable unnecessary statistics
+ registerModifyWebMethod("admin_disable_all_stats",
+ "adminDisableAllStats");
}
/**
@@ -261,6 +282,161 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
}
/**
+     * Query the Master's version.
+     *
+     * <p>NOTE(review): this method was declared as adminQueryBrokerVersion, but the
+     * handler name registered for "admin_query_server_version" is
+     * "adminQueryMasterVersion". Presumably the registration resolves handlers by
+     * method name, so the declaration is renamed to match - confirm against
+     * AbstractWebHandler.
+     *
+     * @param req      Http Servlet Request (not read by this method)
+     * @param sBuffer  string buffer that receives the response
+     * @param result   process result (not used by this method)
+     * @return the passed-in sBuffer, holding a single record with the server version
+     */
+    public StringBuilder adminQueryMasterVersion(HttpServletRequest req,
+                                                 StringBuilder sBuffer,
+                                                 ProcessResult result) {
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+        sBuffer.append("{\"version\":\"")
+                .append(TubeServerVersion.SERVER_VERSION).append("\"}");
+        // exactly one record was appended above
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, 1);
+        return sBuffer;
+    }
+
+    /**
+     * Report the Master's metric information.
+     *
+     * <p>Reads the needRefresh request parameter; when it is true the values are
+     * written through snapShort(...), otherwise through getValue(...), for both
+     * the service-status holder and the web-API holder.
+     *
+     * @param req      Http Servlet Request
+     * @param sBuffer  string buffer that receives the response
+     * @param result   process result, used to carry the parsed parameter
+     *                 or the parse error message
+     * @return the passed-in sBuffer, holding either the failure result or
+     *         one record with the metric values
+     */
+    public StringBuilder adminGetMetricsInfo(HttpServletRequest req,
+                                             StringBuilder sBuffer,
+                                             ProcessResult result) {
+        // parse the needRefresh flag; bail out with the parser's message on failure
+        final boolean paramOk = WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.NEEDREFRESH, false, false, sBuffer, result);
+        if (!paramOk) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return sBuffer;
+        }
+        final boolean needRefresh = (Boolean) result.getRetData();
+        // record header: probe time, node name and node role
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+        sBuffer.append("{\"probeTime\":\"");
+        sBuffer.append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(System.currentTimeMillis()));
+        sBuffer.append("\",\"nodeName\":\"");
+        sBuffer.append(master.getMasterConfig().getHostName());
+        sBuffer.append("\",\"nodeRole\":\"Master\",\"metrics\":{\"serviceStatus\":");
+        // record body: service status first, then web API statistics
+        if (!needRefresh) {
+            MasterSrvStatsHolder.getValue(sBuffer);
+            sBuffer.append(",\"webAPI\":");
+            WebCallStatsHolder.getValue(sBuffer);
+        } else {
+            MasterSrvStatsHolder.snapShort(sBuffer);
+            sBuffer.append(",\"webAPI\":");
+            WebCallStatsHolder.snapShort(sBuffer);
+        }
+        // "count" is the number of metric groups written above
+        sBuffer.append("},\"count\":2}");
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, 1);
+        return sBuffer;
+    }
+
+    /**
+     * Turn on the Master statistics selected by the statsType request parameter.
+     *
+     * @param req      Http Servlet Request
+     * @param sBuffer  string buffer that receives the response
+     * @param result   process result, used to carry the parsed parameter
+     *                 or the parse error message
+     * @return the passed-in sBuffer, holding the operation result
+     */
+    public StringBuilder adminEnableMetricsStats(HttpServletRequest req,
+                                                 StringBuilder sBuffer,
+                                                 ProcessResult result) {
+        final boolean parsed = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.STATSTYPE, true, null, sBuffer, result);
+        if (parsed) {
+            // hand the requested type to the shared enable/disable routine
+            return innEnableOrDisableMetricsStats(true,
+                    (String) result.getRetData(), req, sBuffer, result);
+        }
+        WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+        return sBuffer;
+    }
+
+    /**
+     * Disable Master's statistics functions.
+     *
+     * @param req      Http Servlet Request
+     * @param sBuffer  string buffer that receives the response
+     * @param result   process result, used to carry the parsed parameter
+     *                 or the parse error message
+     * @return the passed-in sBuffer, holding the operation result
+     */
+    public StringBuilder adminDisableMetricsStats(HttpServletRequest req,
+                                                  StringBuilder sBuffer,
+                                                  ProcessResult result) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.STATSTYPE, true, null, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return sBuffer;
+        }
+        String statsType = (String) result.getRetData();
+        // this is the disable entry point, so the enable flag must be false
+        return innEnableOrDisableMetricsStats(false, statsType, req, sBuffer, result);
+    }
+
+    /**
+     * Disable Master's all statistics functions.
+     *
+     * @param req      Http Servlet Request
+     * @param sBuffer  string buffer that receives the response
+     * @param result   process result
+     * @return the passed-in sBuffer, holding the operation result
+     */
+    public StringBuilder adminDisableAllStats(HttpServletRequest req,
+                                              StringBuilder sBuffer,
+                                              ProcessResult result) {
+        // use the Master's own stats-type enum: the shared routine matches the
+        // given name against MasterStatsType values, not the Broker's
+        return innEnableOrDisableMetricsStats(false,
+                MasterStatsType.ALL.getName(), req, sBuffer, result);
+    }
+
+    /**
+     * Disable or Enable Master's statistics functions
+     *
+     * <p>The statistics type is matched case-insensitively against the names of
+     * the MasterStatsType values. An unmatched type yields a failure result that
+     * lists the accepted names. Only the WEBAPI and ALL types change any state
+     * here (the web API call statistics switch); any other matched type returns
+     * success without further action.
+     *
+     * @param enable      whether enable or disable
+     * @param statsType   the statistics type to be operated on
+     * @param req         HttpServletRequest (not read by this method)
+     * @param sBuffer     string buffer that receives the response
+     * @param result      process result (not used by this method)
+     * @return the passed-in sBuffer, holding the operation result
+     */
+    private StringBuilder innEnableOrDisableMetricsStats(boolean enable,
+                                                         String statsType,
+                                                         HttpServletRequest req,
+                                                         StringBuilder sBuffer,
+                                                         ProcessResult result) {
+        // get input metric type
+        MasterStatsType inMetricType = null;
+        for (MasterStatsType metricType : MasterStatsType.values()) {
+            if (metricType.getName().equalsIgnoreCase(statsType)) {
+                inMetricType = metricType;
+                break;
+            }
+        }
+        if (inMetricType == null) {
+            sBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":")
+                    .append("\"Unmatched stat type, allowed stat type are : [");
+            int count = 0;
+            for (MasterStatsType metricType : MasterStatsType.values()) {
+                if (count++ > 0) {
+                    sBuffer.append(",");
+                }
+                // list the names, since the name is what the input is matched on
+                sBuffer.append(metricType.getName());
+            }
+            sBuffer.append("]\"}");
+            return sBuffer;
+        }
+        // Operate separately according to the specified statistic type
+        if (inMetricType == MasterStatsType.WEBAPI
+                || inMetricType == MasterStatsType.ALL) {
+            WebCallStatsHolder.setStatsStatus(enable);
+        }
+        // build return result
+        WebParameterUtils.buildSuccessResult(sBuffer);
+        return sBuffer;
+    }
+
+ /**
* Private method to append consumer info of the give list to a string builder
*
* @param consumerList consumer list