You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by ch...@apache.org on 2021/08/10 10:13:49 UTC

[incubator-eventmesh] branch opentelemetry updated: [ISSUE #340]Integrate With OpenTelemetry for metrics in EventMesh (#467)

This is an automated email from the ASF dual-hosted git repository.

chenguangsheng pushed a commit to branch opentelemetry
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/opentelemetry by this push:
     new f0d8b55  [ISSUE #340]Integrate With OpenTelemetry for metrics in EventMesh (#467)
f0d8b55 is described below

commit f0d8b553cea19968af94bf737a4c5a76cfe40dfa
Author: ZePeng Chen <84...@users.noreply.github.com>
AuthorDate: Tue Aug 10 18:13:44 2021 +0800

    [ISSUE #340]Integrate With OpenTelemetry for metrics in EventMesh (#467)
    
    * The Chinese in the notes is translated into English.
    
    * Translation improvement
    
    * Translation improvement
    
    * fix Chinese annotation on runtime module
    
    * Export metrics data with open telemetry and use Prometheus for visual observation
    
    * Export metrics data with open telemetry and use Prometheus for visual observation
    
    * Export metrics data with open telemetry and use Prometheus for visual observation
    
    * Export metrics data with open telemetry and use Prometheus for visual observation
    
    * Improper modification
    
    * Improper modification
    
    * improve
    
    * improve
    
    * improve
    
    * tcp metrics export
    
    * improve
    
    * improve
---
 .../en/features/eventmesh-metrics-export-design.md |  47 +++
 .../common/config/CommonConfiguration.java         |   8 +
 eventmesh-runtime/build.gradle                     |  14 +-
 eventmesh-runtime/conf/eventmesh.properties        |   5 +-
 eventmesh-runtime/conf/prometheus.yml              |  46 +++
 .../configuration/EventMeshHTTPConfiguration.java  |   1 +
 .../runtime/metrics/http/HTTPMetricsServer.java    |  23 ++
 .../OpenTelemetryExporterConfiguration.java        |  68 +++++
 .../OpenTelemetryHTTPMetricsExporter.java          | 317 +++++++++++++++++++++
 .../OpenTelemetryTCPMetricsExporter.java           | 114 ++++++++
 .../runtime/metrics/tcp/EventMeshTcpMonitor.java   |  34 +++
 11 files changed, 671 insertions(+), 6 deletions(-)

diff --git a/docs/en/features/eventmesh-metrics-export-design.md b/docs/en/features/eventmesh-metrics-export-design.md
new file mode 100644
index 0000000..a8f5958
--- /dev/null
+++ b/docs/en/features/eventmesh-metrics-export-design.md
@@ -0,0 +1,47 @@
+# EventMesh Metrics (OpenTelemetry+Prometheus)
+
+## Introduction
+
+[EventMesh(incubating)](https://github.com/apache/incubator-eventmesh) is a dynamic cloud-native eventing infrastructure.
+
+## An overview of OpenTelemetry
+
+OpenTelemetry is a collection of tools, APIs, and SDKs. You can use it to instrument, generate, collect, and export telemetry data (metrics, logs, and traces) for analysis in order to understand your software's performance and behavior.
+
+## An overview of Prometheus
+
+Power your metrics and alerting with a leading open-source monitoring solution.
+
+- Dimensional data
+- Powerful queries
+- Great visualization
+- Efficient storage
+- Simple operation 
+- Precise alerting
+- Many client libraries
+- Many integrations
+
+## Requirements
+
+### Functional Requirements
+
+| Requirement ID | Requirement Description                                      | Comments      |
+| :------------- | ------------------------------------------------------------ | ------------- |
+| F-1            | EventMesh users should be able to observe HTTP metrics from Prometheus | Functionality |
+| F-2            | EventMesh users should be able to observe TCP metrics from Prometheus | Functionality |
+
+## Design Details
+
+Use the meter instrument provided by OpenTelemetry to observe the metrics that exist in EventMesh, then export them to Prometheus.
+
+1. Initialize a meter instrument
+
+2. Set up the Prometheus server
+
+3. Build the different metrics observers
+
+## Appendix
+
+#### References
+
+https://github.com/open-telemetry/docs-cn/blob/main/QUICKSTART.md#%E5%88%9B%E5%BB%BA%E5%9F%BA%E7%A1%80Span
\ No newline at end of file
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
index 321ebe0..42a5db1 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
@@ -29,6 +29,7 @@ public class CommonConfiguration {
     public String eventMeshName = "";
     public String sysID = "5477";
     public String eventMeshConnectorPluginType = "rocketmq";
+    public int eventMeshPrometheusPort = 19090;
 
     public String namesrvAddr = "";
     public String clientUserName = "username";
@@ -75,6 +76,11 @@ public class CommonConfiguration {
             Preconditions.checkState(StringUtils.isNotEmpty(eventMeshIDCStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_IDC));
             eventMeshIDC = StringUtils.deleteWhitespace(eventMeshIDCStr);
 
+            String eventMeshPrometheusPortStr = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_METRICS_PROMETHEUS_PORT);
+            if (StringUtils.isNotEmpty(eventMeshPrometheusPortStr)) {
+                eventMeshPrometheusPort = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshPrometheusPortStr));
+            }
+
             eventMeshServerIp = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_HOST_IP);
             if (StringUtils.isBlank(eventMeshServerIp)) {
                 eventMeshServerIp = IPUtil.getLocalAddress();
@@ -103,5 +109,7 @@ public class CommonConfiguration {
         public static String KEYS_EVENTMESH_SERVER_FETCH_REGISTRY_ADDR_INTERVAL = "eventMesh.server.registry.fetchRegistryAddrIntervalInMills";
 
         public static String KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE = "eventMesh.connector.plugin.type";
+
+        public static String KEY_EVENTMESH_METRICS_PROMETHEUS_PORT = "eventMesh.metrics.prometheus.port";
     }
 }
\ No newline at end of file
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index 00779a7..a74514a 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -23,14 +23,20 @@ List metrics = [
         "io.dropwizard.metrics:metrics-json:4.1.0"
 ]
 
-
+List open_telemetry = [
+        "io.opentelemetry:opentelemetry-api:1.3.0",
+        "io.opentelemetry:opentelemetry-sdk:1.3.0",
+        "io.opentelemetry:opentelemetry-sdk-metrics:1.3.0-alpha",
+        "io.opentelemetry:opentelemetry-exporter-prometheus:1.3.0-alpha",
+        "io.prometheus:simpleclient:0.8.1",
+        "io.prometheus:simpleclient_httpserver:0.8.1"
+]
 
 List open_message = [
         "io.openmessaging:openmessaging-api:2.2.1-pubsub"
 ]
 
-
 dependencies {
-    implementation metrics, open_message, project(":eventmesh-connector-plugin:eventmesh-connector-api")
-    testImplementation metrics, open_message, project(":eventmesh-connector-plugin:eventmesh-connector-api")
+    implementation metrics, open_telemetry, open_message, project(":eventmesh-connector-plugin:eventmesh-connector-api")
+    testImplementation metrics, open_telemetry,open_message, project(":eventmesh-connector-plugin:eventmesh-connector-api")
 }
diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties
index 45fc193..486fb71 100644
--- a/eventmesh-runtime/conf/eventmesh.properties
+++ b/eventmesh-runtime/conf/eventmesh.properties
@@ -52,6 +52,7 @@ eventMesh.server.registry.registerIntervalInMills=10000
 eventMesh.server.registry.fetchRegistryAddrIntervalInMills=20000
 #auto-ack
 #eventMesh.server.defibus.client.comsumeTimeoutInMin=5
-
 #connector plugin
-eventMesh.connector.plugin.type=rocketmq
\ No newline at end of file
+eventMesh.connector.plugin.type=rocketmq
+#prometheusPort
+eventMesh.metrics.prometheus.port=19090
\ No newline at end of file
diff --git a/eventmesh-runtime/conf/prometheus.yml b/eventmesh-runtime/conf/prometheus.yml
new file mode 100644
index 0000000..ea48021
--- /dev/null
+++ b/eventmesh-runtime/conf/prometheus.yml
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+global:
+  scrape_interval: 15s
+  scrape_timeout: 10s
+  evaluation_interval: 15s
+alerting:
+  alertmanagers:
+  - static_configs:
+    - targets: []
+    scheme: http
+    timeout: 10s
+    api_version: v1
+scrape_configs:
+- job_name: prometheus
+  honor_timestamps: true
+  scrape_interval: 15s
+  scrape_timeout: 10s
+  metrics_path: /metrics
+  scheme: http
+  static_configs:
+  - targets:
+    - localhost:9090
+- job_name: EventMesh_HTTP_export_test
+  honor_timestamps: true
+  scrape_interval: 15s
+  scrape_timeout: 10s
+  metrics_path: /metrics
+  scheme: http
+  static_configs:
+  - targets:
+    - 127.0.0.1:19090
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
index c5ec9cd..ffa6cbd 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
@@ -226,5 +226,6 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration {
         public static String KEY_EVENTMESH_CONSUMER_ENABLED = "eventMesh.server.consumer.enabled";
 
         public static String KEY_EVENTMESH_HTTPS_ENABLED = "eventMesh.server.useTls.enabled";
+
     }
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
index 3e333a0..a71e42f 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import com.codahale.metrics.MetricRegistry;
 
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
+import org.apache.eventmesh.runtime.metrics.openTelemetry.OpenTelemetryHTTPMetricsExporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +44,8 @@ public class HTTPMetricsServer {
 
     public GroupMetrics groupMetrics;
 
+    public OpenTelemetryHTTPMetricsExporter openTelemetryHTTPMetricsExporter;
+
     private Logger httpLogger = LoggerFactory.getLogger("httpMonitor");
 
     private Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -56,10 +59,14 @@ public class HTTPMetricsServer {
         topicMetrics = new TopicMetrics(this.eventMeshHTTPServer, this.metricRegistry);
         groupMetrics = new GroupMetrics(this.eventMeshHTTPServer, this.metricRegistry);
         healthMetrics = new HealthMetrics(this.eventMeshHTTPServer, this.metricRegistry);
+
+        openTelemetryHTTPMetricsExporter = new OpenTelemetryHTTPMetricsExporter(this,this.eventMeshHTTPServer.getEventMeshHttpConfiguration());
+
         logger.info("HTTPMetricsServer inited......");
     }
 
     public void start() throws Exception {
+        openTelemetryHTTPMetricsExporter.start();
         metricsSchedule.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
@@ -90,6 +97,7 @@ public class HTTPMetricsServer {
 
     public void shutdown() throws Exception {
         metricsSchedule.shutdown();
+        openTelemetryHTTPMetricsExporter.shutdown();
         logger.info("HTTPMetricsServer shutdown......");
     }
 
@@ -162,6 +170,21 @@ public class HTTPMetricsServer {
         summaryMetrics.send2MQStatInfoClear();
     }
 
+    public int getBatchMsgQ(){
+        return eventMeshHTTPServer.getBatchMsgExecutor().getQueue().size();
+    }
+
+    public int getSendMsgQ(){
+        return eventMeshHTTPServer.getSendMsgExecutor().getQueue().size();
+    }
+
+    public int getPushMsgQ(){
+        return eventMeshHTTPServer.getPushMsgExecutor().getQueue().size();
+    }
+
+    public int getHttpRetryQ(){
+        return eventMeshHTTPServer.getHttpRetryer().size();
+    }
 
     public HealthMetrics getHealthMetrics() {
         return healthMetrics;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java
new file mode 100644
index 0000000..40752ea
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.metrics.openTelemetry;
+
+import io.opentelemetry.api.metrics.MeterProvider;
+import io.opentelemetry.exporter.prometheus.PrometheusCollector;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.prometheus.client.exporter.HTTPServer;
+import org.apache.eventmesh.common.config.CommonConfiguration;
+import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
+
+import java.io.IOException;
+
+// Uses OpenTelemetry to export metrics data through a Prometheus HTTP endpoint.
+public class OpenTelemetryExporterConfiguration {
+
+    private static HTTPServer server;// Prometheus HTTP server; stays null until initializeOpenTelemetry has started it
+
+    static int prometheusPort;// port of the endpoint that exports metrics, read from the configuration
+
+    static MeterProvider meterProvider;
+    /**
+     * Initializes the Meter SDK, registers the Prometheus collector and starts the Prometheus HTTP server on the
+     * configured port. If the server is already running the existing provider is returned; an IOException is only printed.
+     * @return A MeterProvider for use in instrumentation.
+     */
+    public MeterProvider initializeOpenTelemetry(CommonConfiguration configuration) {
+        if (server!=null){// the server has already been started, reuse the existing provider
+            return meterProvider;
+        }
+
+        prometheusPort = configuration.eventMeshPrometheusPort;
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal();
+        PrometheusCollector.builder().setMetricProducer(sdkMeterProvider).buildAndRegister();
+        this.meterProvider = sdkMeterProvider;
+        try {
+            server = new HTTPServer(prometheusPort,true);// start an HTTP server on daemon threads to serve the default Prometheus registry
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return meterProvider;
+    }
+
+    public static MeterProvider getMeterProvider(){// lets the TCP or HTTP exporter obtain the initialized meterProvider (null before initialization)
+        return meterProvider;
+    }
+
+    public void shutdownPrometheusEndpoint() {
+        if (server==null)
+            return;
+        server.stop();
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryHTTPMetricsExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryHTTPMetricsExporter.java
new file mode 100644
index 0000000..ff2a31e
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryHTTPMetricsExporter.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.metrics.openTelemetry;
+
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.MeterProvider;
+import io.opentelemetry.api.metrics.common.Labels;
+import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
+import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
+import org.apache.eventmesh.runtime.metrics.http.SummaryMetrics;
+
+public class OpenTelemetryHTTPMetricsExporter {// exposes the HTTP metrics of EventMesh as OpenTelemetry value observers
+    OpenTelemetryExporterConfiguration configuration = new OpenTelemetryExporterConfiguration();
+
+    private Meter meter;
+
+    private SummaryMetrics summaryMetrics;
+
+    private HTTPMetricsServer httpMetricsServer;
+
+    public OpenTelemetryHTTPMetricsExporter(HTTPMetricsServer httpMetricsServer, EventMeshHTTPConfiguration eventMeshHTTPConfiguration) {
+        this.httpMetricsServer = httpMetricsServer;
+        summaryMetrics = httpMetricsServer.summaryMetrics;
+
+        // initialize the OpenTelemetry SDK as early as possible in the process; the call only returns the existing provider when the Prometheus server is already running
+        MeterProvider meterProvider = configuration.initializeOpenTelemetry(eventMeshHTTPConfiguration);
+        meter = meterProvider.get("OpenTelemetryHTTPExporter", "0.13.1");
+    }
+
+    public void start(){// registers one value observer per HTTP metric; each observer reads its value through the updater callback
+        if (meter==null){// nothing can be registered without a meter
+            return;
+        }
+        //maxHTTPTPS
+        meter
+                .doubleValueObserverBuilder("eventmesh.http.request.tps.elapsed.max")
+                .setDescription("max TPS of HTTP")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.maxHTTPTPS(),Labels.empty()))
+                .build();
+
+        //avgHTTPTPS
+        meter
+                .doubleValueObserverBuilder("eventmesh.http.request.tps.elapsed.avg")
+                .setDescription("avg TPS of HTTP")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.avgHTTPTPS(),Labels.empty()))
+                .build();
+
+        //maxHTTPCost
+        meter
+                .longValueObserverBuilder("eventmesh.http.request.elapsed.max")
+                .setDescription("max cost of HTTP")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.maxHTTPCost(), Labels.empty()))
+                .build();
+
+        //avgHTTPCost
+        meter
+                .doubleValueObserverBuilder("eventmesh.http.request.elapsed.avg")
+                .setDescription("avg cost of HTTP")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.avgHTTPCost(), Labels.empty()))
+                .build();
+
+        //avgHTTPBodyDecodeCost
+        meter
+                .doubleValueObserverBuilder("eventmesh.http.body.decode.cost.elapsed.avg")
+                .setDescription("avg body decode cost of HTTP")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.avgHTTPBodyDecodeCost(), Labels.empty()))
+                .build();
+
+        //httpDiscard
+        meter
+                .longValueObserverBuilder("eventmesh.http.request.discard.elapsed")
+                .setDescription("http request discard")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.getHttpDiscard(), Labels.empty()))
+                .build();
+
+        //maxSendBatchMsgTPS
+        meter
+                .doubleValueObserverBuilder("eventmesh.batch.send.message.tps.elapsed.max")
+                .setDescription("max of batch send message tps")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.maxSendBatchMsgTPS(), Labels.empty()))
+                .build();
+
+        //avgSendBatchMsgTPS
+        meter
+                .doubleValueObserverBuilder("eventmesh.batch.send.message.tps.elapsed.avg")
+                .setDescription("avg of batch send message tps")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.avgSendBatchMsgTPS(), Labels.empty()))
+                .build();
+
+        //sendBatchMsgNumSum
+        meter
+                .doubleValueObserverBuilder("eventmesh.batch.send.message.elapsed.sum")
+                .setDescription("sum of batch send message number")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.getSendBatchMsgNumSum(), Labels.empty()))
+                .build();
+
+        //sendBatchMsgFailNumSum
+        meter
+                .doubleValueObserverBuilder("eventmesh.batch.send.message.fail.elapsed.sum")
+                .setDescription("sum of batch send message fail message number")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.getSendBatchMsgFailNumSum(), Labels.empty()))
+                .build();
+
+        //sendBatchMsgFailRate
+        meter
+                .doubleValueObserverBuilder("eventmesh.batch.send.message.fail.rate.elapsed")
+                .setDescription("send batch message fail rate")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.getSendBatchMsgFailRate(), Labels.empty()))
+                .build();
+
+        //sendBatchMsgDiscardNumSum
+        meter
+                .doubleValueObserverBuilder("eventmesh.batch.send.message.discard.elapsed.sum")
+                .setDescription("sum of send batch message discard number")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.getSendBatchMsgDiscardNumSum(), Labels.empty()))
+                .build();
+
+        //maxSendMsgTPS
+        meter
+                .doubleValueObserverBuilder("eventmesh.send.message.tps.elapsed.max")
+                .setDescription("max of send message tps")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.maxSendMsgTPS(), Labels.empty()))
+                .build();
+
+        //avgSendMsgTPS
+        meter
+                .doubleValueObserverBuilder("eventmesh.send.message.tps.elapsed.avg")
+                .setDescription("avg of send message tps")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.avgSendMsgTPS(), Labels.empty()))
+                .build();
+
+        //sendMsgNumSum
+        meter
+                .doubleValueObserverBuilder("eventmesh.send.message.elapsed.sum")
+                .setDescription("sum of send message number")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.getSendMsgNumSum(), Labels.empty()))
+                .build();
+
+        //sendMsgFailNumSum
+        meter
+                .doubleValueObserverBuilder("eventmesh.send.message.fail.elapsed.sum")
+                .setDescription("sum of send message fail number")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.getSendMsgFailNumSum(), Labels.empty()))
+                .build();
+
+        //sendMsgFailRate
+        meter
+                .doubleValueObserverBuilder("eventmesh.send.message.fail.rate.elapsed")
+                .setDescription("send message fail rate")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.getSendMsgFailRate(), Labels.empty()))
+                .build();
+
+        //replyMsgNumSum
+        meter
+                .doubleValueObserverBuilder("eventmesh.reply.message.elapsed.sum")
+                .setDescription("sum of reply message number")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.getReplyMsgNumSum(), Labels.empty()))
+                .build();
+
+        //replyMsgFailNumSum
+        meter
+                .doubleValueObserverBuilder("eventmesh.reply.message.fail.elapsed.sum")
+                .setDescription("sum of reply message fail number")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.getReplyMsgFailNumSum(), Labels.empty()))
+                .build();
+
+        //maxPushMsgTPS
+        meter
+                .doubleValueObserverBuilder("eventmesh.push.message.tps.elapsed.max")
+                .setDescription("max of push message tps")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.maxPushMsgTPS(), Labels.empty()))
+                .build();
+
+        //avgPushMsgTPS
+        meter
+                .doubleValueObserverBuilder("eventmesh.push.message.tps.elapsed.avg")
+                .setDescription("avg of push message tps")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.avgPushMsgTPS(), Labels.empty()))
+                .build();
+
+        //httpPushMsgNumSum
+        meter
+                .doubleValueObserverBuilder("eventmesh.http.push.message.elapsed.sum")
+                .setDescription("sum of http push message number")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.getHttpPushMsgNumSum(), Labels.empty()))
+                .build();
+
+        //httpPushFailNumSum
+        meter
+                .doubleValueObserverBuilder("eventmesh.http.push.message.fail.elapsed.sum")
+                .setDescription("sum of http push message fail number")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.getHttpPushFailNumSum(), Labels.empty()))
+                .build();
+
+        //httpPushMsgFailRate
+        meter
+                .doubleValueObserverBuilder("eventmesh.http.push.message.fail.rate.elapsed")
+                .setDescription("http push message fail rate")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.getHttpPushMsgFailRate(), Labels.empty()))
+                .build();
+
+        //maxHTTPPushLatency
+        meter
+                .doubleValueObserverBuilder("eventmesh.http.push.latency.elapsed.max")
+                .setDescription("max of http push latency")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.maxHTTPPushLatency(), Labels.empty()))
+                .build();
+
+        //avgHTTPPushLatency
+        meter
+                .doubleValueObserverBuilder("eventmesh.http.push.latency.elapsed.avg")
+                .setDescription("avg of http push latency")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.avgHTTPPushLatency(), Labels.empty()))
+                .build();
+
+        //batchMsgQ
+        meter
+                .longValueObserverBuilder("eventmesh.batch.message.queue.elapsed.size")
+                .setDescription("size of batch message queue")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(httpMetricsServer.getBatchMsgQ(), Labels.empty()))
+                .build();
+
+        //sendMsgQ
+        meter
+                .longValueObserverBuilder("eventmesh.send.message.queue.elapsed.size")
+                .setDescription("size of send message queue")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(httpMetricsServer.getSendMsgQ(), Labels.empty()))
+                .build();
+
+        //pushMsgQ
+        meter
+                .longValueObserverBuilder("eventmesh.push.message.queue.elapsed.size")
+                .setDescription("size of push message queue")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(httpMetricsServer.getPushMsgQ(), Labels.empty()))
+                .build();
+
+        //httpRetryQ
+        meter
+                .longValueObserverBuilder("eventmesh.http.retry.queue.elapsed.size")
+                .setDescription("size of http retry queue")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(httpMetricsServer.getHttpRetryQ(), Labels.empty()))
+                .build();
+
+        //avgBatchSendMsgCost
+        meter
+                .doubleValueObserverBuilder("eventmesh.batch.send.message.cost.elapsed.avg")
+                .setDescription("avg of batch send message cost")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.avgBatchSendMsgCost(), Labels.empty()))
+                .build();
+
+        //avgSendMsgCost
+        meter
+                .doubleValueObserverBuilder("eventmesh.send.message.cost.elapsed.avg")
+                .setDescription("avg of send message cost")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.avgSendMsgCost(), Labels.empty()))
+                .build();
+
+        //avgReplyMsgCost
+        meter
+                .doubleValueObserverBuilder("eventmesh.reply.message.cost.elapsed.avg")
+                .setDescription("avg of reply message cost")
+                .setUnit("HTTP")
+                .setUpdater(result -> result.observe(summaryMetrics.avgReplyMsgCost(), Labels.empty()))
+                .build();
+    }
+
+    public void shutdown(){// stops the Prometheus endpoint through the configuration helper
+        configuration.shutdownPrometheusEndpoint();
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryTCPMetricsExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryTCPMetricsExporter.java
new file mode 100644
index 0000000..b49cef9
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryTCPMetricsExporter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.metrics.openTelemetry;
+
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.MeterProvider;
+import io.opentelemetry.api.metrics.common.Labels;
+import org.apache.eventmesh.common.config.CommonConfiguration;
+import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcpConnectionHandler;
+import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
+
+/**
+ * Exposes the TCP statistics gathered by {@link EventMeshTcpMonitor} as OpenTelemetry double
+ * value observers; setup and teardown go through {@link OpenTelemetryExporterConfiguration}.
+ */
+public class OpenTelemetryTCPMetricsExporter {
+    // NOTE(review): "TCP" names the protocol rather than a unit of measure; it is kept
+    // unchanged so the metadata of the exported instruments stays the same.
+    private static final String UNIT = "TCP";
+
+    OpenTelemetryExporterConfiguration configuration = new OpenTelemetryExporterConfiguration();
+
+    private final Meter meter;
+
+    private final EventMeshTcpMonitor eventMeshTcpMonitor;
+
+    /**
+     * Initializes OpenTelemetry through the exporter configuration and obtains the meter.
+     *
+     * @param eventMeshTcpMonitor       monitor whose values are observed
+     * @param eventMeshTCPConfiguration configuration passed to the OpenTelemetry initialization
+     */
+    public OpenTelemetryTCPMetricsExporter(EventMeshTcpMonitor eventMeshTcpMonitor,
+                                           EventMeshTCPConfiguration eventMeshTCPConfiguration) {
+        this.eventMeshTcpMonitor = eventMeshTcpMonitor;
+
+        // it is important to initialize the OpenTelemetry SDK as early as possible in your process.
+        MeterProvider meterProvider = configuration.initializeOpenTelemetry(eventMeshTCPConfiguration);
+        this.meter = meterProvider.get("OpenTelemetryTCPExporter", "0.13.1");
+    }
+
+    /** Registers one observer per TCP metric; returns immediately when the meter is null. */
+    public void start() {
+        if (meter == null) {
+            return;
+        }
+        //retryQueueSize
+        registerObserver("eventmesh.tcp.retry.queue.size", "get size of retry queue",
+                () -> eventMeshTcpMonitor.getEventMeshTCPServer().getEventMeshTcpRetryer().getRetrySize());
+
+        //client2eventMeshTPS
+        registerObserver("eventmesh.tcp.client2.tps", "get tps of client to eventMesh",
+                () -> eventMeshTcpMonitor.getClient2eventMeshTPS());
+
+        //eventMesh2mqTPS
+        registerObserver("eventmesh.tcp.2mq.tps", "get tps of eventMesh to mq",
+                () -> eventMeshTcpMonitor.getEventMesh2mqTPS());
+
+        //mq2eventMeshTPS
+        registerObserver("eventmesh.tcp.mq2.tps", "get tps of mq to eventMesh",
+                () -> eventMeshTcpMonitor.getMq2eventMeshTPS());
+
+        //eventMesh2clientTPS
+        registerObserver("eventmesh.tcp.2client.tps", "get tps of eventMesh to client",
+                () -> eventMeshTcpMonitor.getEventMesh2clientTPS());
+
+        //allTPS
+        registerObserver("eventmesh.tcp.all.tps", "get all TPS",
+                () -> eventMeshTcpMonitor.getAllTPS());
+
+        //EventMeshTcpConnectionHandler.connections
+        registerObserver("eventmesh.tcp.connection.handler.connections", "EventMeshTcpConnectionHandler.connections",
+                () -> EventMeshTcpConnectionHandler.connections.doubleValue());
+
+        //subTopicNum
+        registerObserver("eventmesh.tcp.sub.topic.num", "get sub topic num",
+                () -> eventMeshTcpMonitor.getSubTopicNum());
+    }
+
+    /**
+     * Builds a double value observer that reports the supplier's current value with empty labels.
+     */
+    private void registerObserver(String name, String description, java.util.function.DoubleSupplier value) {
+        meter
+                .doubleValueObserverBuilder(name)
+                .setDescription(description)
+                .setUnit(UNIT)
+                .setUpdater(result -> result.observe(value.getAsDouble(), Labels.empty()))
+                .build();
+    }
+
+    /**
+     * Delegates to {@code shutdownPrometheusEndpoint()} on the exporter configuration.
+     */
+    public void shutdown() {
+        configuration.shutdownPrometheusEndpoint();
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
index 7120218..05a70b8 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
@@ -32,6 +32,7 @@ import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcpConnectionHandler;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
 import org.apache.eventmesh.runtime.metrics.MonitorMetricConstants;
+import org.apache.eventmesh.runtime.metrics.openTelemetry.OpenTelemetryTCPMetricsExporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +40,10 @@ public class EventMeshTcpMonitor {
 
     private EventMeshTCPServer eventMeshTCPServer;
 
+    public EventMeshTCPServer getEventMeshTCPServer() {
+        return eventMeshTCPServer; // OpenTelemetryTCPMetricsExporter goes through this to read the retryer's retry size
+    }
+
     private final Logger tcpLogger = LoggerFactory.getLogger("tcpMonitor");
 
     private final Logger appLogger = LoggerFactory.getLogger("appMonitor");
@@ -63,6 +68,8 @@ public class EventMeshTcpMonitor {
     private int allTPS;
     private int subTopicNum;
 
+    private OpenTelemetryTCPMetricsExporter metricsExporter;
+
     public ScheduledFuture<?> monitorTpsTask;
 
     public ScheduledFuture<?> monitorThreadPoolTask;
@@ -76,10 +83,12 @@ public class EventMeshTcpMonitor {
         this.eventMesh2mqMsgNum = new AtomicInteger(0);
         this.mq2eventMeshMsgNum = new AtomicInteger(0);
         this.eventMesh2clientMsgNum = new AtomicInteger(0);
+        this.metricsExporter = new OpenTelemetryTCPMetricsExporter(this,eventMeshTCPServer.getEventMeshTCPConfiguration());
         logger.info("EventMeshTcpMonitor inited......");
     }
 
     public void start() throws Exception {
+        metricsExporter.start();
         monitorTpsTask = eventMeshTCPServer.getScheduler().scheduleAtFixedRate((new Runnable() {
             @Override
             public void run() {
@@ -148,6 +157,7 @@ public class EventMeshTcpMonitor {
     public void shutdown() throws Exception {
         monitorTpsTask.cancel(true);
         monitorThreadPoolTask.cancel(true);
+        metricsExporter.shutdown();
         logger.info("EventMeshTcpMonitor shutdown......");
     }
 
@@ -166,4 +176,28 @@ public class EventMeshTcpMonitor {
     public AtomicInteger getEventMesh2clientMsgNum() {
         return eventMesh2clientMsgNum;
     }
+
+    public int getClient2eventMeshTPS() {
+        return client2eventMeshTPS; // read by the "eventmesh.tcp.client2.tps" observer in OpenTelemetryTCPMetricsExporter
+    }
+
+    public int getEventMesh2clientTPS() {
+        return eventMesh2clientTPS; // read by the "eventmesh.tcp.2client.tps" observer in OpenTelemetryTCPMetricsExporter
+    }
+
+    public int getEventMesh2mqTPS() {
+        return eventMesh2mqTPS; // read by the "eventmesh.tcp.2mq.tps" observer in OpenTelemetryTCPMetricsExporter
+    }
+
+    public int getMq2eventMeshTPS() {
+        return mq2eventMeshTPS; // read by the "eventmesh.tcp.mq2.tps" observer in OpenTelemetryTCPMetricsExporter
+    }
+
+    public int getAllTPS() {
+        return allTPS; // read by the "eventmesh.tcp.all.tps" observer in OpenTelemetryTCPMetricsExporter
+    }
+
+    public int getSubTopicNum() {
+        return subTopicNum; // read by the "eventmesh.tcp.sub.topic.num" observer in OpenTelemetryTCPMetricsExporter
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org