You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2021/08/24 08:38:37 UTC
[incubator-eventmesh] branch develop updated: otel metric polish
(#503)
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/develop by this push:
new d46b8bd otel metric polish (#503)
d46b8bd is described below
commit d46b8bd3f4f4637aaa6af7def4b5441e65f319cf
Author: YuDong Tang <58...@qq.com>
AuthorDate: Tue Aug 24 16:38:22 2021 +0800
otel metric polish (#503)
---
.../runtime/metrics/http/HTTPMetricsServer.java | 2 +-
.../OpenTelemetryExporterConfiguration.java | 68 ---------
.../OpenTelemetryHTTPMetricsExporter.java | 156 ++++++++++-----------
.../OpenTelemetryPrometheusExporter.java | 60 ++++++++
.../OpenTelemetryTCPMetricsExporter.java | 48 +++----
.../runtime/metrics/tcp/EventMeshTcpMonitor.java | 2 +-
6 files changed, 156 insertions(+), 180 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
index a71e42f..d49167f 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.codahale.metrics.MetricRegistry;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
-import org.apache.eventmesh.runtime.metrics.openTelemetry.OpenTelemetryHTTPMetricsExporter;
+import org.apache.eventmesh.runtime.metrics.opentelemetry.OpenTelemetryHTTPMetricsExporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java
deleted file mode 100644
index 40752ea..0000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryExporterConfiguration.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.runtime.metrics.openTelemetry;
-
-import io.opentelemetry.api.metrics.MeterProvider;
-import io.opentelemetry.exporter.prometheus.PrometheusCollector;
-import io.opentelemetry.sdk.metrics.SdkMeterProvider;
-import io.prometheus.client.exporter.HTTPServer;
-import org.apache.eventmesh.common.config.CommonConfiguration;
-import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
-
-import java.io.IOException;
-
-//ues openTelemetry to export metrics data
-public class OpenTelemetryExporterConfiguration {
-
- private static HTTPServer server;//Prometheus server
-
- static int prometheusPort;//the endpoint to export metrics
-
- static MeterProvider meterProvider;
- /**
- * Initializes the Meter SDK and configures the prometheus collector with all default settings.
- *
- * @return A MeterProvider for use in instrumentation.
- */
- public MeterProvider initializeOpenTelemetry(CommonConfiguration configuration) {
- if (server!=null){//the sever already start
- return meterProvider;
- }
-
- prometheusPort = configuration.eventMeshPrometheusPort;
- SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal();
- PrometheusCollector.builder().setMetricProducer(sdkMeterProvider).buildAndRegister();
- this.meterProvider = sdkMeterProvider;
- try {
- server = new HTTPServer(prometheusPort,true);//Use the daemon thread to start an HTTP server to serve the default Prometheus registry.
- } catch (IOException e) {
- e.printStackTrace();
- }
- return meterProvider;
- }
-
- public static MeterProvider getMeterProvider(){//for tcp or http to get the initialized meterProvider
- return meterProvider;
- }
-
- public void shutdownPrometheusEndpoint() {
- if (server==null)
- return;
- server.stop();
- }
-}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryHTTPMetricsExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/opentelemetry/OpenTelemetryHTTPMetricsExporter.java
similarity index 77%
rename from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryHTTPMetricsExporter.java
rename to eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/opentelemetry/OpenTelemetryHTTPMetricsExporter.java
index ff2a31e..a94b7e1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryHTTPMetricsExporter.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/opentelemetry/OpenTelemetryHTTPMetricsExporter.java
@@ -15,303 +15,295 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.metrics.openTelemetry;
+package org.apache.eventmesh.runtime.metrics.opentelemetry;
+import io.opentelemetry.api.metrics.GlobalMeterProvider;
import io.opentelemetry.api.metrics.Meter;
-import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.api.metrics.common.Labels;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
import org.apache.eventmesh.runtime.metrics.http.SummaryMetrics;
public class OpenTelemetryHTTPMetricsExporter {
- OpenTelemetryExporterConfiguration configuration = new OpenTelemetryExporterConfiguration();
- private Meter meter;
+ private final SummaryMetrics summaryMetrics;
- private SummaryMetrics summaryMetrics;
-
- private HTTPMetricsServer httpMetricsServer;
+ private final HTTPMetricsServer httpMetricsServer;
public OpenTelemetryHTTPMetricsExporter(HTTPMetricsServer httpMetricsServer, EventMeshHTTPConfiguration eventMeshHTTPConfiguration) {
+ OpenTelemetryPrometheusExporter.initialize(eventMeshHTTPConfiguration);
this.httpMetricsServer = httpMetricsServer;
- summaryMetrics = httpMetricsServer.summaryMetrics;
-
- // it is important to initialize the OpenTelemetry SDK as early as possible in your process.
- MeterProvider meterProvider = configuration.initializeOpenTelemetry(eventMeshHTTPConfiguration);
- meter = meterProvider.get("OpenTelemetryHTTPExporter", "0.13.1");
+ this.summaryMetrics = httpMetricsServer.summaryMetrics;
}
public void start(){
- if (meter==null){
- return;
- }
+ Meter meter = GlobalMeterProvider.getMeter("apache-eventmesh");
//maxHTTPTPS
meter
- .doubleValueObserverBuilder("eventmesh.http.request.tps.elapsed.max")
- .setDescription("max TPS of HTTP")
+ .doubleValueObserverBuilder("eventmesh.http.request.tps.max")
+ .setDescription("max TPS of HTTP.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.maxHTTPTPS(),Labels.empty()))
.build();
//avgHTTPTPS
meter
- .doubleValueObserverBuilder("eventmesh.http.request.tps.elapsed.avg")
- .setDescription("avg TPS of HTTP")
+ .doubleValueObserverBuilder("eventmesh.http.request.tps.avg")
+ .setDescription("avg TPS of HTTP.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.avgHTTPTPS(),Labels.empty()))
.build();
//maxHTTPCost
meter
- .longValueObserverBuilder("eventmesh.http.request.elapsed.max")
- .setDescription("max cost of HTTP")
+ .longValueObserverBuilder("eventmesh.http.request.cost.max")
+ .setDescription("max cost of HTTP.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.maxHTTPCost(), Labels.empty()))
.build();
//avgHTTPCost
meter
- .doubleValueObserverBuilder("eventmesh.http.request.elapsed.avg")
- .setDescription("avg cost of HTTP")
+ .doubleValueObserverBuilder("eventmesh.http.request.cost.avg")
+ .setDescription("avg cost of HTTP.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.avgHTTPCost(), Labels.empty()))
.build();
//avgHTTPBodyDecodeCost
meter
- .doubleValueObserverBuilder("eventmesh.http.body.decode.cost.elapsed.avg")
- .setDescription("avg body decode cost of HTTP")
+ .doubleValueObserverBuilder("eventmesh.http.body.decode.cost.avg")
+ .setDescription("avg body decode cost of HTTP.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.avgHTTPBodyDecodeCost(), Labels.empty()))
.build();
//httpDiscard
meter
- .longValueObserverBuilder("eventmesh.http.request.discard.elapsed")
- .setDescription("http request discard")
+ .longValueObserverBuilder("eventmesh.http.request.discard.num")
+ .setDescription("http request discard num.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.getHttpDiscard(), Labels.empty()))
.build();
//maxBatchSendMsgTPS
meter
- .doubleValueObserverBuilder("eventmesh.batch.send.message.tps.elapsed.max")
- .setDescription("max of batch send message tps")
+ .doubleValueObserverBuilder("eventmesh.batch.send.message.tps.max")
+ .setDescription("max of batch send message tps.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.maxSendBatchMsgTPS(), Labels.empty()))
.build();
//avgBatchSendMsgTPS
meter
- .doubleValueObserverBuilder("eventmesh.batch.send.message.tps.elapsed.avg")
- .setDescription("avg of batch send message tps")
+ .doubleValueObserverBuilder("eventmesh.batch.send.message.tps.avg")
+ .setDescription("avg of batch send message tps.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.avgSendBatchMsgTPS(), Labels.empty()))
.build();
//sum
meter
- .doubleValueObserverBuilder("eventmesh.batch.send.message.elapsed.sum")
- .setDescription("sum of batch send message number")
+ .doubleValueObserverBuilder("eventmesh.batch.send.message.num")
+ .setDescription("sum of batch send message number.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.getSendBatchMsgNumSum(), Labels.empty()))
.build();
//sumFail
meter
- .doubleValueObserverBuilder("eventmesh.batch.send.message.fail.elapsed.sum")
- .setDescription("sum of batch send message fail message number")
+ .doubleValueObserverBuilder("eventmesh.batch.send.message.fail.num")
+ .setDescription("sum of batch send message fail message number.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.getSendBatchMsgFailNumSum(), Labels.empty()))
.build();
//sumFailRate
meter
- .doubleValueObserverBuilder("eventmesh.batch.send.message.fail.rate.elapsed")
- .setDescription("send batch message fail rate")
+ .doubleValueObserverBuilder("eventmesh.batch.send.message.fail.rate")
+ .setDescription("send batch message fail rate.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.getSendBatchMsgFailRate(), Labels.empty()))
.build();
//discard
meter
- .doubleValueObserverBuilder("eventmesh.batch.send.message.discard.elapsed.sum")
- .setDescription("sum of send batch message discard number")
+ .doubleValueObserverBuilder("eventmesh.batch.send.message.discard.num")
+ .setDescription("sum of send batch message discard number.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.getSendBatchMsgDiscardNumSum(), Labels.empty()))
.build();
//maxSendMsgTPS
meter
- .doubleValueObserverBuilder("eventmesh.send.message.tps.elapsed.max")
- .setDescription("max of send message tps")
+ .doubleValueObserverBuilder("eventmesh.send.message.tps.max")
+ .setDescription("max of send message tps.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.maxSendMsgTPS(), Labels.empty()))
.build();
//avgSendMsgTPS
meter
- .doubleValueObserverBuilder("eventmesh.send.message.tps.elapsed.avg")
- .setDescription("avg of send message tps")
+ .doubleValueObserverBuilder("eventmesh.send.message.tps.avg")
+ .setDescription("avg of send message tps.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.avgSendMsgTPS(), Labels.empty()))
.build();
//sum
meter
- .doubleValueObserverBuilder("eventmesh.send.message.elapsed.sum")
- .setDescription("sum of send message number")
+ .doubleValueObserverBuilder("eventmesh.send.message.num")
+ .setDescription("sum of send message number.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.getSendMsgNumSum(), Labels.empty()))
.build();
//sumFail
meter
- .doubleValueObserverBuilder("eventmesh.send.message.fail.elapsed.sum")
- .setDescription("sum of send message fail number")
+ .doubleValueObserverBuilder("eventmesh.send.message.fail.num")
+ .setDescription("sum of send message fail number.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.getSendMsgFailNumSum(), Labels.empty()))
.build();
//sumFailRate
meter
- .doubleValueObserverBuilder("eventmesh.send.message.fail.rate.elapsed")
- .setDescription("send message fail rate")
+ .doubleValueObserverBuilder("eventmesh.send.message.fail.rate")
+ .setDescription("send message fail rate.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.getSendMsgFailRate(), Labels.empty()))
.build();
//replyMsg
meter
- .doubleValueObserverBuilder("eventmesh.reply.message.elapsed.sum")
- .setDescription("sum of reply message number")
+ .doubleValueObserverBuilder("eventmesh.reply.message.num")
+ .setDescription("sum of reply message number.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.getReplyMsgNumSum(), Labels.empty()))
.build();
//replyFail
meter
- .doubleValueObserverBuilder("eventmesh.reply.message.fail.elapsed.sum")
- .setDescription("sum of reply message fail number")
+ .doubleValueObserverBuilder("eventmesh.reply.message.fail.num")
+ .setDescription("sum of reply message fail number.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.getReplyMsgFailNumSum(), Labels.empty()))
.build();
//maxPushMsgTPS
meter
- .doubleValueObserverBuilder("eventmesh.push.message.tps.elapsed.max")
- .setDescription("max of push message tps")
+ .doubleValueObserverBuilder("eventmesh.push.message.tps.max")
+ .setDescription("max of push message tps.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.maxPushMsgTPS(), Labels.empty()))
.build();
//avgPushMsgTPS
meter
- .doubleValueObserverBuilder("eventmesh.push.message.tps.elapsed.avg")
- .setDescription("avg of push message tps")
+ .doubleValueObserverBuilder("eventmesh.push.message.tps.avg")
+ .setDescription("avg of push message tps.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.avgPushMsgTPS(), Labels.empty()))
.build();
//sum
meter
- .doubleValueObserverBuilder("eventmesh.http.push.message.elapsed.sum")
- .setDescription("sum of http push message number")
+ .doubleValueObserverBuilder("eventmesh.http.push.message.num")
+ .setDescription("sum of http push message number.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.getHttpPushMsgNumSum(), Labels.empty()))
.build();
//sumFail
meter
- .doubleValueObserverBuilder("eventmesh.http.push.message.fail.elapsed.sum")
- .setDescription("sum of http push message fail number")
+ .doubleValueObserverBuilder("eventmesh.http.push.message.fail.num")
+ .setDescription("sum of http push message fail number.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.getHttpPushFailNumSum(), Labels.empty()))
.build();
//sumFailRate
meter
- .doubleValueObserverBuilder("eventmesh.http.push.message.fail.rate.elapsed")
- .setDescription("http push message fail rate")
+ .doubleValueObserverBuilder("eventmesh.http.push.message.fail.rate")
+ .setDescription("http push message fail rate.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.getHttpPushMsgFailRate(), Labels.empty()))
.build();
//maxClientLatency
meter
- .doubleValueObserverBuilder("eventmesh.http.push.latency.elapsed.max")
- .setDescription("max of http push latency")
+ .doubleValueObserverBuilder("eventmesh.http.push.latency.max")
+ .setDescription("max of http push latency.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.maxHTTPPushLatency(), Labels.empty()))
.build();
//avgClientLatency
meter
- .doubleValueObserverBuilder("eventmesh.http.push.latency.elapsed.avg")
- .setDescription("avg of http push latency")
+ .doubleValueObserverBuilder("eventmesh.http.push.latency.avg")
+ .setDescription("avg of http push latency.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.avgHTTPPushLatency(), Labels.empty()))
.build();
//batchMsgQ
meter
- .longValueObserverBuilder("eventmesh.batch.message.queue.elapsed.size")
- .setDescription("size of batch message queue")
+ .longValueObserverBuilder("eventmesh.batch.message.queue.size")
+ .setDescription("size of batch message queue.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(httpMetricsServer.getBatchMsgQ(), Labels.empty()))
.build();
//sendMsgQ
meter
- .longValueObserverBuilder("eventmesh.send.message.queue.elapsed.size")
- .setDescription("size of send message queue")
+ .longValueObserverBuilder("eventmesh.send.message.queue.size")
+ .setDescription("size of send message queue.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(httpMetricsServer.getSendMsgQ(), Labels.empty()))
.build();
//pushMsgQ
meter
- .longValueObserverBuilder("eventmesh.push.message.queue.elapsed.size")
- .setDescription("size of push message queue")
+ .longValueObserverBuilder("eventmesh.push.message.queue.size")
+ .setDescription("size of push message queue.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(httpMetricsServer.getPushMsgQ(), Labels.empty()))
.build();
//httpRetryQ
meter
- .longValueObserverBuilder("eventmesh.http.retry.queue.elapsed.size")
- .setDescription("size of http retry queue")
+ .longValueObserverBuilder("eventmesh.http.retry.queue.size")
+ .setDescription("size of http retry queue.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(httpMetricsServer.getHttpRetryQ(), Labels.empty()))
.build();
//batchAvgSend2MQCost
meter
- .doubleValueObserverBuilder("eventmesh.batch.send.message.cost.elapsed.avg")
- .setDescription("avg of batch send message cost")
+ .doubleValueObserverBuilder("eventmesh.batch.send.message.cost.avg")
+ .setDescription("avg of batch send message cost.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.avgBatchSendMsgCost(), Labels.empty()))
.build();
//avgSend2MQCost
meter
- .doubleValueObserverBuilder("eventmesh.send.message.cost.elapsed.avg")
- .setDescription("avg of send message cost")
+ .doubleValueObserverBuilder("eventmesh.send.message.cost.avg")
+ .setDescription("avg of send message cost.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.avgSendMsgCost(), Labels.empty()))
.build();
//avgReply2MQCost
meter
- .doubleValueObserverBuilder("eventmesh.reply.message.cost.elapsed.avg")
- .setDescription("avg of reply message cost")
+ .doubleValueObserverBuilder("eventmesh.reply.message.cost.avg")
+ .setDescription("avg of reply message cost.")
.setUnit("HTTP")
.setUpdater(result -> result.observe(summaryMetrics.avgReplyMsgCost(), Labels.empty()))
.build();
}
public void shutdown(){
- configuration.shutdownPrometheusEndpoint();
+ OpenTelemetryPrometheusExporter.shutdown();
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/opentelemetry/OpenTelemetryPrometheusExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/opentelemetry/OpenTelemetryPrometheusExporter.java
new file mode 100644
index 0000000..730adc9
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/opentelemetry/OpenTelemetryPrometheusExporter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.metrics.opentelemetry;
+
+import io.opentelemetry.exporter.prometheus.PrometheusCollector;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.prometheus.client.exporter.HTTPServer;
+import org.apache.eventmesh.common.config.CommonConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+public class OpenTelemetryPrometheusExporter {
+
+ private static final Logger logger = LoggerFactory.getLogger(OpenTelemetryPrometheusExporter.class);
+
+ private static HTTPServer server;
+
+ /**
+ * Initializes the Meter SDK and configures the prometheus collector with all default settings.
+ *
+ * @param configuration configuration.
+ */
+ public synchronized static void initialize(CommonConfiguration configuration) {
+ if (server != null){
+ return;
+ }
+ PrometheusCollector.builder().setMetricProducer(
+ SdkMeterProvider.builder().buildAndRegisterGlobal()).buildAndRegister();
+ int port = configuration.eventMeshPrometheusPort;
+ try {
+ //Use the daemon thread to start an HTTP server to serve the default Prometheus registry.
+ server = new HTTPServer(port,true);
+ } catch (IOException e) {
+ logger.error("failed to start prometheus server, port: {} due to {}", port, e.getMessage());
+ }
+ }
+
+ public static void shutdown() {
+ if (server != null) {
+ server.stop();
+ server = null;
+ }
+ }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryTCPMetricsExporter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/opentelemetry/OpenTelemetryTCPMetricsExporter.java
similarity index 71%
rename from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryTCPMetricsExporter.java
rename to eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/opentelemetry/OpenTelemetryTCPMetricsExporter.java
index b49cef9..c758962 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/openTelemetry/OpenTelemetryTCPMetricsExporter.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/opentelemetry/OpenTelemetryTCPMetricsExporter.java
@@ -15,71 +15,62 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.metrics.openTelemetry;
+package org.apache.eventmesh.runtime.metrics.opentelemetry;
+import io.opentelemetry.api.metrics.GlobalMeterProvider;
import io.opentelemetry.api.metrics.Meter;
-import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.api.metrics.common.Labels;
-import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcpConnectionHandler;
import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
public class OpenTelemetryTCPMetricsExporter {
- OpenTelemetryExporterConfiguration configuration = new OpenTelemetryExporterConfiguration();
- private Meter meter;
-
- private EventMeshTcpMonitor eventMeshTcpMonitor;
+ private final EventMeshTcpMonitor eventMeshTcpMonitor;
public OpenTelemetryTCPMetricsExporter(EventMeshTcpMonitor eventMeshTcpMonitor , EventMeshTCPConfiguration eventMeshTCPConfiguration){
+ OpenTelemetryPrometheusExporter.initialize(eventMeshTCPConfiguration);
this.eventMeshTcpMonitor = eventMeshTcpMonitor;
-
- // it is important to initialize the OpenTelemetry SDK as early as possible in your process.
- MeterProvider meterProvider = configuration.initializeOpenTelemetry(eventMeshTCPConfiguration);
- meter = meterProvider.get("OpenTelemetryTCPExporter", "0.13.1");
}
public void start(){
- if (meter==null){
- return;
- }
+ Meter meter = GlobalMeterProvider.getMeter("apache-eventmesh");
//retryQueueSize
meter
.doubleValueObserverBuilder("eventmesh.tcp.retry.queue.size")
- .setDescription("get size of retry queue")
+ .setDescription("get size of retry queue.")
.setUnit("TCP")
.setUpdater(result -> result.observe(eventMeshTcpMonitor.getEventMeshTCPServer().getEventMeshTcpRetryer().getRetrySize(), Labels.empty()))
.build();
//client2eventMeshTPS
meter
- .doubleValueObserverBuilder("eventmesh.tcp.client2.tps")
- .setDescription("get tps of client to eventMesh")
+ .doubleValueObserverBuilder("eventmesh.tcp.server.tps")
+ .setDescription("get tps of client to eventMesh.")
.setUnit("TCP")
.setUpdater(result -> result.observe(eventMeshTcpMonitor.getClient2eventMeshTPS(), Labels.empty()))
.build();
//eventMesh2mqTPS
meter
- .doubleValueObserverBuilder("eventmesh.tcp.2mq.tps")
- .setDescription("get tps of eventMesh to mq")
+ .doubleValueObserverBuilder("eventmesh.tcp.mq.provider.tps")
+ .setDescription("get tps of eventMesh to mq.")
.setUnit("TCP")
.setUpdater(result -> result.observe(eventMeshTcpMonitor.getEventMesh2mqTPS(), Labels.empty()))
.build();
//mq2eventMeshTPS
meter
- .doubleValueObserverBuilder("eventmesh.tcp.mq2.tps")
- .setDescription("get tps of mq to eventMesh")
+ .doubleValueObserverBuilder("eventmesh.tcp.mq.consumer.tps")
+ .setDescription("get tps of mq to eventMesh.")
.setUnit("TCP")
.setUpdater(result -> result.observe(eventMeshTcpMonitor.getMq2eventMeshTPS(), Labels.empty()))
.build();
//eventMesh2clientTPS
meter
- .doubleValueObserverBuilder("eventmesh.tcp.2client.tps")
- .setDescription("get tps of eventMesh to client")
+ .doubleValueObserverBuilder("eventmesh.tcp.client.tps")
+ .setDescription("get tps of eventMesh to client.")
.setUnit("TCP")
.setUpdater(result -> result.observe(eventMeshTcpMonitor.getEventMesh2clientTPS(), Labels.empty()))
.build();
@@ -87,15 +78,15 @@ public class OpenTelemetryTCPMetricsExporter {
//allTPS
meter
.doubleValueObserverBuilder("eventmesh.tcp.all.tps")
- .setDescription("get all TPS")
+ .setDescription("get all TPS.")
.setUnit("TCP")
.setUpdater(result -> result.observe(eventMeshTcpMonitor.getAllTPS(), Labels.empty()))
.build();
//EventMeshTcpConnectionHandler.connections
meter
- .doubleValueObserverBuilder("eventmesh.tcp.connection.handler.connections")
- .setDescription("EventMeshTcpConnectionHandler.connections")
+ .doubleValueObserverBuilder("eventmesh.tcp.connection.num")
+ .setDescription("EventMeshTcpConnectionHandler.connections.")
.setUnit("TCP")
.setUpdater(result -> result.observe(EventMeshTcpConnectionHandler.connections.doubleValue(), Labels.empty()))
.build();
@@ -103,12 +94,13 @@ public class OpenTelemetryTCPMetricsExporter {
//subTopicNum
meter
.doubleValueObserverBuilder("eventmesh.tcp.sub.topic.num")
- .setDescription("get sub topic num")
+ .setDescription("get sub topic num.")
.setUnit("TCP")
.setUpdater(result -> result.observe(eventMeshTcpMonitor.getSubTopicNum(), Labels.empty()))
.build();
}
+
public void shutdown(){
- configuration.shutdownPrometheusEndpoint();
+ OpenTelemetryPrometheusExporter.shutdown();
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
index 99b0f05..2a2cbdc 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
@@ -32,7 +32,7 @@ import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcpConnectionHandler;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.metrics.MonitorMetricConstants;
-import org.apache.eventmesh.runtime.metrics.openTelemetry.OpenTelemetryTCPMetricsExporter;
+import org.apache.eventmesh.runtime.metrics.opentelemetry.OpenTelemetryTCPMetricsExporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org