You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2021/08/24 08:38:37 UTC

[incubator-eventmesh] branch develop updated: otel metric polish (#503)

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/develop by this push:
     new d46b8bd  otel metric polish (#503)
d46b8bd is described below

commit d46b8bd3f4f4637aaa6af7def4b5441e65f319cf
Author: YuDong Tang <58...@qq.com>
AuthorDate: Tue Aug 24 16:38:22 2021 +0800

    otel metric polish (#503)
---
 .../runtime/metrics/http/HTTPMetricsServer.java    |   2 +-
 .../OpenTelemetryExporterConfiguration.java        |  68 ---------
 .../OpenTelemetryHTTPMetricsExporter.java          | 156 ++++++++++-----------
 .../OpenTelemetryPrometheusExporter.java           |  60 ++++++++
 .../OpenTelemetryTCPMetricsExporter.java           |  48 +++----
 .../runtime/metrics/tcp/EventMeshTcpMonitor.java   |   2 +-
 6 files changed, 156 insertions(+), 180 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
index a71e42f..d49167f 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import com.codahale.metrics.MetricRegistry;
 
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
-import org.apache.eventmesh.runtime.metrics.openTelemetry.OpenTelemetryHTTPMetricsExporter;
+import org.apache.eventmesh.runtime.metrics.opentelemetry.OpenTelemetryHTTPMetricsExporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java
deleted file mode 100644
index 40752ea..0000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.runtime.metrics.openTelemetry;
-
-import io.opentelemetry.api.metrics.MeterProvider;
-import io.opentelemetry.exporter.prometheus.PrometheusCollector;
-import io.opentelemetry.sdk.metrics.SdkMeterProvider;
-import io.prometheus.client.exporter.HTTPServer;
-import org.apache.eventmesh.common.config.CommonConfiguration;
-import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
-
-import java.io.IOException;
-
-//ues openTelemetry to export metrics data
-public class OpenTelemetryExporterConfiguration {
-
-    private static HTTPServer server;//Prometheus server
-
-    static int prometheusPort;//the endpoint to export metrics
-
-    static MeterProvider meterProvider;
-    /**
-     * Initializes the Meter SDK and configures the prometheus collector with all default settings.
-     *
-     * @return A MeterProvider for use in instrumentation.
-     */
-    public MeterProvider initializeOpenTelemetry(CommonConfiguration configuration) {
-        if (server!=null){//the sever already start
-            return meterProvider;
-        }
-
-        prometheusPort = configuration.eventMeshPrometheusPort;
-        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal();
-        PrometheusCollector.builder().setMetricProducer(sdkMeterProvider).buildAndRegister();
-        this.meterProvider = sdkMeterProvider;
-        try {
-            server = new HTTPServer(prometheusPort,true);//Use the daemon thread to start an HTTP server to serve the default Prometheus registry.
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        return meterProvider;
-    }
-
-    public static MeterProvider getMeterProvider(){//for tcp or http to get the initialized meterProvider
-        return meterProvider;
-    }
-
-    public void shutdownPrometheusEndpoint() {
-        if (server==null)
-            return;
-        server.stop();
-    }
-}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryHTTPMetricsExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/opentelemetry/OpenTelemetryHTTPMetricsExporter.java
similarity index 77%
rename from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryHTTPMetricsExporter.java
rename to eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/opentelemetry/OpenTelemetryHTTPMetricsExporter.java
index ff2a31e..a94b7e1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryHTTPMetricsExporter.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/opentelemetry/OpenTelemetryHTTPMetricsExporter.java
@@ -15,303 +15,295 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.metrics.openTelemetry;
+package org.apache.eventmesh.runtime.metrics.opentelemetry;
 
+import io.opentelemetry.api.metrics.GlobalMeterProvider;
 import io.opentelemetry.api.metrics.Meter;
-import io.opentelemetry.api.metrics.MeterProvider;
 import io.opentelemetry.api.metrics.common.Labels;
 import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
 import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
 import org.apache.eventmesh.runtime.metrics.http.SummaryMetrics;
 
 public class OpenTelemetryHTTPMetricsExporter {
-    OpenTelemetryExporterConfiguration configuration = new OpenTelemetryExporterConfiguration();
 
-    private Meter meter;
+    private final SummaryMetrics summaryMetrics;
 
-    private SummaryMetrics summaryMetrics;
-
-    private HTTPMetricsServer httpMetricsServer;
+    private final HTTPMetricsServer httpMetricsServer;
 
     public OpenTelemetryHTTPMetricsExporter(HTTPMetricsServer httpMetricsServer, EventMeshHTTPConfiguration eventMeshHTTPConfiguration) {
+        OpenTelemetryPrometheusExporter.initialize(eventMeshHTTPConfiguration);
         this.httpMetricsServer = httpMetricsServer;
-        summaryMetrics = httpMetricsServer.summaryMetrics;
-
-        // it is important to initialize the OpenTelemetry SDK as early as possible in your process.
-        MeterProvider meterProvider = configuration.initializeOpenTelemetry(eventMeshHTTPConfiguration);
-        meter = meterProvider.get("OpenTelemetryHTTPExporter", "0.13.1");
+        this.summaryMetrics = httpMetricsServer.summaryMetrics;
     }
 
     public void start(){
-        if (meter==null){
-            return;
-        }
+        Meter meter = GlobalMeterProvider.getMeter("apache-eventmesh");
         //maxHTTPTPS
         meter
-                .doubleValueObserverBuilder("eventmesh.http.request.tps.elapsed.max")
-                .setDescription("max TPS of HTTP")
+                .doubleValueObserverBuilder("eventmesh.http.request.tps.max")
+                .setDescription("max TPS of HTTP.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.maxHTTPTPS(),Labels.empty()))
                 .build();
 
         //avgHTTPTPS
         meter
-                .doubleValueObserverBuilder("eventmesh.http.request.tps.elapsed.avg")
-                .setDescription("avg TPS of HTTP")
+                .doubleValueObserverBuilder("eventmesh.http.request.tps.avg")
+                .setDescription("avg TPS of HTTP.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.avgHTTPTPS(),Labels.empty()))
                 .build();
 
         //maxHTTPCost
         meter
-                .longValueObserverBuilder("eventmesh.http.request.elapsed.max")
-                .setDescription("max cost of HTTP")
+                .longValueObserverBuilder("eventmesh.http.request.cost.max")
+                .setDescription("max cost of HTTP.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.maxHTTPCost(), Labels.empty()))
                 .build();
 
         //avgHTTPCost
         meter
-                .doubleValueObserverBuilder("eventmesh.http.request.elapsed.avg")
-                .setDescription("avg cost of HTTP")
+                .doubleValueObserverBuilder("eventmesh.http.request.cost.avg")
+                .setDescription("avg cost of HTTP.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.avgHTTPCost(), Labels.empty()))
                 .build();
 
         //avgHTTPBodyDecodeCost
         meter
-                .doubleValueObserverBuilder("eventmesh.http.body.decode.cost.elapsed.avg")
-                .setDescription("avg body decode cost of HTTP")
+                .doubleValueObserverBuilder("eventmesh.http.body.decode.cost.avg")
+                .setDescription("avg body decode cost of HTTP.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.avgHTTPBodyDecodeCost(), Labels.empty()))
                 .build();
 
         //httpDiscard
         meter
-                .longValueObserverBuilder("eventmesh.http.request.discard.elapsed")
-                .setDescription("http request discard")
+                .longValueObserverBuilder("eventmesh.http.request.discard.num")
+                .setDescription("http request discard num.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.getHttpDiscard(), Labels.empty()))
                 .build();
 
         //maxBatchSendMsgTPS
         meter
-                .doubleValueObserverBuilder("eventmesh.batch.send.message.tps.elapsed.max")
-                .setDescription("max of batch send message tps")
+                .doubleValueObserverBuilder("eventmesh.batch.send.message.tps.max")
+                .setDescription("max of batch send message tps.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.maxSendBatchMsgTPS(), Labels.empty()))
                 .build();
 
         //avgBatchSendMsgTPS
         meter
-                .doubleValueObserverBuilder("eventmesh.batch.send.message.tps.elapsed.avg")
-                .setDescription("avg of batch send message tps")
+                .doubleValueObserverBuilder("eventmesh.batch.send.message.tps.avg")
+                .setDescription("avg of batch send message tps.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.avgSendBatchMsgTPS(), Labels.empty()))
                 .build();
 
         //sum
         meter
-                .doubleValueObserverBuilder("eventmesh.batch.send.message.elapsed.sum")
-                .setDescription("sum of batch send message number")
+                .doubleValueObserverBuilder("eventmesh.batch.send.message.num")
+                .setDescription("sum of batch send message number.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.getSendBatchMsgNumSum(), Labels.empty()))
                 .build();
 
         //sumFail
         meter
-                .doubleValueObserverBuilder("eventmesh.batch.send.message.fail.elapsed.sum")
-                .setDescription("sum of batch send message fail message number")
+                .doubleValueObserverBuilder("eventmesh.batch.send.message.fail.num")
+                .setDescription("sum of batch send message fail message number.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.getSendBatchMsgFailNumSum(), Labels.empty()))
                 .build();
 
         //sumFailRate
         meter
-                .doubleValueObserverBuilder("eventmesh.batch.send.message.fail.rate.elapsed")
-                .setDescription("send batch message fail rate")
+                .doubleValueObserverBuilder("eventmesh.batch.send.message.fail.rate")
+                .setDescription("send batch message fail rate.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.getSendBatchMsgFailRate(), Labels.empty()))
                 .build();
 
         //discard
         meter
-                .doubleValueObserverBuilder("eventmesh.batch.send.message.discard.elapsed.sum")
-                .setDescription("sum of send batch message discard number")
+                .doubleValueObserverBuilder("eventmesh.batch.send.message.discard.num")
+                .setDescription("sum of send batch message discard number.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.getSendBatchMsgDiscardNumSum(), Labels.empty()))
                 .build();
 
         //maxSendMsgTPS
         meter
-                .doubleValueObserverBuilder("eventmesh.send.message.tps.elapsed.max")
-                .setDescription("max of send message tps")
+                .doubleValueObserverBuilder("eventmesh.send.message.tps.max")
+                .setDescription("max of send message tps.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.maxSendMsgTPS(), Labels.empty()))
                 .build();
 
         //avgSendMsgTPS
         meter
-                .doubleValueObserverBuilder("eventmesh.send.message.tps.elapsed.avg")
-                .setDescription("avg of send message tps")
+                .doubleValueObserverBuilder("eventmesh.send.message.tps.avg")
+                .setDescription("avg of send message tps.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.avgSendMsgTPS(), Labels.empty()))
                 .build();
 
         //sum
         meter
-                .doubleValueObserverBuilder("eventmesh.send.message.elapsed.sum")
-                .setDescription("sum of send message number")
+                .doubleValueObserverBuilder("eventmesh.send.message.num")
+                .setDescription("sum of send message number.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.getSendMsgNumSum(), Labels.empty()))
                 .build();
 
         //sumFail
         meter
-                .doubleValueObserverBuilder("eventmesh.send.message.fail.elapsed.sum")
-                .setDescription("sum of send message fail number")
+                .doubleValueObserverBuilder("eventmesh.send.message.fail.num")
+                .setDescription("sum of send message fail number.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.getSendMsgFailNumSum(), Labels.empty()))
                 .build();
 
         //sumFailRate
         meter
-                .doubleValueObserverBuilder("eventmesh.send.message.fail.rate.elapsed")
-                .setDescription("send message fail rate")
+                .doubleValueObserverBuilder("eventmesh.send.message.fail.rate")
+                .setDescription("send message fail rate.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.getSendMsgFailRate(), Labels.empty()))
                 .build();
 
         //replyMsg
         meter
-                .doubleValueObserverBuilder("eventmesh.reply.message.elapsed.sum")
-                .setDescription("sum of reply message number")
+                .doubleValueObserverBuilder("eventmesh.reply.message.num")
+                .setDescription("sum of reply message number.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.getReplyMsgNumSum(), Labels.empty()))
                 .build();
 
         //replyFail
         meter
-                .doubleValueObserverBuilder("eventmesh.reply.message.fail.elapsed.sum")
-                .setDescription("sum of reply message fail number")
+                .doubleValueObserverBuilder("eventmesh.reply.message.fail.num")
+                .setDescription("sum of reply message fail number.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.getReplyMsgFailNumSum(), Labels.empty()))
                 .build();
 
         //maxPushMsgTPS
         meter
-                .doubleValueObserverBuilder("eventmesh.push.message.tps.elapsed.max")
-                .setDescription("max of push message tps")
+                .doubleValueObserverBuilder("eventmesh.push.message.tps.max")
+                .setDescription("max of push message tps.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.maxPushMsgTPS(), Labels.empty()))
                 .build();
 
         //avgPushMsgTPS
         meter
-                .doubleValueObserverBuilder("eventmesh.push.message.tps.elapsed.avg")
-                .setDescription("avg of push message tps")
+                .doubleValueObserverBuilder("eventmesh.push.message.tps.avg")
+                .setDescription("avg of push message tps.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.avgPushMsgTPS(), Labels.empty()))
                 .build();
 
         //sum
         meter
-                .doubleValueObserverBuilder("eventmesh.http.push.message.elapsed.sum")
-                .setDescription("sum of http push message number")
+                .doubleValueObserverBuilder("eventmesh.http.push.message.num")
+                .setDescription("sum of http push message number.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.getHttpPushMsgNumSum(), Labels.empty()))
                 .build();
 
         //sumFail
         meter
-                .doubleValueObserverBuilder("eventmesh.http.push.message.fail.elapsed.sum")
-                .setDescription("sum of http push message fail number")
+                .doubleValueObserverBuilder("eventmesh.http.push.message.fail.num")
+                .setDescription("sum of http push message fail number.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.getHttpPushFailNumSum(), Labels.empty()))
                 .build();
 
         //sumFailRate
         meter
-                .doubleValueObserverBuilder("eventmesh.http.push.message.fail.rate.elapsed")
-                .setDescription("http push message fail rate")
+                .doubleValueObserverBuilder("eventmesh.http.push.message.fail.rate")
+                .setDescription("http push message fail rate.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.getHttpPushMsgFailRate(), Labels.empty()))
                 .build();
 
         //maxClientLatency
         meter
-                .doubleValueObserverBuilder("eventmesh.http.push.latency.elapsed.max")
-                .setDescription("max of http push latency")
+                .doubleValueObserverBuilder("eventmesh.http.push.latency.max")
+                .setDescription("max of http push latency.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.maxHTTPPushLatency(), Labels.empty()))
                 .build();
 
         //avgClientLatency
         meter
-                .doubleValueObserverBuilder("eventmesh.http.push.latency.elapsed.avg")
-                .setDescription("avg of http push latency")
+                .doubleValueObserverBuilder("eventmesh.http.push.latency.avg")
+                .setDescription("avg of http push latency.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.avgHTTPPushLatency(), Labels.empty()))
                 .build();
 
         //batchMsgQ
         meter
-                .longValueObserverBuilder("eventmesh.batch.message.queue.elapsed.size")
-                .setDescription("size of batch message queue")
+                .longValueObserverBuilder("eventmesh.batch.message.queue.size")
+                .setDescription("size of batch message queue.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(httpMetricsServer.getBatchMsgQ(), Labels.empty()))
                 .build();
 
         //sendMsgQ
         meter
-                .longValueObserverBuilder("eventmesh.send.message.queue.elapsed.size")
-                .setDescription("size of send message queue")
+                .longValueObserverBuilder("eventmesh.send.message.queue.size")
+                .setDescription("size of send message queue.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(httpMetricsServer.getSendMsgQ(), Labels.empty()))
                 .build();
 
         //pushMsgQ
         meter
-                .longValueObserverBuilder("eventmesh.push.message.queue.elapsed.size")
-                .setDescription("size of push message queue")
+                .longValueObserverBuilder("eventmesh.push.message.queue.size")
+                .setDescription("size of push message queue.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(httpMetricsServer.getPushMsgQ(), Labels.empty()))
                 .build();
 
         //httpRetryQ
         meter
-                .longValueObserverBuilder("eventmesh.http.retry.queue.elapsed.size")
-                .setDescription("size of http retry queue")
+                .longValueObserverBuilder("eventmesh.http.retry.queue.size")
+                .setDescription("size of http retry queue.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(httpMetricsServer.getHttpRetryQ(), Labels.empty()))
                 .build();
 
         //batchAvgSend2MQCost
         meter
-                .doubleValueObserverBuilder("eventmesh.batch.send.message.cost.elapsed.avg")
-                .setDescription("avg of batch send message cost")
+                .doubleValueObserverBuilder("eventmesh.batch.send.message.cost.avg")
+                .setDescription("avg of batch send message cost.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.avgBatchSendMsgCost(), Labels.empty()))
                 .build();
 
         //avgSend2MQCost
         meter
-                .doubleValueObserverBuilder("eventmesh.send.message.cost.elapsed.avg")
-                .setDescription("avg of send message cost")
+                .doubleValueObserverBuilder("eventmesh.send.message.cost.avg")
+                .setDescription("avg of send message cost.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.avgSendMsgCost(), Labels.empty()))
                 .build();
 
         //avgReply2MQCost
         meter
-                .doubleValueObserverBuilder("eventmesh.reply.message.cost.elapsed.avg")
-                .setDescription("avg of reply message cost")
+                .doubleValueObserverBuilder("eventmesh.reply.message.cost.avg")
+                .setDescription("avg of reply message cost.")
                 .setUnit("HTTP")
                 .setUpdater(result -> result.observe(summaryMetrics.avgReplyMsgCost(), Labels.empty()))
                 .build();
     }
 
     public void shutdown(){
-        configuration.shutdownPrometheusEndpoint();
+        OpenTelemetryPrometheusExporter.shutdown();
     }
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/opentelemetry/OpenTelemetryPrometheusExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/opentelemetry/OpenTelemetryPrometheusExporter.java
new file mode 100644
index 0000000..730adc9
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/opentelemetry/OpenTelemetryPrometheusExporter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.metrics.opentelemetry;
+
+import io.opentelemetry.exporter.prometheus.PrometheusCollector;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.prometheus.client.exporter.HTTPServer;
+import org.apache.eventmesh.common.config.CommonConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+public class OpenTelemetryPrometheusExporter {
+
+    private static final Logger logger = LoggerFactory.getLogger(OpenTelemetryPrometheusExporter.class);
+
+    private static HTTPServer server;
+
+    /**
+     * Initializes the Meter SDK and configures the prometheus collector with all default settings.
+     *
+     * @param configuration configuration.
+     */
+    public synchronized static void initialize(CommonConfiguration configuration) {
+        if (server != null){
+            return;
+        }
+        PrometheusCollector.builder().setMetricProducer(
+                SdkMeterProvider.builder().buildAndRegisterGlobal()).buildAndRegister();
+        int port = configuration.eventMeshPrometheusPort;
+        try {
+            //Use the daemon thread to start an HTTP server to serve the default Prometheus registry.
+            server = new HTTPServer(port,true);
+        } catch (IOException e) {
+            logger.error("failed to start prometheus server, port: {} due to {}", port, e.getMessage());
+        }
+    }
+
+    public static void shutdown() {
+        if (server != null) {
+            server.stop();
+            server = null;
+        }
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryTCPMetricsExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/opentelemetry/OpenTelemetryTCPMetricsExporter.java
similarity index 71%
rename from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryTCPMetricsExporter.java
rename to eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/opentelemetry/OpenTelemetryTCPMetricsExporter.java
index b49cef9..c758962 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryTCPMetricsExporter.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/opentelemetry/OpenTelemetryTCPMetricsExporter.java
@@ -15,71 +15,62 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.metrics.openTelemetry;
+package org.apache.eventmesh.runtime.metrics.opentelemetry;
 
+import io.opentelemetry.api.metrics.GlobalMeterProvider;
 import io.opentelemetry.api.metrics.Meter;
-import io.opentelemetry.api.metrics.MeterProvider;
 import io.opentelemetry.api.metrics.common.Labels;
-import org.apache.eventmesh.common.config.CommonConfiguration;
 import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcpConnectionHandler;
 import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
 
 public class OpenTelemetryTCPMetricsExporter {
-    OpenTelemetryExporterConfiguration configuration = new OpenTelemetryExporterConfiguration();
 
-    private Meter meter;
-
-    private EventMeshTcpMonitor eventMeshTcpMonitor;
+    private final EventMeshTcpMonitor eventMeshTcpMonitor;
 
     public OpenTelemetryTCPMetricsExporter(EventMeshTcpMonitor eventMeshTcpMonitor , EventMeshTCPConfiguration eventMeshTCPConfiguration){
+        OpenTelemetryPrometheusExporter.initialize(eventMeshTCPConfiguration);
         this.eventMeshTcpMonitor = eventMeshTcpMonitor;
-
-        // it is important to initialize the OpenTelemetry SDK as early as possible in your process.
-        MeterProvider meterProvider = configuration.initializeOpenTelemetry(eventMeshTCPConfiguration);
-        meter = meterProvider.get("OpenTelemetryTCPExporter", "0.13.1");
     }
 
     public void start(){
-        if (meter==null){
-            return;
-        }
+        Meter meter = GlobalMeterProvider.getMeter("apache-eventmesh");
         //retryQueueSize
         meter
                 .doubleValueObserverBuilder("eventmesh.tcp.retry.queue.size")
-                .setDescription("get size of retry queue")
+                .setDescription("get size of retry queue.")
                 .setUnit("TCP")
                 .setUpdater(result -> result.observe(eventMeshTcpMonitor.getEventMeshTCPServer().getEventMeshTcpRetryer().getRetrySize(), Labels.empty()))
                 .build();
 
         //client2eventMeshTPS
         meter
-                .doubleValueObserverBuilder("eventmesh.tcp.client2.tps")
-                .setDescription("get tps of client to eventMesh")
+                .doubleValueObserverBuilder("eventmesh.tcp.server.tps")
+                .setDescription("get tps of client to eventMesh.")
                 .setUnit("TCP")
                 .setUpdater(result -> result.observe(eventMeshTcpMonitor.getClient2eventMeshTPS(), Labels.empty()))
                 .build();
 
         //eventMesh2mqTPS
         meter
-                .doubleValueObserverBuilder("eventmesh.tcp.2mq.tps")
-                .setDescription("get tps of eventMesh to mq")
+                .doubleValueObserverBuilder("eventmesh.tcp.mq.provider.tps")
+                .setDescription("get tps of eventMesh to mq.")
                 .setUnit("TCP")
                 .setUpdater(result -> result.observe(eventMeshTcpMonitor.getEventMesh2mqTPS(), Labels.empty()))
                 .build();
 
         //mq2eventMeshTPS
         meter
-                .doubleValueObserverBuilder("eventmesh.tcp.mq2.tps")
-                .setDescription("get tps of mq to eventMesh")
+                .doubleValueObserverBuilder("eventmesh.tcp.mq.consumer.tps")
+                .setDescription("get tps of mq to eventMesh.")
                 .setUnit("TCP")
                 .setUpdater(result -> result.observe(eventMeshTcpMonitor.getMq2eventMeshTPS(), Labels.empty()))
                 .build();
 
         //eventMesh2clientTPS
         meter
-                .doubleValueObserverBuilder("eventmesh.tcp.2client.tps")
-                .setDescription("get tps of eventMesh to client")
+                .doubleValueObserverBuilder("eventmesh.tcp.client.tps")
+                .setDescription("get tps of eventMesh to client.")
                 .setUnit("TCP")
                 .setUpdater(result -> result.observe(eventMeshTcpMonitor.getEventMesh2clientTPS(), Labels.empty()))
                 .build();
@@ -87,15 +78,15 @@ public class OpenTelemetryTCPMetricsExporter {
         //allTPS
         meter
                 .doubleValueObserverBuilder("eventmesh.tcp.all.tps")
-                .setDescription("get all TPS")
+                .setDescription("get all TPS.")
                 .setUnit("TCP")
                 .setUpdater(result -> result.observe(eventMeshTcpMonitor.getAllTPS(), Labels.empty()))
                 .build();
 
         //EventMeshTcpConnectionHandler.connections
         meter
-                .doubleValueObserverBuilder("eventmesh.tcp.connection.handler.connections")
-                .setDescription("EventMeshTcpConnectionHandler.connections")
+                .doubleValueObserverBuilder("eventmesh.tcp.connection.num")
+                .setDescription("EventMeshTcpConnectionHandler.connections.")
                 .setUnit("TCP")
                 .setUpdater(result -> result.observe(EventMeshTcpConnectionHandler.connections.doubleValue(), Labels.empty()))
                 .build();
@@ -103,12 +94,13 @@ public class OpenTelemetryTCPMetricsExporter {
         //subTopicNum
         meter
                 .doubleValueObserverBuilder("eventmesh.tcp.sub.topic.num")
-                .setDescription("get sub topic num")
+                .setDescription("get sub topic num.")
                 .setUnit("TCP")
                 .setUpdater(result -> result.observe(eventMeshTcpMonitor.getSubTopicNum(), Labels.empty()))
                 .build();
     }
+
     public void shutdown(){
-        configuration.shutdownPrometheusEndpoint();
+        OpenTelemetryPrometheusExporter.shutdown();
     }
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
index 99b0f05..2a2cbdc 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
@@ -32,7 +32,7 @@ import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcpConnectionHandler;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
 import org.apache.eventmesh.runtime.metrics.MonitorMetricConstants;
-import org.apache.eventmesh.runtime.metrics.openTelemetry.OpenTelemetryTCPMetricsExporter;
+import org.apache.eventmesh.runtime.metrics.opentelemetry.OpenTelemetryTCPMetricsExporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org