You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by al...@apache.org on 2022/02/17 08:57:12 UTC

[incubator-inlong] branch master updated: [INLONG-2552][TubeMQ] Add Master metric operation APIs (#2553)

This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cab7dcb  [INLONG-2552][TubeMQ] Add Master metric operation APIs (#2553)
cab7dcb is described below

commit cab7dcb168ef992d6e8546b324e13211b2438c95
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Feb 17 16:57:04 2022 +0800

    [INLONG-2552][TubeMQ] Add Master metric operation APIs (#2553)
---
 .../server/broker/web/BrokerAdminServlet.java      |  22 ++-
 .../tubemq/server/common/TServerConstants.java     |   2 +-
 .../master/web/handler/WebOtherInfoHandler.java    | 176 +++++++++++++++++++++
 3 files changed, 186 insertions(+), 14 deletions(-)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
index bcdaa5b..0057eb6 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -1101,7 +1101,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         broker.getOffsetManager().deleteGroupOffset(
                 onlyMemory, groupTopicPartMap, modifier);
         // builder return result
-        sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
+        WebParameterUtils.buildSuccessResult(sBuffer);
     }
 
     /**
@@ -1120,10 +1120,12 @@ public class BrokerAdminServlet extends AbstractWebHandler {
             return;
         }
         final boolean needRefresh = (Boolean) result.getRetData();
-        sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"probeTime\":\"")
+        // build return result
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+        sBuffer.append("{\"probeTime\":\"")
                 .append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(System.currentTimeMillis()))
                 .append("\",\"nodeName\":\"").append(broker.getTubeConfig().getHostName())
-                .append("\",\"role\":\"Broker\",\"metrics\":{\"serviceStatus\":");
+                .append("\",\"nodeRole\":\"Broker\",\"metrics\":{\"serviceStatus\":");
         if (needRefresh) {
             BrokerSrvStatsHolder.snapShort(sBuffer);
             sBuffer.append(",\"webAPI\":");
@@ -1134,6 +1136,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
             WebCallStatsHolder.getValue(sBuffer);
         }
         sBuffer.append("},\"count\":2}");
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, 1);
     }
 
     /**
@@ -1160,7 +1163,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         // query data
         int index = 0;
         int recordId = 0;
-        sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Ok\",\"dataSet\":[");
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
         Map<String, ConcurrentHashMap<Integer, MessageStore>> messageTopicStores =
                 broker.getStoreManager().getMessageStores();
         if (topicNameSet.isEmpty()) {
@@ -1222,7 +1225,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
                 sBuffer.append("]}");
             }
         }
-        sBuffer.append("],\"totalCount\":").append(recordId).append("}");
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, recordId);
     }
 
     /**
@@ -1269,13 +1272,6 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      */
     public void adminDisableAllStats(HttpServletRequest req,
                                      StringBuilder sBuffer) {
-        ProcessResult result = new ProcessResult();
-        if (!WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.STATSTYPE, true, null, sBuffer, result)) {
-            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
-            return;
-        }
-        String statsType = (String) result.getRetData();
         innEnableOrDisableMetricsStats(false,
                 BrokerStatsType.ALL.getName(), req, sBuffer);
     }
@@ -1365,7 +1361,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
             }
         }
         // builder return result
-        sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
+        WebParameterUtils.buildSuccessResult(sBuffer);
     }
 
     // build reset offset info
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
index 8f49962..a336ae8 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
@@ -100,6 +100,6 @@ public final class TServerConstants {
             DataStoreUtils.STORE_INDEX_HEAD_LEN * 1000000L;
 
     // Minimum snapshot period
-    public static final long MIN_SNAPSHOT_PERIOD_MS = 5000L;
+    public static final long MIN_SNAPSHOT_PERIOD_MS = 2000L;
     public static final int META_MAX_STATSTYPE_LENGTH = 256;
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
index 10791a0..0cc1fc6 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
@@ -30,8 +30,12 @@ import javax.servlet.http.HttpServletRequest;
 
 import org.apache.inlong.tubemq.corebase.cluster.Partition;
 import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
+import org.apache.inlong.tubemq.server.broker.stats.BrokerStatsType;
+import org.apache.inlong.tubemq.server.common.TubeServerVersion;
 import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
 import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.inlong.tubemq.server.common.webbase.WebCallStatsHolder;
 import org.apache.inlong.tubemq.server.master.TMaster;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeGroupInfo;
@@ -39,6 +43,8 @@ import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeTyp
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfo;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
 import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.NodeRebInfo;
+import org.apache.inlong.tubemq.server.master.stats.MasterSrvStatsHolder;
+import org.apache.inlong.tubemq.server.master.stats.MasterStatsType;
 
 public class WebOtherInfoHandler extends AbstractWebHandler {
 
@@ -58,6 +64,21 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
                 "getSubscribeInfo");
         registerQueryWebMethod("admin_query_consume_group_detail",
                 "getConsumeGroupDetailInfo");
+        // query master's version
+        registerQueryWebMethod("admin_query_server_version",
+                "adminQueryMasterVersion");
+        // query master's metrics information
+        registerQueryWebMethod("admin_get_metrics_info",
+                "adminGetMetricsInfo");
+        // Enable metrics statistics
+        registerModifyWebMethod("admin_enable_stats",
+                "adminEnableMetricsStats");
+        // Disable metrics statistics
+        registerModifyWebMethod("admin_disable_stats",
+                "adminDisableMetricsStats");
+        // Disable all statistics
+        registerModifyWebMethod("admin_disable_all_stats",
+                "adminDisableAllStats");
     }
 
     /**
@@ -261,6 +282,161 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
     }
 
     /**
+     * Query Master's version (web method "admin_query_server_version")
+     *
+     * @param req       Http Servlet Request
+     * @param sBuffer   string buffer
+     * @param result    process result
+     * @return          sBuffer holding the version information
+     */
+    public StringBuilder adminQueryMasterVersion(HttpServletRequest req,
+                                                 StringBuilder sBuffer,
+                                                 ProcessResult result) {
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+        sBuffer.append("{\"version\":\"")
+                .append(TubeServerVersion.SERVER_VERSION).append("\"}");
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, 1);
+        return sBuffer;
+    }
+
+    /**
+     * Get master's metric information
+     *
+     * @param req       Http Servlet Request
+     * @param sBuffer   string buffer
+     * @param result    process result
+     * @return          sBuffer holding the metric information or the failure result
+     */
+    public StringBuilder adminGetMetricsInfo(HttpServletRequest req,
+                                             StringBuilder sBuffer,
+                                             ProcessResult result) {
+        // check and get the needRefresh flag; an invalid value ends with a fail result
+        if (!WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.NEEDREFRESH, false, false, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return sBuffer;
+        }
+        final boolean needRefresh = (Boolean) result.getRetData();
+        // build the metric record: snapShort() when needRefresh is set, getValue() otherwise
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+        sBuffer.append("{\"probeTime\":\"")
+                .append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(System.currentTimeMillis()))
+                .append("\",\"nodeName\":\"").append(master.getMasterConfig().getHostName())
+                .append("\",\"nodeRole\":\"Master\",\"metrics\":{\"serviceStatus\":");
+        if (needRefresh) {
+            MasterSrvStatsHolder.snapShort(sBuffer);
+            sBuffer.append(",\"webAPI\":");
+            WebCallStatsHolder.snapShort(sBuffer);
+        } else {
+            MasterSrvStatsHolder.getValue(sBuffer);
+            sBuffer.append(",\"webAPI\":");
+            WebCallStatsHolder.getValue(sBuffer);
+        }
+        sBuffer.append("},\"count\":2}");
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, 1);
+        return sBuffer;
+    }
+
+    /**
+     * Enable Master's statistics functions for the type given by the statsType parameter.
+     *
+     * @param req       Http Servlet Request
+     * @param sBuffer   string buffer
+     * @param result    process result
+     * @return          sBuffer holding the operation result
+     */
+    public StringBuilder adminEnableMetricsStats(HttpServletRequest req,
+                                                 StringBuilder sBuffer,
+                                                 ProcessResult result) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.STATSTYPE, true, null, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return sBuffer;
+        }
+        String statsType = (String) result.getRetData();
+        return innEnableOrDisableMetricsStats(true, statsType, req, sBuffer, result);
+    }
+
+    /**
+     * Disable Master's statistics functions for the type given by the statsType parameter.
+     * @param req      Http Servlet Request
+     * @param sBuffer  string buffer
+     * @param result   process result
+     * @return         sBuffer holding the operation result
+     */
+    public StringBuilder adminDisableMetricsStats(HttpServletRequest req,
+                                                  StringBuilder sBuffer,
+                                                  ProcessResult result) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.STATSTYPE, true, null, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return sBuffer;
+        }
+        String statsType = (String) result.getRetData();
+        return innEnableOrDisableMetricsStats(false, statsType, req, sBuffer, result);
+    }
+
+    /**
+     * Disable Master's all statistics functions.
+     * @param req      Http Servlet Request
+     * @param sBuffer  string buffer
+     * @param result   process result
+     * @return         sBuffer holding the operation result
+     */
+    public StringBuilder adminDisableAllStats(HttpServletRequest req,
+                                              StringBuilder sBuffer,
+                                              ProcessResult result) {
+        return innEnableOrDisableMetricsStats(false,
+                MasterStatsType.ALL.getName(), req, sBuffer, result);
+    }
+
+    /**
+     * Disable or Enable Master's statistics functions
+     *
+     * @param enable     whether enable or disable
+     * @param statsType  the statistics type name to be operated on, matched ignoring case
+     * @param req        HttpServletRequest
+     * @param sBuffer    string buffer that receives the success or failure result
+     * @param result     process result
+     * @return           sBuffer holding the operation result
+     */
+    private StringBuilder innEnableOrDisableMetricsStats(boolean enable,
+                                                         String statsType,
+                                                         HttpServletRequest req,
+                                                         StringBuilder sBuffer,
+                                                         ProcessResult result) {
+        // match the input type against the MasterStatsType names
+        MasterStatsType inMetricType = null;
+        for (MasterStatsType metricType : MasterStatsType.values()) {
+            if (metricType.getName().equalsIgnoreCase(statsType)) {
+                inMetricType = metricType;
+                break;
+            }
+        }
+        if (inMetricType == null) {
+            sBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":")
+                    .append("\"Unmatched stat type, allowed stat type are : [");
+            int count = 0;
+            for (MasterStatsType metricType : MasterStatsType.values()) {
+                if (count++ > 0) {
+                    sBuffer.append(",");
+                }
+                sBuffer.append(metricType.getName());
+            }
+            sBuffer.append("]\"}");
+            return sBuffer;
+        }
+        // Operate separately according to the specified statistic type
+        if (inMetricType == MasterStatsType.WEBAPI
+                || inMetricType == MasterStatsType.ALL) {
+            WebCallStatsHolder.setStatsStatus(enable);
+        }
+        // build return result
+        WebParameterUtils.buildSuccessResult(sBuffer);
+        return sBuffer;
+    }
+
+    /**
      * Private method to append consumer info of the give list to a string builder
      *
      * @param consumerList  consumer list