You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2023/01/01 11:57:58 UTC

[incubator-eventmesh] branch master updated: fix issue2724

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new dc7b27281 fix issue2724
     new 4d62427b3 Merge pull request #2760 from jonyangx/issue2724
dc7b27281 is described below

commit dc7b2728183a374e7becd33da5b5dc85b1ea63db
Author: jonyangx <ya...@gmail.com>
AuthorDate: Sat Dec 31 17:14:48 2022 +0800

    fix issue2724
---
 .../runtime/boot/EventMeshHTTPServer.java          |   1 -
 .../runtime/metrics/http/HTTPMetricsServer.java    | 156 +++++++++++----------
 2 files changed, 81 insertions(+), 76 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index 02310cfc9..b4e1b4db1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -247,7 +247,6 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
         httpRetryer.init();
 
         metrics = new HTTPMetricsServer(this, metricsRegistries);
-        metrics.init();
 
         consumerManager = new ConsumerManager(this);
         consumerManager.init();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
index d1e6a7e97..60a32e65f 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
@@ -22,6 +22,7 @@ import org.apache.eventmesh.metrics.api.model.HttpSummaryMetrics;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -31,12 +32,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
 public class HTTPMetricsServer {
 
-    private static final Logger HTTP_LOGGER = LoggerFactory.getLogger("httpMonitor");
+    private static final Logger LOGGER = LoggerFactory.getLogger(HTTPMetricsServer.class);
 
     private final transient EventMeshHTTPServer eventMeshHTTPServer;
 
@@ -44,30 +42,37 @@ public class HTTPMetricsServer {
 
     private final transient HttpSummaryMetrics summaryMetrics;
 
-    public HTTPMetricsServer(EventMeshHTTPServer eventMeshHTTPServer, List<MetricsRegistry> metricsRegistries) {
+    public HTTPMetricsServer(final EventMeshHTTPServer eventMeshHTTPServer,
+                             final List<MetricsRegistry> metricsRegistries) {
+        Objects.requireNonNull(eventMeshHTTPServer, "EventMeshHTTPServer can not be null");
+        Objects.requireNonNull(metricsRegistries, "List<MetricsRegistry> can not be null");
+
         this.eventMeshHTTPServer = eventMeshHTTPServer;
         this.metricsRegistries = metricsRegistries;
         this.summaryMetrics = new HttpSummaryMetrics(
-            eventMeshHTTPServer.batchMsgExecutor,
-            eventMeshHTTPServer.sendMsgExecutor,
-            eventMeshHTTPServer.pushMsgExecutor,
-            eventMeshHTTPServer.getHttpRetryer().getFailedQueue());
+                eventMeshHTTPServer.batchMsgExecutor,
+                eventMeshHTTPServer.sendMsgExecutor,
+                eventMeshHTTPServer.pushMsgExecutor,
+                eventMeshHTTPServer.getHttpRetryer().getFailedQueue());
+
+        init();
     }
 
-    public void init() throws Exception {
+    private void init() {
         metricsRegistries.forEach(MetricsRegistry::start);
-        if (log.isInfoEnabled()) {
-            log.info("HTTPMetricsServer initialized......");
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("HTTPMetricsServer initialized.");
         }
     }
 
-    public void start() throws Exception {
+    public void start() {
         metricsRegistries.forEach(metricsRegistry -> {
             metricsRegistry.register(summaryMetrics);
-            if (log.isInfoEnabled()) {
-                log.info("Register httpMetrics to " + metricsRegistry.getClass().getName());
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info("Register httpMetrics to {}", metricsRegistry.getClass().getName());
             }
         });
+
         metricsSchedule.scheduleAtFixedRate(() -> {
             try {
                 summaryMetrics.snapshotHTTPTPS();
@@ -75,114 +80,115 @@ public class HTTPMetricsServer {
                 summaryMetrics.snapshotSendMsgTPS();
                 summaryMetrics.snapshotPushMsgTPS();
             } catch (Exception ex) {
-                log.warn("eventMesh snapshot tps metrics err", ex);
+                LOGGER.error("eventMesh snapshot tps metrics err", ex);
             }
         }, 0, 1000, TimeUnit.MILLISECONDS);
 
         metricsSchedule.scheduleAtFixedRate(() -> {
             try {
-                logPrintServerMetrics();
+                logPrintServerMetrics(summaryMetrics, eventMeshHTTPServer);
             } catch (Exception ex) {
-                log.warn("eventMesh print metrics err", ex);
+                LOGGER.error("eventMesh print metrics err", ex);
             }
         }, 1000, 30 * 1000, TimeUnit.MILLISECONDS);
 
-        if (log.isInfoEnabled()) {
-            log.info("HTTPMetricsServer started......");
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("HTTPMetricsServer started.");
         }
     }
 
-    public void shutdown() throws Exception {
+    public void shutdown() {
         metricsSchedule.shutdown();
         metricsRegistries.forEach(MetricsRegistry::showdown);
-        if (log.isInfoEnabled()) {
-            log.info("HTTPMetricsServer shutdown......");
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("HTTPMetricsServer shutdown.");
         }
     }
 
-    protected static ScheduledExecutorService metricsSchedule = Executors.newScheduledThreadPool(2, new ThreadFactory() {
+    private static ScheduledExecutorService metricsSchedule = Executors.newScheduledThreadPool(2, new ThreadFactory() {
         private final transient AtomicInteger seq = new AtomicInteger(0);
 
         @Override
-        public Thread newThread(Runnable r) {
+        public Thread newThread(final Runnable r) {
             seq.incrementAndGet();
-            Thread t = new Thread(r, "eventMesh-metrics-" + seq.get());
+            final Thread t = new Thread(r, "eventMesh-metrics-" + seq.get());
             t.setDaemon(true);
             return t;
         }
     });
 
     // todo: move this into standalone metrics plugin
-    private void logPrintServerMetrics() {
-        if (HTTP_LOGGER.isInfoEnabled()) {
-            HTTP_LOGGER.info("===========================================SERVER METRICS==================================================");
-
-            HTTP_LOGGER.info("maxHTTPTPS: {}, avgHTTPTPS: {}, maxHTTPCOST: {}, avgHTTPCOST: {}, avgHTTPBodyDecodeCost: {}, httpDiscard: {}",
-                summaryMetrics.maxHTTPTPS(),
-                summaryMetrics.avgHTTPTPS(),
-                summaryMetrics.maxHTTPCost(),
-                summaryMetrics.avgHTTPCost(),
-                summaryMetrics.avgHTTPBodyDecodeCost(),
-                summaryMetrics.getHttpDiscard());
+    private void logPrintServerMetrics(final HttpSummaryMetrics summaryMetrics,
+                                       final EventMeshHTTPServer eventMeshHTTPServer) {
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("===========================================SERVER METRICS==================================================");
+
+            LOGGER.info("maxHTTPTPS: {}, avgHTTPTPS: {}, maxHTTPCOST: {}, avgHTTPCOST: {}, avgHTTPBodyDecodeCost: {}, httpDiscard: {}",
+                    summaryMetrics.maxHTTPTPS(),
+                    summaryMetrics.avgHTTPTPS(),
+                    summaryMetrics.maxHTTPCost(),
+                    summaryMetrics.avgHTTPCost(),
+                    summaryMetrics.avgHTTPBodyDecodeCost(),
+                    summaryMetrics.getHttpDiscard());
         }
 
         summaryMetrics.httpStatInfoClear();
 
-        if (HTTP_LOGGER.isInfoEnabled()) {
-            HTTP_LOGGER.info("maxBatchSendMsgTPS: {}, avgBatchSendMsgTPS: {}, sum: {}. sumFail: {}, sumFailRate: {}, discard : {}",
-                summaryMetrics.maxSendBatchMsgTPS(),
-                summaryMetrics.avgSendBatchMsgTPS(),
-                summaryMetrics.getSendBatchMsgNumSum(),
-                summaryMetrics.getSendBatchMsgFailNumSum(),
-                summaryMetrics.getSendBatchMsgFailRate(),
-                summaryMetrics.getSendBatchMsgDiscardNumSum()
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("maxBatchSendMsgTPS: {}, avgBatchSendMsgTPS: {}, sum: {}. sumFail: {}, sumFailRate: {}, discard : {}",
+                    summaryMetrics.maxSendBatchMsgTPS(),
+                    summaryMetrics.avgSendBatchMsgTPS(),
+                    summaryMetrics.getSendBatchMsgNumSum(),
+                    summaryMetrics.getSendBatchMsgFailNumSum(),
+                    summaryMetrics.getSendBatchMsgFailRate(),
+                    summaryMetrics.getSendBatchMsgDiscardNumSum()
             );
         }
 
         summaryMetrics.cleanSendBatchStat();
 
-        if (HTTP_LOGGER.isInfoEnabled()) {
-            HTTP_LOGGER.info("maxSendMsgTPS: {}, avgSendMsgTPS: {}, sum: {}, sumFail: {}, sumFailRate: {}, replyMsg: {}, replyFail: {}",
-                summaryMetrics.maxSendMsgTPS(),
-                summaryMetrics.avgSendMsgTPS(),
-                summaryMetrics.getSendMsgNumSum(),
-                summaryMetrics.getSendMsgFailNumSum(),
-                summaryMetrics.getSendMsgFailRate(),
-                summaryMetrics.getReplyMsgNumSum(),
-                summaryMetrics.getReplyMsgFailNumSum()
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("maxSendMsgTPS: {}, avgSendMsgTPS: {}, sum: {}, sumFail: {}, sumFailRate: {}, replyMsg: {}, replyFail: {}",
+                    summaryMetrics.maxSendMsgTPS(),
+                    summaryMetrics.avgSendMsgTPS(),
+                    summaryMetrics.getSendMsgNumSum(),
+                    summaryMetrics.getSendMsgFailNumSum(),
+                    summaryMetrics.getSendMsgFailRate(),
+                    summaryMetrics.getReplyMsgNumSum(),
+                    summaryMetrics.getReplyMsgFailNumSum()
             );
         }
 
         summaryMetrics.cleanSendMsgStat();
 
-        if (HTTP_LOGGER.isInfoEnabled()) {
-            HTTP_LOGGER.info(
-                "maxPushMsgTPS: {}, avgPushMsgTPS: {}, sum: {}, sumFail: {}, sumFailRate: {}, maxClientLatency: {}, avgClientLatency: {}",
-                summaryMetrics.maxPushMsgTPS(),
-                summaryMetrics.avgPushMsgTPS(),
-                summaryMetrics.getHttpPushMsgNumSum(),
-                summaryMetrics.getHttpPushFailNumSum(),
-                summaryMetrics.getHttpPushMsgFailRate(),
-                summaryMetrics.maxHTTPPushLatency(),
-                summaryMetrics.avgHTTPPushLatency()
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info(
+                    "maxPushMsgTPS: {}, avgPushMsgTPS: {}, sum: {}, sumFail: {}, sumFailRate: {}, maxClientLatency: {}, avgClientLatency: {}",
+                    summaryMetrics.maxPushMsgTPS(),
+                    summaryMetrics.avgPushMsgTPS(),
+                    summaryMetrics.getHttpPushMsgNumSum(),
+                    summaryMetrics.getHttpPushFailNumSum(),
+                    summaryMetrics.getHttpPushMsgFailRate(),
+                    summaryMetrics.maxHTTPPushLatency(),
+                    summaryMetrics.avgHTTPPushLatency()
             );
         }
 
         summaryMetrics.cleanHttpPushMsgStat();
 
-        if (HTTP_LOGGER.isInfoEnabled()) {
-            HTTP_LOGGER.info("batchMsgQ: {}, sendMsgQ: {}, pushMsgQ: {}, httpRetryQ: {}",
-                eventMeshHTTPServer.getBatchMsgExecutor().getQueue().size(),
-                eventMeshHTTPServer.getSendMsgExecutor().getQueue().size(),
-                eventMeshHTTPServer.getPushMsgExecutor().getQueue().size(),
-                eventMeshHTTPServer.getHttpRetryer().size());
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("batchMsgQ: {}, sendMsgQ: {}, pushMsgQ: {}, httpRetryQ: {}",
+                    eventMeshHTTPServer.getBatchMsgExecutor().getQueue().size(),
+                    eventMeshHTTPServer.getSendMsgExecutor().getQueue().size(),
+                    eventMeshHTTPServer.getPushMsgExecutor().getQueue().size(),
+                    eventMeshHTTPServer.getHttpRetryer().size());
         }
 
-        if (HTTP_LOGGER.isInfoEnabled()) {
-            HTTP_LOGGER.info("batchAvgSend2MQCost: {}, avgSend2MQCost: {}, avgReply2MQCost: {}",
-                summaryMetrics.avgBatchSendMsgCost(),
-                summaryMetrics.avgSendMsgCost(),
-                summaryMetrics.avgReplyMsgCost());
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("batchAvgSend2MQCost: {}, avgSend2MQCost: {}, avgReply2MQCost: {}",
+                    summaryMetrics.avgBatchSendMsgCost(),
+                    summaryMetrics.avgSendMsgCost(),
+                    summaryMetrics.avgReplyMsgCost());
         }
         summaryMetrics.send2MQStatInfoClear();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org