You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2023/01/01 11:57:58 UTC
[incubator-eventmesh] branch master updated: fix issue2724
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new dc7b27281 fix issue2724
new 4d62427b3 Merge pull request #2760 from jonyangx/issue2724
dc7b27281 is described below
commit dc7b2728183a374e7becd33da5b5dc85b1ea63db
Author: jonyangx <ya...@gmail.com>
AuthorDate: Sat Dec 31 17:14:48 2022 +0800
fix issue2724
---
.../runtime/boot/EventMeshHTTPServer.java | 1 -
.../runtime/metrics/http/HTTPMetricsServer.java | 156 +++++++++++----------
2 files changed, 81 insertions(+), 76 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index 02310cfc9..b4e1b4db1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -247,7 +247,6 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
httpRetryer.init();
metrics = new HTTPMetricsServer(this, metricsRegistries);
- metrics.init();
consumerManager = new ConsumerManager(this);
consumerManager.init();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
index d1e6a7e97..60a32e65f 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
@@ -22,6 +22,7 @@ import org.apache.eventmesh.metrics.api.model.HttpSummaryMetrics;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
@@ -31,12 +32,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
public class HTTPMetricsServer {
- private static final Logger HTTP_LOGGER = LoggerFactory.getLogger("httpMonitor");
+ private static final Logger LOGGER = LoggerFactory.getLogger(HTTPMetricsServer.class);
private final transient EventMeshHTTPServer eventMeshHTTPServer;
@@ -44,30 +42,37 @@ public class HTTPMetricsServer {
private final transient HttpSummaryMetrics summaryMetrics;
- public HTTPMetricsServer(EventMeshHTTPServer eventMeshHTTPServer, List<MetricsRegistry> metricsRegistries) {
+ public HTTPMetricsServer(final EventMeshHTTPServer eventMeshHTTPServer,
+ final List<MetricsRegistry> metricsRegistries) {
+ Objects.requireNonNull(eventMeshHTTPServer, "EventMeshHTTPServer can not be null");
+ Objects.requireNonNull(metricsRegistries, "List<MetricsRegistry> can not be null");
+
this.eventMeshHTTPServer = eventMeshHTTPServer;
this.metricsRegistries = metricsRegistries;
this.summaryMetrics = new HttpSummaryMetrics(
- eventMeshHTTPServer.batchMsgExecutor,
- eventMeshHTTPServer.sendMsgExecutor,
- eventMeshHTTPServer.pushMsgExecutor,
- eventMeshHTTPServer.getHttpRetryer().getFailedQueue());
+ eventMeshHTTPServer.batchMsgExecutor,
+ eventMeshHTTPServer.sendMsgExecutor,
+ eventMeshHTTPServer.pushMsgExecutor,
+ eventMeshHTTPServer.getHttpRetryer().getFailedQueue());
+
+ init();
}
- public void init() throws Exception {
+ private void init() {
metricsRegistries.forEach(MetricsRegistry::start);
- if (log.isInfoEnabled()) {
- log.info("HTTPMetricsServer initialized......");
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("HTTPMetricsServer initialized.");
}
}
- public void start() throws Exception {
+ public void start() {
metricsRegistries.forEach(metricsRegistry -> {
metricsRegistry.register(summaryMetrics);
- if (log.isInfoEnabled()) {
- log.info("Register httpMetrics to " + metricsRegistry.getClass().getName());
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Register httpMetrics to {}", metricsRegistry.getClass().getName());
}
});
+
metricsSchedule.scheduleAtFixedRate(() -> {
try {
summaryMetrics.snapshotHTTPTPS();
@@ -75,114 +80,115 @@ public class HTTPMetricsServer {
summaryMetrics.snapshotSendMsgTPS();
summaryMetrics.snapshotPushMsgTPS();
} catch (Exception ex) {
- log.warn("eventMesh snapshot tps metrics err", ex);
+ LOGGER.error("eventMesh snapshot tps metrics err", ex);
}
}, 0, 1000, TimeUnit.MILLISECONDS);
metricsSchedule.scheduleAtFixedRate(() -> {
try {
- logPrintServerMetrics();
+ logPrintServerMetrics(summaryMetrics, eventMeshHTTPServer);
} catch (Exception ex) {
- log.warn("eventMesh print metrics err", ex);
+ LOGGER.error("eventMesh print metrics err", ex);
}
}, 1000, 30 * 1000, TimeUnit.MILLISECONDS);
- if (log.isInfoEnabled()) {
- log.info("HTTPMetricsServer started......");
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("HTTPMetricsServer started.");
}
}
- public void shutdown() throws Exception {
+ public void shutdown() {
metricsSchedule.shutdown();
metricsRegistries.forEach(MetricsRegistry::showdown);
- if (log.isInfoEnabled()) {
- log.info("HTTPMetricsServer shutdown......");
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("HTTPMetricsServer shutdown.");
}
}
- protected static ScheduledExecutorService metricsSchedule = Executors.newScheduledThreadPool(2, new ThreadFactory() {
+ private static ScheduledExecutorService metricsSchedule = Executors.newScheduledThreadPool(2, new ThreadFactory() {
private final transient AtomicInteger seq = new AtomicInteger(0);
@Override
- public Thread newThread(Runnable r) {
+ public Thread newThread(final Runnable r) {
seq.incrementAndGet();
- Thread t = new Thread(r, "eventMesh-metrics-" + seq.get());
+ final Thread t = new Thread(r, "eventMesh-metrics-" + seq.get());
t.setDaemon(true);
return t;
}
});
// todo: move this into standalone metrics plugin
- private void logPrintServerMetrics() {
- if (HTTP_LOGGER.isInfoEnabled()) {
- HTTP_LOGGER.info("===========================================SERVER METRICS==================================================");
-
- HTTP_LOGGER.info("maxHTTPTPS: {}, avgHTTPTPS: {}, maxHTTPCOST: {}, avgHTTPCOST: {}, avgHTTPBodyDecodeCost: {}, httpDiscard: {}",
- summaryMetrics.maxHTTPTPS(),
- summaryMetrics.avgHTTPTPS(),
- summaryMetrics.maxHTTPCost(),
- summaryMetrics.avgHTTPCost(),
- summaryMetrics.avgHTTPBodyDecodeCost(),
- summaryMetrics.getHttpDiscard());
+ private void logPrintServerMetrics(final HttpSummaryMetrics summaryMetrics,
+ final EventMeshHTTPServer eventMeshHTTPServer) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("===========================================SERVER METRICS==================================================");
+
+ LOGGER.info("maxHTTPTPS: {}, avgHTTPTPS: {}, maxHTTPCOST: {}, avgHTTPCOST: {}, avgHTTPBodyDecodeCost: {}, httpDiscard: {}",
+ summaryMetrics.maxHTTPTPS(),
+ summaryMetrics.avgHTTPTPS(),
+ summaryMetrics.maxHTTPCost(),
+ summaryMetrics.avgHTTPCost(),
+ summaryMetrics.avgHTTPBodyDecodeCost(),
+ summaryMetrics.getHttpDiscard());
}
summaryMetrics.httpStatInfoClear();
- if (HTTP_LOGGER.isInfoEnabled()) {
- HTTP_LOGGER.info("maxBatchSendMsgTPS: {}, avgBatchSendMsgTPS: {}, sum: {}. sumFail: {}, sumFailRate: {}, discard : {}",
- summaryMetrics.maxSendBatchMsgTPS(),
- summaryMetrics.avgSendBatchMsgTPS(),
- summaryMetrics.getSendBatchMsgNumSum(),
- summaryMetrics.getSendBatchMsgFailNumSum(),
- summaryMetrics.getSendBatchMsgFailRate(),
- summaryMetrics.getSendBatchMsgDiscardNumSum()
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("maxBatchSendMsgTPS: {}, avgBatchSendMsgTPS: {}, sum: {}. sumFail: {}, sumFailRate: {}, discard : {}",
+ summaryMetrics.maxSendBatchMsgTPS(),
+ summaryMetrics.avgSendBatchMsgTPS(),
+ summaryMetrics.getSendBatchMsgNumSum(),
+ summaryMetrics.getSendBatchMsgFailNumSum(),
+ summaryMetrics.getSendBatchMsgFailRate(),
+ summaryMetrics.getSendBatchMsgDiscardNumSum()
);
}
summaryMetrics.cleanSendBatchStat();
- if (HTTP_LOGGER.isInfoEnabled()) {
- HTTP_LOGGER.info("maxSendMsgTPS: {}, avgSendMsgTPS: {}, sum: {}, sumFail: {}, sumFailRate: {}, replyMsg: {}, replyFail: {}",
- summaryMetrics.maxSendMsgTPS(),
- summaryMetrics.avgSendMsgTPS(),
- summaryMetrics.getSendMsgNumSum(),
- summaryMetrics.getSendMsgFailNumSum(),
- summaryMetrics.getSendMsgFailRate(),
- summaryMetrics.getReplyMsgNumSum(),
- summaryMetrics.getReplyMsgFailNumSum()
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("maxSendMsgTPS: {}, avgSendMsgTPS: {}, sum: {}, sumFail: {}, sumFailRate: {}, replyMsg: {}, replyFail: {}",
+ summaryMetrics.maxSendMsgTPS(),
+ summaryMetrics.avgSendMsgTPS(),
+ summaryMetrics.getSendMsgNumSum(),
+ summaryMetrics.getSendMsgFailNumSum(),
+ summaryMetrics.getSendMsgFailRate(),
+ summaryMetrics.getReplyMsgNumSum(),
+ summaryMetrics.getReplyMsgFailNumSum()
);
}
summaryMetrics.cleanSendMsgStat();
- if (HTTP_LOGGER.isInfoEnabled()) {
- HTTP_LOGGER.info(
- "maxPushMsgTPS: {}, avgPushMsgTPS: {}, sum: {}, sumFail: {}, sumFailRate: {}, maxClientLatency: {}, avgClientLatency: {}",
- summaryMetrics.maxPushMsgTPS(),
- summaryMetrics.avgPushMsgTPS(),
- summaryMetrics.getHttpPushMsgNumSum(),
- summaryMetrics.getHttpPushFailNumSum(),
- summaryMetrics.getHttpPushMsgFailRate(),
- summaryMetrics.maxHTTPPushLatency(),
- summaryMetrics.avgHTTPPushLatency()
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "maxPushMsgTPS: {}, avgPushMsgTPS: {}, sum: {}, sumFail: {}, sumFailRate: {}, maxClientLatency: {}, avgClientLatency: {}",
+ summaryMetrics.maxPushMsgTPS(),
+ summaryMetrics.avgPushMsgTPS(),
+ summaryMetrics.getHttpPushMsgNumSum(),
+ summaryMetrics.getHttpPushFailNumSum(),
+ summaryMetrics.getHttpPushMsgFailRate(),
+ summaryMetrics.maxHTTPPushLatency(),
+ summaryMetrics.avgHTTPPushLatency()
);
}
summaryMetrics.cleanHttpPushMsgStat();
- if (HTTP_LOGGER.isInfoEnabled()) {
- HTTP_LOGGER.info("batchMsgQ: {}, sendMsgQ: {}, pushMsgQ: {}, httpRetryQ: {}",
- eventMeshHTTPServer.getBatchMsgExecutor().getQueue().size(),
- eventMeshHTTPServer.getSendMsgExecutor().getQueue().size(),
- eventMeshHTTPServer.getPushMsgExecutor().getQueue().size(),
- eventMeshHTTPServer.getHttpRetryer().size());
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("batchMsgQ: {}, sendMsgQ: {}, pushMsgQ: {}, httpRetryQ: {}",
+ eventMeshHTTPServer.getBatchMsgExecutor().getQueue().size(),
+ eventMeshHTTPServer.getSendMsgExecutor().getQueue().size(),
+ eventMeshHTTPServer.getPushMsgExecutor().getQueue().size(),
+ eventMeshHTTPServer.getHttpRetryer().size());
}
- if (HTTP_LOGGER.isInfoEnabled()) {
- HTTP_LOGGER.info("batchAvgSend2MQCost: {}, avgSend2MQCost: {}, avgReply2MQCost: {}",
- summaryMetrics.avgBatchSendMsgCost(),
- summaryMetrics.avgSendMsgCost(),
- summaryMetrics.avgReplyMsgCost());
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("batchAvgSend2MQCost: {}, avgSend2MQCost: {}, avgReply2MQCost: {}",
+ summaryMetrics.avgBatchSendMsgCost(),
+ summaryMetrics.avgSendMsgCost(),
+ summaryMetrics.avgReplyMsgCost());
}
summaryMetrics.send2MQStatInfoClear();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org