You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/02/12 11:19:21 UTC
[incubator-inlong] branch master updated: [INLONG-2480][TubeMQ] Add WebCallStatsHolder class for Web API call statistics (#2481)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 288e46c [INLONG-2480][TubeMQ] Add WebCallStatsHolder class for Web API call statistics (#2481)
288e46c is described below
commit 288e46c2c5f6668d43634bc23ba5b0e2edc80fb0
Author: gosonzhang <46...@qq.com>
AuthorDate: Sat Feb 12 19:19:14 2022 +0800
[INLONG-2480][TubeMQ] Add WebCallStatsHolder class for Web API call statistics (#2481)
---
.../server/broker/stats/ServiceStatsHolder.java | 34 +++-
.../server/broker/web/AbstractWebHandler.java | 9 +-
.../tubemq/server/common/TServerConstants.java | 3 +
.../server/common/webbase/WebCallStatsHolder.java | 199 +++++++++++++++++++++
.../server/master/web/action/screen/Webapi.java | 7 +-
.../server/common/WebCallStatsHolderTest.java | 95 ++++++++++
6 files changed, 337 insertions(+), 10 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolder.java
index 73d2f51..7b398dc 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolder.java
@@ -19,10 +19,12 @@ package org.apache.inlong.tubemq.server.broker.stats;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.tubemq.corebase.metric.impl.ESTHistogram;
import org.apache.inlong.tubemq.corebase.metric.impl.LongOnlineCounter;
import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
import org.apache.inlong.tubemq.corebase.metric.impl.SinceTime;
+import org.apache.inlong.tubemq.server.common.TServerConstants;
/**
* ServiceStatsHolder, statistic Broker metrics information for RPC services
@@ -39,6 +41,8 @@ public class ServiceStatsHolder {
private static final ServiceStatsSet[] switchableSets = new ServiceStatsSet[2];
// Current writable index
private static final AtomicInteger writableIndex = new AtomicInteger(0);
+ // Last snapshot time
+ private static final AtomicLong lstSnapshotTime = new AtomicLong(0);
// Initial service statistic set
static {
@@ -56,15 +60,33 @@ public class ServiceStatsHolder {
}
public static void snapShort(Map<String, Long> statsMap) {
- int befIndex = writableIndex.getAndIncrement();
- switchableSets[getIndex()].resetSinceTime();
- getStatsValue(switchableSets[getIndex(befIndex)], true, statsMap);
+ // Time of the last snapshot; also the expected value of the compareAndSet below
+ long curSnapshotTime = lstSnapshotTime.get();
+ // Avoid frequent snapshots: the sets are switched only when at least
+ // MIN_SNAPSHOT_PERIOD_MS has elapsed since the last recorded snapshot time
+ if ((System.currentTimeMillis() - curSnapshotTime)
+ >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
+ // Only the caller whose compareAndSet succeeds performs the switch
+ if (lstSnapshotTime.compareAndSet(curSnapshotTime, System.currentTimeMillis())) {
+ // Advance the writable index, restart the since-time of the set that is
+ // now writable, then report the set that was writable before the switch
+ // (third argument true -- presumably "reset values"; confirm in getStatsValue)
+ int befIndex = writableIndex.getAndIncrement();
+ switchableSets[getIndex()].resetSinceTime();
+ getStatsValue(switchableSets[getIndex(befIndex)], true, statsMap);
+ return;
+ }
+ }
+ // Snapshot skipped (too soon, or the compareAndSet failed): fall back to getValue()
+ getValue(statsMap);
}
public static void snapShort(StringBuilder strBuff) {
- int befIndex = writableIndex.getAndIncrement();
- switchableSets[getIndex()].resetSinceTime();
- getStatsValue(switchableSets[getIndex(befIndex)], true, strBuff);
+ // Time of the last snapshot; also the expected value of the compareAndSet below
+ long curSnapshotTime = lstSnapshotTime.get();
+ // Avoid frequent snapshots: the sets are switched only when at least
+ // MIN_SNAPSHOT_PERIOD_MS has elapsed since the last recorded snapshot time
+ if ((System.currentTimeMillis() - curSnapshotTime)
+ >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
+ // Only the caller whose compareAndSet succeeds performs the switch
+ if (lstSnapshotTime.compareAndSet(curSnapshotTime, System.currentTimeMillis())) {
+ // Advance the writable index, restart the since-time of the set that is
+ // now writable, then report the set that was writable before the switch
+ // (third argument true -- presumably "reset values"; confirm in getStatsValue)
+ int befIndex = writableIndex.getAndIncrement();
+ switchableSets[getIndex()].resetSinceTime();
+ getStatsValue(switchableSets[getIndex(befIndex)], true, strBuff);
+ return;
+ }
+ }
+ // Snapshot skipped (too soon, or the compareAndSet failed): fall back to getValue()
+ getValue(strBuff);
}
// metric set operate APIs end
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/AbstractWebHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/AbstractWebHandler.java
index 99b939e..7ddbad6 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/AbstractWebHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/AbstractWebHandler.java
@@ -22,11 +22,11 @@ import static org.apache.inlong.tubemq.server.common.webbase.WebMethodMapper.get
import static org.apache.inlong.tubemq.server.common.webbase.WebMethodMapper.registerWebMethod;
import java.io.IOException;
import java.util.Set;
-
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.inlong.tubemq.server.broker.TubeBroker;
+import org.apache.inlong.tubemq.server.common.webbase.WebCallStatsHolder;
import org.apache.inlong.tubemq.server.common.webbase.WebMethodMapper.WebApiRegInfo;
public abstract class AbstractWebHandler extends HttpServlet {
@@ -50,10 +50,11 @@ public abstract class AbstractWebHandler extends HttpServlet {
@Override
protected void doPost(HttpServletRequest req,
HttpServletResponse resp) throws IOException {
+ String method = null;
StringBuilder sBuffer = new StringBuilder(1024);
-
+ long startTime = System.currentTimeMillis();
try {
- String method = req.getParameter("method");
+ method = req.getParameter("method");
if (method == null) {
sBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
.append("Please take with method parameter! \"}");
@@ -71,6 +72,8 @@ public abstract class AbstractWebHandler extends HttpServlet {
.append("Bad request from server: ")
.append(e.getMessage())
.append("\"}");
+ } finally {
+ WebCallStatsHolder.addMethodCall(method, System.currentTimeMillis() - startTime);
}
resp.getWriter().write(sBuffer.toString());
resp.setCharacterEncoding(req.getCharacterEncoding());
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
index 70dce1f..e9d2600 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
@@ -98,4 +98,7 @@ public final class TServerConstants {
DataStoreUtils.STORE_INDEX_HEAD_LEN * 100000L;
public static final long CFG_OFFSET_RESET_MID_ALARM_CHECK =
DataStoreUtils.STORE_INDEX_HEAD_LEN * 1000000L;
+
+    // Minimum interval between two statistics snapshots, in milliseconds
+ public static final long MIN_SNAPSHOT_PERIOD_MS = 5000L;
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
new file mode 100644
index 0000000..a8ade6e
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.common.webbase;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.metric.impl.ESTHistogram;
+import org.apache.inlong.tubemq.corebase.metric.impl.SimpleHistogram;
+import org.apache.inlong.tubemq.corebase.metric.impl.SinceTime;
+import org.apache.inlong.tubemq.server.common.TServerConstants;
+
+/**
+ * WebCallStatsHolder, statistics holder for Web API calls.
+ *
+ * This class records the total number of Web API calls and the distribution of
+ * their time consumption, as well as the number of calls and the extreme time
+ * consumption of each method.
+ *
+ * Two statistic sets are kept and addressed through a writable index: updates go to
+ * the set selected by the current index, while snapShort() advances the index and
+ * reports the set that was writable before the switch. A switch is performed only if
+ * at least TServerConstants.MIN_SNAPSHOT_PERIOD_MS has elapsed since the previous one;
+ * a snapShort() call that arrives earlier behaves like getValue().
+ */
+public class WebCallStatsHolder {
+    // Switchable statistic items
+    private static final WebCallStatsItemSet[] switchableSets = new WebCallStatsItemSet[2];
+    // Current writable index
+    private static final AtomicInteger writableIndex = new AtomicInteger(0);
+    // Last snapshot time
+    private static final AtomicLong lstSnapshotTime = new AtomicLong(0);
+
+    // Initial service statistic set
+    static {
+        switchableSets[0] = new WebCallStatsItemSet();
+        switchableSets[1] = new WebCallStatsItemSet();
+    }
+
+    // metric set operate APIs begin
+    /**
+     * Read the statistics of the current writable set into a map, without switching sets.
+     *
+     * @param statsMap the map that receives the statistic items
+     */
+    public static void getValue(Map<String, Long> statsMap) {
+        getStatsValue(switchableSets[getIndex()], false, statsMap);
+    }
+
+    /**
+     * Append the statistics of the current writable set to a string buffer as a
+     * JSON-style object, without switching sets.
+     *
+     * @param strBuff the buffer that receives the statistic content
+     */
+    public static void getValue(StringBuilder strBuff) {
+        getStatsValue(switchableSets[getIndex()], false, strBuff);
+    }
+
+    /**
+     * Take a snapshot of the statistics into a map.
+     *
+     * If a switch is allowed, the writable set is switched and the previously
+     * writable set is reported through the snapShort() calls of its items;
+     * otherwise the call is equivalent to getValue(Map).
+     *
+     * @param statsMap the map that receives the statistic items
+     */
+    public static void snapShort(Map<String, Long> statsMap) {
+        WebCallStatsItemSet befSet = switchWritableSet();
+        if (befSet == null) {
+            getValue(statsMap);
+        } else {
+            getStatsValue(befSet, true, statsMap);
+        }
+    }
+
+    /**
+     * Take a snapshot of the statistics into a string buffer.
+     *
+     * If a switch is allowed, the writable set is switched and the previously
+     * writable set is reported through the snapShort() calls of its items;
+     * otherwise the call is equivalent to getValue(StringBuilder).
+     *
+     * @param strBuff the buffer that receives the statistic content
+     */
+    public static void snapShort(StringBuilder strBuff) {
+        WebCallStatsItemSet befSet = switchWritableSet();
+        if (befSet == null) {
+            getValue(strBuff);
+        } else {
+            getStatsValue(befSet, true, strBuff);
+        }
+    }
+    // metric set operate APIs end
+
+    // metric item operate APIs begin
+    /**
+     * Record one Web API call in the current writable set.
+     *
+     * @param method   the called method name; null is recorded under the name "NULL"
+     * @param callDlt  the time consumed by the call (callers pass a difference of
+     *                 System.currentTimeMillis() values)
+     */
+    public static void addMethodCall(String method, long callDlt) {
+        method = (method == null) ? "NULL" : method;
+        WebCallStatsItemSet webCallStatsSet = switchableSets[getIndex()];
+        webCallStatsSet.totalCallStats.update(callDlt);
+        SimpleHistogram curMethodStat = webCallStatsSet.methodStatsMap.get(method);
+        if (curMethodStat == null) {
+            // First call of this method in the set: register a new item, but keep
+            // the one stored by a concurrent caller if putIfAbsent() finds it
+            SimpleHistogram tmpSimpleStat = new SimpleHistogram(method, "method");
+            curMethodStat = webCallStatsSet.methodStatsMap.putIfAbsent(method, tmpSimpleStat);
+            if (curMethodStat == null) {
+                curMethodStat = tmpSimpleStat;
+            }
+        }
+        curMethodStat.update(callDlt);
+    }
+    // metric item operate APIs end
+
+    // private functions
+    /**
+     * Switch the writable set if the minimum snapshot period has elapsed.
+     *
+     * The current time is read once, so the value checked against the period is
+     * the value stored as the new snapshot time. Only the caller whose
+     * compareAndSet() on the snapshot time succeeds performs the switch.
+     *
+     * @return the set that was writable before the switch, or null if no switch
+     *         was performed
+     */
+    private static WebCallStatsItemSet switchWritableSet() {
+        long lastTime = lstSnapshotTime.get();
+        long curTime = System.currentTimeMillis();
+        // Avoid frequent snapshots
+        if ((curTime - lastTime) < TServerConstants.MIN_SNAPSHOT_PERIOD_MS
+                || !lstSnapshotTime.compareAndSet(lastTime, curTime)) {
+            return null;
+        }
+        int befIndex = writableIndex.getAndIncrement();
+        // Restart the since-time of the set that has just become writable
+        switchableSets[getIndex()].resetSinceTime();
+        return switchableSets[getIndex(befIndex)];
+    }
+
+    /**
+     * Output the content of a statistic set into a map.
+     *
+     * @param statsSet    the statistic set to read
+     * @param resetValue  whether to call snapShort() instead of getValue() on each item
+     * @param statsMap    the map that receives the statistic items
+     */
+    private static void getStatsValue(WebCallStatsItemSet statsSet,
+                                      boolean resetValue,
+                                      Map<String, Long> statsMap) {
+        statsMap.put(statsSet.lstResetTime.getFullName(),
+                statsSet.lstResetTime.getSinceTime());
+        if (resetValue) {
+            statsSet.totalCallStats.snapShort(statsMap, false);
+        } else {
+            statsSet.totalCallStats.getValue(statsMap, false);
+        }
+        for (SimpleHistogram itemStats : statsSet.methodStatsMap.values()) {
+            if (resetValue) {
+                itemStats.snapShort(statsMap, false);
+            } else {
+                itemStats.getValue(statsMap, false);
+            }
+        }
+    }
+
+    /**
+     * Output the content of a statistic set into a string buffer, in the form
+     * {"reset time item", total call item, "methods":{method items}}.
+     *
+     * @param statsSet    the statistic set to read
+     * @param resetValue  whether to call snapShort() instead of getValue() on each item
+     * @param strBuff     the buffer that receives the statistic content
+     */
+    private static void getStatsValue(WebCallStatsItemSet statsSet,
+                                      boolean resetValue,
+                                      StringBuilder strBuff) {
+        strBuff.append("{\"").append(statsSet.lstResetTime.getFullName())
+                .append("\":\"").append(statsSet.lstResetTime.getStrSinceTime())
+                .append("\",");
+        if (resetValue) {
+            statsSet.totalCallStats.snapShort(strBuff, false);
+        } else {
+            statsSet.totalCallStats.getValue(strBuff, false);
+        }
+        strBuff.append(",\"").append("methods\":{");
+        int methodCnt = 0;
+        for (SimpleHistogram itemStats : statsSet.methodStatsMap.values()) {
+            // Separate the method items with commas
+            if (methodCnt++ > 0) {
+                strBuff.append(",");
+            }
+            if (resetValue) {
+                itemStats.snapShort(strBuff, false);
+            } else {
+                itemStats.getValue(strBuff, false);
+            }
+        }
+        strBuff.append("}}");
+    }
+
+    /**
+     * Get current writable block index.
+     *
+     * @return the writable block index
+     */
+    private static int getIndex() {
+        return getIndex(writableIndex.get());
+    }
+
+    /**
+     * Gets the metric block index based on the specified value.
+     *
+     * The result is always 0 or 1, also for negative values: the remainder is
+     * -1, 0 or 1 and its absolute value is taken.
+     *
+     * @param origIndex    the specified value
+     * @return the metric block index
+     */
+    private static int getIndex(int origIndex) {
+        return Math.abs(origIndex % 2);
+    }
+
+    /**
+     * WebCallStatsItemSet, Switchable web call statistics block
+     *
+     * In which the object is the metric item that can be counted in stages
+     */
+    private static class WebCallStatsItemSet {
+        // Time since which this set has been collecting
+        protected final SinceTime lstResetTime =
+                new SinceTime("reset_time", null);
+        // Total call statistics
+        protected final ESTHistogram totalCallStats =
+                new ESTHistogram("web_calls", null);
+        // Simple Statistics Based on Methods
+        protected final ConcurrentHashMap<String, SimpleHistogram> methodStatsMap =
+                new ConcurrentHashMap<>();
+
+        public WebCallStatsItemSet() {
+            resetSinceTime();
+        }
+
+        public void resetSinceTime() {
+            this.lstResetTime.reset();
+        }
+    }
+}
+
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/action/screen/Webapi.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/action/screen/Webapi.java
index 57e8788..0be68e1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/action/screen/Webapi.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/action/screen/Webapi.java
@@ -24,6 +24,7 @@ import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.corerpc.exception.StandbyException;
import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.inlong.tubemq.server.common.webbase.WebCallStatsHolder;
import org.apache.inlong.tubemq.server.common.webbase.WebMethodMapper;
import org.apache.inlong.tubemq.server.master.TMaster;
import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
@@ -67,8 +68,10 @@ public class Webapi implements Action {
@Override
public void execute(RequestContext requestContext) {
+ String method = null;
ProcessResult result = new ProcessResult();
StringBuilder sBuffer = new StringBuilder();
+ long startTime = System.currentTimeMillis();
try {
HttpServletRequest req = requestContext.getReq();
if (this.master.isStopped()) {
@@ -78,7 +81,7 @@ public class Webapi implements Action {
if (!metaDataManager.isSelfMaster()) {
throw new StandbyException("Please send your request to the master Node.");
}
- String method = req.getParameter("method");
+ method = req.getParameter("method");
String strCallbackFun = req.getParameter("callback");
if ((TStringUtils.isNotEmpty(strCallbackFun))
&& (strCallbackFun.length() <= TBaseConstants.META_MAX_CALLBACK_STRING_LENGTH)
@@ -117,6 +120,8 @@ public class Webapi implements Action {
sBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"Bad request from client, ")
.append(e.getMessage()).append("\"}");
requestContext.put("sb", sBuffer.toString());
+ } finally {
+ WebCallStatsHolder.addMethodCall(method, System.currentTimeMillis() - startTime);
}
}
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/WebCallStatsHolderTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/WebCallStatsHolderTest.java
new file mode 100644
index 0000000..e34b5ab
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/WebCallStatsHolderTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.common;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
+import org.apache.inlong.tubemq.server.common.webbase.WebCallStatsHolder;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * WebCallStatsHolder test.
+ *
+ * The test works on the static state of WebCallStatsHolder and expects that no
+ * snapshot has been taken before it runs.
+ */
+public class WebCallStatsHolderTest {
+
+    @Test
+    public void testWebCallStatsHolder() {
+        // call method test 3 times: count 3, min 10, max 500
+        WebCallStatsHolder.addMethodCall("test", 20);
+        WebCallStatsHolder.addMethodCall("test", 10);
+        WebCallStatsHolder.addMethodCall("test", 500);
+        // call method aaa once: count 1, min 50, max 50
+        WebCallStatsHolder.addMethodCall("aaa", 50);
+        // check result
+        Map<String, Long> retMap = new LinkedHashMap<>();
+        WebCallStatsHolder.getValue(retMap);
+        final long sinceTime1 = retMap.get("reset_time");
+        Assert.assertEquals(4, retMap.get("web_calls_count").longValue());
+        Assert.assertEquals(500, retMap.get("web_calls_max").longValue());
+        Assert.assertEquals(10, retMap.get("web_calls_min").longValue());
+        // one call per cell: 10, 20, 50 and 500
+        Assert.assertEquals(1, retMap.get("web_calls_cell_8t16").longValue());
+        Assert.assertEquals(1, retMap.get("web_calls_cell_16t32").longValue());
+        Assert.assertEquals(1, retMap.get("web_calls_cell_32t64").longValue());
+        Assert.assertEquals(1, retMap.get("web_calls_cell_256t512").longValue());
+        Assert.assertEquals(3, retMap.get("method_test_count").longValue());
+        Assert.assertEquals(500, retMap.get("method_test_max").longValue());
+        Assert.assertEquals(10, retMap.get("method_test_min").longValue());
+        Assert.assertEquals(1, retMap.get("method_aaa_count").longValue());
+        Assert.assertEquals(50, retMap.get("method_aaa_max").longValue());
+        Assert.assertEquals(50, retMap.get("method_aaa_min").longValue());
+        // get content by StringBuilder
+        StringBuilder strBuff = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
+        WebCallStatsHolder.getValue(strBuff);
+        strBuff.delete(0, strBuff.length());
+        // wait so that the reset time of the next set differs from sinceTime1
+        ThreadUtils.sleep(100);
+        // test snapshot: the first snapshot switches the writable set
+        WebCallStatsHolder.snapShort(retMap);
+        retMap.clear();
+        // call method test, 1, 300, 300
+        WebCallStatsHolder.addMethodCall("test", 300);
+        // call method mmm, 1, 40, 40
+        WebCallStatsHolder.addMethodCall("mmm", 40);
+        // call method test, 1, 1000, 1000
+        WebCallStatsHolder.addMethodCall("test", 1000);
+        WebCallStatsHolder.getValue(retMap);
+        final long sinceTime2 = retMap.get("reset_time");
+        Assert.assertNotEquals(sinceTime1, sinceTime2);
+        Assert.assertEquals(3, retMap.get("web_calls_count").longValue());
+        Assert.assertEquals(1000, retMap.get("web_calls_max").longValue());
+        Assert.assertEquals(40, retMap.get("web_calls_min").longValue());
+        // one call per cell: 40, 300 and 1000
+        Assert.assertEquals(1, retMap.get("web_calls_cell_32t64").longValue());
+        Assert.assertEquals(1, retMap.get("web_calls_cell_256t512").longValue());
+        Assert.assertEquals(1, retMap.get("web_calls_cell_512t1024").longValue());
+        Assert.assertEquals(2, retMap.get("method_test_count").longValue());
+        Assert.assertEquals(1000, retMap.get("method_test_max").longValue());
+        Assert.assertEquals(300, retMap.get("method_test_min").longValue());
+        Assert.assertEquals(1, retMap.get("method_mmm_count").longValue());
+        Assert.assertEquals(40, retMap.get("method_mmm_max").longValue());
+        Assert.assertEquals(40, retMap.get("method_mmm_min").longValue());
+        // second snapshot request: it arrives within MIN_SNAPSHOT_PERIOD_MS of the
+        // first one, so it is expected to fall back to getValue() without switching
+        WebCallStatsHolder.snapShort(retMap);
+        // get content by StringBuilder
+        WebCallStatsHolder.getValue(strBuff);
+    }
+}